admin管理员组

文章数量:1030242

【RabbitMQ】AMQP协议、生产者和消费者的代码编写

AMQP

image.png

AMQPAdvanced Message Queuing Protocol)是一种高级消息队列协议

  • 定义了一套确定的消息交换功能,包括交换机(Exchange),队列(Queue)等
  • 这些组件共同工作,使得生产者能够将消息发送到交换器,然后由队列接收并等待消费者接收
  • AMQP 还定义了一个网络协议,允许客户端应用通过该协议与消息代理和 AMQP 模型进行交互通信

RabbitMQ 是遵从 AMQP 协议的,换句话说,RabbitMQ 就是 AMQP 协议的 Erlang 的视线(当然 RabbitMQ 还支持 STOMP2MQTT2 等协议)

  • AMQP 的模型结构和 RabbitMQ 的模型结构是一样的

1 . 引入依赖

在程序中加入相关依赖

代码语言:javascript代码运行次数:0运行复制
<!-- .rabbitmq/amqp-client -->  
<dependency>  
    <groupId>com.rabbitmq</groupId>  
    <artifactId>amqp-client</artifactId>  
    <version>5.25.0</version>  
</dependency>

2 . 生产者代码编写

image.png

生产消息,并把消息投放到队列中

1. 建立连接

建立连接,需要建立一个连接工厂,然后把信息都放进去,需要连接的时候,直接就从工厂里面拿就可以了

建立连接需要的信息

  • IP
  • 端口号
  • 账号
  • 密码
代码语言:javascript代码运行次数:0运行复制
// 1. 建立连接,创建连接工厂  
ConnectionFactory connectionFactory = new ConnectionFactory();  
connectionFactory.setHost("localhost");  
connectionFactory.setPort(5672);  // 需要提前开放端口号  
connectionFactory.setUsername("study"); // 账号  
connectionFactory.setPassword("study"); // 密码  
connectionFactory.setVirtualHost("coding"); // 虚拟主机  
  
// 工厂建立好了之后,从里面拿出一个连接  
Connection connection = connectionFactory.newConnection();

2. 开启信道

在建立好连接之后,就需要在连接里面创建一个信道,供消息传输

代码语言:javascript代码运行次数:0运行复制
// 2. 开启信道  
Channel channel = connection.createChannel();

3. 声明交换机

可以直接使用内置的交换机,就不需要进行操作

4. 声明队列

声明队列需要用到一个 queueDeclare 方法

代码语言:javascript代码运行次数:0运行复制
// 4. 声明队列  
/**  
 * 声明队列使用的方法的参数  
 * queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)  
 * 参数说明:  
 * var1(queue): 队列名称  
 * var2(durable): 是否持久化
 * var3(exclusive): 是否独占  
 * var4(autoDelete): 是否自动删除  
 * var5(arguments): 参数  
 */  
channel.queueDeclare("hello", true, false, false, null);
  • 如果没有一个叫 hello 的队列,就会创建;有则不创建
  • durable:是否持久化
    • true:设置队列为持久化,持久化的队列会存盘,服务器重启之后,消息不会丢失
  • exclusive:是否独占,只能有一个消费者监听队列
    • Connection 关闭时,是否删除队列
  • autoDelete:是否自动删除,当没有 Consumer 的时候,自动删除掉

5. 发送消息

发送消息的时候,会用到 basicPublish() 方法

代码语言:javascript代码运行次数:0运行复制
// 5. 发送消息  
/**  
 * basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4) * 参数说明:  
 * var1(exchange): 交换机名称  
 * var2(routingKey): 内置交换机,routingKey和队列名称保持一致(发送到名字一样的队列去)  
 * var3(props): 配置  
 * var4(body): 消息  
 */  
for (int i = 0; i < 10; i++) {  
    String msg = "hello rabbitmq" + i;  
    channel.basicPublish("", "hello", null, msg.getBytes());  
}  
System.out.println("消息发送成功");
  • 此处使用的内置交换机,使用内置交换机时,routingKey 要和队列名称一样,才可以路由到对应的队列上去

6. 释放资源

显示地关闭 Channel 是一个好习惯,但这不是必须的,Connection 关闭的时候,Channel 也会自动关闭

代码语言:javascript代码运行次数:0运行复制
// 6. 资源释放  
channel.close();  
connection.close();
  • 两个释放的顺序不能改变
    • 先释放信道,再释放连接

此时我们运行程序,就能看到 RabbitMQ 界面客户端里面:

image.png
  • 一条信息已经准备好了

3 . 消费者代码编写

  • 从队列中获取消息

1. 建立连接

消费者接收消息,我们也要先建立连接。此处建立连接的思路和生成者的一样

  • 先根据相关信息创建连接工厂
  • 然后从连接工厂中拿连接
代码语言:javascript代码运行次数:0运行复制
// 1. 创建连接  
ConnectionFactory connectionFactory = new ConnectionFactory();  
connectionFactory.setHost("localhost");  
connectionFactory.setPort(5672);  
connectionFactory.setVirtualHost("coding");  
connectionFactory.setUsername("study");  
connectionFactory.setPassword("study");  

// 取出连接
Connection connection = connectionFactory.newConnection();

2. 创建信道

创建信道,供 Brocker 和消费者之间进行通信

代码语言:javascript代码运行次数:0运行复制
// 2. 创建 Channel
Channel channel = connection.createChannel();

3. 声明队列

这里用到的方法,和创建生产者中用到的方法一样—— queueDeclare()

代码语言:javascript代码运行次数:0运行复制
// 3. 声明队列  
channel.queueDeclare("hello", true, false, false, null);
  • hello:队列的名字
  • true:持久化,队列会存盘,服务器重启之后,消息不会丢失
  • false:不是独占,不是只有一个消费者监听队列
  • false:不是自动删除,当没有 Consumer 的时候,也不会自动删除掉
  • null:没有其他参数

4. 消费消息

这里会用到 basicConsume() 方法

代码语言:javascript代码运行次数:0运行复制
// 4. 消费消息  
/**  
 * basicConsume(String var1, boolean var2, Consumer var3) * 参数说明:  
 * var1(queue): 队列名称  
 * var2(autoAck): 是否自动确认  
 * var3(callback): 接收到消息后,执行的逻辑  
 */
 channel.basicConsume("hello", true, consumer);
Consumer

Consumer 用于定义消息消费者的行为。当我们需要从 RabbitMQ 接收消息的时候,需要提供一个实现了 Consumer 接口的对象

代码语言:javascript代码运行次数:0运行复制
DefaultConsumer consumer = new DefaultConsumer(channel){ 
    // 从队列中收到消息,就会执行的方法  
    @Override  
    /**  
     * handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)     * 参数说明:  
     * consumerTag: 消费者标签,通常是消费者在订阅队列时指定的  
     * envelope: 包含消息的封包信息,如队列名称,交换机等  
     * properties: 一些配置信息  
     * body: 消息的具体内容  
     */  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
        //TODO  
        System.out.println("接收到消息: " + new String(body));  
    }  
};
  • DefaultConsumerRabbitMQ 提供的一个默认消费者,实现了 Consumer 接口
  • 从队列接收到消息的时候,就会自动调用核心方法—— handleDelivery()
    • 在这个方法中,我们可以定义如何处理接收到的消息,例如打印消息内容,处理业务逻辑或者将消息存储到数据库等

5. 释放资源

代码语言:javascript代码运行次数:0运行复制
// 5. 释放资源  
channel.close();  
connection.close();

在释放资源之前可以让程序休眠两秒钟,等待回调函数执行完之后再释放,就可以看到打印出来的完整信息了

image.png

源代码

生产者

代码语言:javascript代码运行次数:0运行复制
package rabbitmq;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
  
public class ProducerDemo {  
    public static void main(String[] args) throws IOException, TimeoutException {  
  
        // 1. 建立连接,创建连接工厂  
        ConnectionFactory connectionFactory = new ConnectionFactory();  
        connectionFactory.setHost("localhost");  
        connectionFactory.setPort(5672);  // 需要提前开放端口号  
        connectionFactory.setUsername("study"); // 账号  
        connectionFactory.setPassword("study"); // 密码  
        connectionFactory.setVirtualHost("coding"); // 虚拟主机  
  
        // 工厂建立好了之后,从里面拿出一个连接  
        Connection connection = connectionFactory.newConnection();  
  
        // 2. 开启信道  
        Channel channel = connection.createChannel();  
  
        // 3. 声明交换机  使用内置的交换机  
  
        // 4. 声明队列  
        /**  
         * 声明队列使用的方法的参数  
         * queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)  
         * 参数说明:  
         * var1(queue): 队列名称  
         * var2(durable): 可持久化  
         * var3(exclusive): 是否独占  
         * var4(autoDelete): 是否自动删除  
         * var5(arguments): 参数  
         */  
        channel.queueDeclare("hello", true, false, false, null);  
  
        // 交换机要将信息传给对应的队列,所以他们之间存在绑定关系  
        // 这个绑定关系我们先省略掉,因为我们使用的是内置交换机,  
  
        // 5. 发送消息  
        /**  
         * basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4)         * 参数说明:  
         * var1(exchange): 交换机名称  
         * var2(routingKey): 内置交换机,routingKey和队列名称保持一致(发送到名字一样的队列去)  
         * var3(props): 配置  
         * var4(body): 消息  
         */  
        for (int i = 0; i < 10; i++) {  
            String msg = "hello rabbitmq" + i;  
            channel.basicPublish("", "hello", null, msg.getBytes());  
        }  
        System.out.println("消息发送成功");  
  
        // 6. 资源释放  
        //channel.close();  
        //connection.close();    
    }  
}

消费者

代码语言:javascript代码运行次数:0运行复制
package rabbitmq;  
  
import com.rabbitmq.client.*;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
public class ConsumerDemo {  
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  
  
        // 1. 创建连接  
        ConnectionFactory connectionFactory = new ConnectionFactory();  
        connectionFactory.setHost("localhost");  
        connectionFactory.setPort(5672);  
        connectionFactory.setVirtualHost("coding");  
        connectionFactory.setUsername("study");  
        connectionFactory.setPassword("study");  
        Connection connection = connectionFactory.newConnection();  
  
        // 2. 创建 Channel        Channel channel = connection.createChannel();  
  
        // 3. 声明队列  
        channel.queueDeclare("hello", true, false, false, null);  
  
        // 4. 消费消息  
        /**  
         * basicConsume(String var1, boolean var2, Consumer var3)         * 参数说明:  
         * var1(queue): 队列名称  
         * var2(autoAck): 是否自动确认  
         * var3(callback): 接收到消息后,执行的逻辑  
         */  
        DefaultConsumer consumer = new DefaultConsumer(channel){  
            // 从队列中收到消息,就会执行的方法  
            @Override  
            /**  
             * handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)             * 参数说明:  
             * consumerTag: 消费者标签,通常是消费者在订阅队列时指定的  
             * envelope: 包含消息的封包信息,如队列名称,交换机等  
             * properties: 一些配置信息  
             * body: 消息的具体内容  
             */  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
                //TODO  
                System.out.println("接收到消息: " + new String(body));  
            }  
        };  
        channel.basicConsume("hello", true, consumer);  
  
        // 等待程序执行完成  
        Thread.sleep(2000);  
  
        // 5. 释放资源  
        channel.close();  
        connection.close();  
    }  
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2025-04-16,如有侵权请联系 cloudcommunity@tencent 删除连接协议rabbitmqamqp队列

【RabbitMQ】AMQP协议、生产者和消费者的代码编写

AMQP

image.png

AMQPAdvanced Message Queuing Protocol)是一种高级消息队列协议

  • 定义了一套确定的消息交换功能,包括交换机(Exchange),队列(Queue)等
  • 这些组件共同工作,使得生产者能够将消息发送到交换器,然后由队列接收并等待消费者接收
  • AMQP 还定义了一个网络协议,允许客户端应用通过该协议与消息代理和 AMQP 模型进行交互通信

RabbitMQ 是遵从 AMQP 协议的,换句话说,RabbitMQ 就是 AMQP 协议的 Erlang 的视线(当然 RabbitMQ 还支持 STOMP2MQTT2 等协议)

  • AMQP 的模型结构和 RabbitMQ 的模型结构是一样的

1 . 引入依赖

在程序中加入相关依赖

代码语言:javascript代码运行次数:0运行复制
<!-- .rabbitmq/amqp-client -->  
<dependency>  
    <groupId>com.rabbitmq</groupId>  
    <artifactId>amqp-client</artifactId>  
    <version>5.25.0</version>  
</dependency>

2 . 生产者代码编写

image.png

生产消息,并把消息投放到队列中

1. 建立连接

建立连接,需要建立一个连接工厂,然后把信息都放进去,需要连接的时候,直接就从工厂里面拿就可以了

建立连接需要的信息

  • IP
  • 端口号
  • 账号
  • 密码
代码语言:javascript代码运行次数:0运行复制
// 1. 建立连接,创建连接工厂  
ConnectionFactory connectionFactory = new ConnectionFactory();  
connectionFactory.setHost("localhost");  
connectionFactory.setPort(5672);  // 需要提前开放端口号  
connectionFactory.setUsername("study"); // 账号  
connectionFactory.setPassword("study"); // 密码  
connectionFactory.setVirtualHost("coding"); // 虚拟主机  
  
// 工厂建立好了之后,从里面拿出一个连接  
Connection connection = connectionFactory.newConnection();

2. 开启信道

在建立好连接之后,就需要在连接里面创建一个信道,供消息传输

代码语言:javascript代码运行次数:0运行复制
// 2. 开启信道  
Channel channel = connection.createChannel();

3. 声明交换机

可以直接使用内置的交换机,就不需要进行操作

4. 声明队列

声明队列需要用到一个 queueDeclare 方法

代码语言:javascript代码运行次数:0运行复制
// 4. 声明队列  
/**  
 * 声明队列使用的方法的参数  
 * queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)  
 * 参数说明:  
 * var1(queue): 队列名称  
 * var2(durable): 是否持久化
 * var3(exclusive): 是否独占  
 * var4(autoDelete): 是否自动删除  
 * var5(arguments): 参数  
 */  
channel.queueDeclare("hello", true, false, false, null);
  • 如果没有一个叫 hello 的队列,就会创建;有则不创建
  • durable:是否持久化
    • true:设置队列为持久化,持久化的队列会存盘,服务器重启之后,消息不会丢失
  • exclusive:是否独占,只能有一个消费者监听队列
    • Connection 关闭时,是否删除队列
  • autoDelete:是否自动删除,当没有 Consumer 的时候,自动删除掉

5. 发送消息

发送消息的时候,会用到 basicPublish() 方法

代码语言:javascript代码运行次数:0运行复制
// 5. 发送消息  
/**  
 * basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4) * 参数说明:  
 * var1(exchange): 交换机名称  
 * var2(routingKey): 内置交换机,routingKey和队列名称保持一致(发送到名字一样的队列去)  
 * var3(props): 配置  
 * var4(body): 消息  
 */  
for (int i = 0; i < 10; i++) {  
    String msg = "hello rabbitmq" + i;  
    channel.basicPublish("", "hello", null, msg.getBytes());  
}  
System.out.println("消息发送成功");
  • 此处使用的内置交换机,使用内置交换机时,routingKey 要和队列名称一样,才可以路由到对应的队列上去

6. 释放资源

显示地关闭 Channel 是一个好习惯,但这不是必须的,Connection 关闭的时候,Channel 也会自动关闭

代码语言:javascript代码运行次数:0运行复制
// 6. 资源释放  
channel.close();  
connection.close();
  • 两个释放的顺序不能改变
    • 先释放信道,再释放连接

此时我们运行程序,就能看到 RabbitMQ 界面客户端里面:

image.png
  • 一条信息已经准备好了

3 . 消费者代码编写

  • 从队列中获取消息

1. 建立连接

消费者接收消息,我们也要先建立连接。此处建立连接的思路和生成者的一样

  • 先根据相关信息创建连接工厂
  • 然后从连接工厂中拿连接
代码语言:javascript代码运行次数:0运行复制
// 1. 创建连接  
ConnectionFactory connectionFactory = new ConnectionFactory();  
connectionFactory.setHost("localhost");  
connectionFactory.setPort(5672);  
connectionFactory.setVirtualHost("coding");  
connectionFactory.setUsername("study");  
connectionFactory.setPassword("study");  

// 取出连接
Connection connection = connectionFactory.newConnection();

2. 创建信道

创建信道,供 Brocker 和消费者之间进行通信

代码语言:javascript代码运行次数:0运行复制
// 2. 创建 Channel
Channel channel = connection.createChannel();

3. 声明队列

这里用到的方法,和创建生产者中用到的方法一样—— queueDeclare()

代码语言:javascript代码运行次数:0运行复制
// 3. 声明队列  
channel.queueDeclare("hello", true, false, false, null);
  • hello:队列的名字
  • true:持久化,队列会存盘,服务器重启之后,消息不会丢失
  • false:不是独占,不是只有一个消费者监听队列
  • false:不是自动删除,当没有 Consumer 的时候,也不会自动删除掉
  • null:没有其他参数

4. 消费消息

这里会用到 basicConsume() 方法

代码语言:javascript代码运行次数:0运行复制
// 4. 消费消息  
/**  
 * basicConsume(String var1, boolean var2, Consumer var3) * 参数说明:  
 * var1(queue): 队列名称  
 * var2(autoAck): 是否自动确认  
 * var3(callback): 接收到消息后,执行的逻辑  
 */
 channel.basicConsume("hello", true, consumer);
Consumer

Consumer 用于定义消息消费者的行为。当我们需要从 RabbitMQ 接收消息的时候,需要提供一个实现了 Consumer 接口的对象

代码语言:javascript代码运行次数:0运行复制
DefaultConsumer consumer = new DefaultConsumer(channel){ 
    // 从队列中收到消息,就会执行的方法  
    @Override  
    /**  
     * handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)     * 参数说明:  
     * consumerTag: 消费者标签,通常是消费者在订阅队列时指定的  
     * envelope: 包含消息的封包信息,如队列名称,交换机等  
     * properties: 一些配置信息  
     * body: 消息的具体内容  
     */  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
        //TODO  
        System.out.println("接收到消息: " + new String(body));  
    }  
};
  • DefaultConsumerRabbitMQ 提供的一个默认消费者,实现了 Consumer 接口
  • 从队列接收到消息的时候,就会自动调用核心方法—— handleDelivery()
    • 在这个方法中,我们可以定义如何处理接收到的消息,例如打印消息内容,处理业务逻辑或者将消息存储到数据库等

5. 释放资源

代码语言:javascript代码运行次数:0运行复制
// 5. 释放资源  
channel.close();  
connection.close();

在释放资源之前可以让程序休眠两秒钟,等待回调函数执行完之后再释放,就可以看到打印出来的完整信息了

image.png

源代码

生产者

代码语言:javascript代码运行次数:0运行复制
package rabbitmq;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
  
public class ProducerDemo {  
    public static void main(String[] args) throws IOException, TimeoutException {  
  
        // 1. 建立连接,创建连接工厂  
        ConnectionFactory connectionFactory = new ConnectionFactory();  
        connectionFactory.setHost("localhost");  
        connectionFactory.setPort(5672);  // 需要提前开放端口号  
        connectionFactory.setUsername("study"); // 账号  
        connectionFactory.setPassword("study"); // 密码  
        connectionFactory.setVirtualHost("coding"); // 虚拟主机  
  
        // 工厂建立好了之后,从里面拿出一个连接  
        Connection connection = connectionFactory.newConnection();  
  
        // 2. 开启信道  
        Channel channel = connection.createChannel();  
  
        // 3. 声明交换机  使用内置的交换机  
  
        // 4. 声明队列  
        /**  
         * 声明队列使用的方法的参数  
         * queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)  
         * 参数说明:  
         * var1(queue): 队列名称  
         * var2(durable): 可持久化  
         * var3(exclusive): 是否独占  
         * var4(autoDelete): 是否自动删除  
         * var5(arguments): 参数  
         */  
        channel.queueDeclare("hello", true, false, false, null);  
  
        // 交换机要将信息传给对应的队列,所以他们之间存在绑定关系  
        // 这个绑定关系我们先省略掉,因为我们使用的是内置交换机,  
  
        // 5. 发送消息  
        /**  
         * basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4)         * 参数说明:  
         * var1(exchange): 交换机名称  
         * var2(routingKey): 内置交换机,routingKey和队列名称保持一致(发送到名字一样的队列去)  
         * var3(props): 配置  
         * var4(body): 消息  
         */  
        for (int i = 0; i < 10; i++) {  
            String msg = "hello rabbitmq" + i;  
            channel.basicPublish("", "hello", null, msg.getBytes());  
        }  
        System.out.println("消息发送成功");  
  
        // 6. 资源释放  
        //channel.close();  
        //connection.close();    
    }  
}

消费者

代码语言:javascript代码运行次数:0运行复制
package rabbitmq;  
  
import com.rabbitmq.client.*;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
public class ConsumerDemo {  
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  
  
        // 1. 创建连接  
        ConnectionFactory connectionFactory = new ConnectionFactory();  
        connectionFactory.setHost("localhost");  
        connectionFactory.setPort(5672);  
        connectionFactory.setVirtualHost("coding");  
        connectionFactory.setUsername("study");  
        connectionFactory.setPassword("study");  
        Connection connection = connectionFactory.newConnection();  
  
        // 2. 创建 Channel        Channel channel = connection.createChannel();  
  
        // 3. 声明队列  
        channel.queueDeclare("hello", true, false, false, null);  
  
        // 4. 消费消息  
        /**  
         * basicConsume(String var1, boolean var2, Consumer var3)         * 参数说明:  
         * var1(queue): 队列名称  
         * var2(autoAck): 是否自动确认  
         * var3(callback): 接收到消息后,执行的逻辑  
         */  
        DefaultConsumer consumer = new DefaultConsumer(channel){  
            // 从队列中收到消息,就会执行的方法  
            @Override  
            /**  
             * handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)             * 参数说明:  
             * consumerTag: 消费者标签,通常是消费者在订阅队列时指定的  
             * envelope: 包含消息的封包信息,如队列名称,交换机等  
             * properties: 一些配置信息  
             * body: 消息的具体内容  
             */  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
                //TODO  
                System.out.println("接收到消息: " + new String(body));  
            }  
        };  
        channel.basicConsume("hello", true, consumer);  
  
        // 等待程序执行完成  
        Thread.sleep(2000);  
  
        // 5. 释放资源  
        channel.close();  
        connection.close();  
    }  
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2025-04-16,如有侵权请联系 cloudcommunity@tencent 删除连接协议rabbitmqamqp队列

本文标签: RabbitMQAMQP协议生产者和消费者的代码编写