RabbitMQ的队列模式你真的懂吗

cnblogs 2024-09-12 08:13:00 阅读 91

0 前言

官网描述六类工作队列模式:

  1. 简单队列模式:最简单的工作队列,一个消息生产者,一个消息消费者,一个队列。另称点对点模式
  2. 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。也称点对点模式
  3. 发布/订阅模式:无选择接收消息,一个消息生产者,一个交换器,多个消息队列,多个消费者
  4. 路由模式:基于发布/订阅模式,有选择的接收消息,即通过 routing 路由进行匹配条件是否满足接收消息
  5. 主题模式:同样是在发布/订阅模式的基础上,根据主题匹配进行筛选是否接收消息,比第四类更灵活
  6. RPC模式:拥有请求/回复的。也就是有响应的,这是其它都没的

1 简单队列模式

1 实现功能

一个生产者 P 发送消息到队列 Q,一个消费者 C 接收:

Pro

Pro负责创建消息队列,并发送消息入列:

  1. 获取连接
  2. 创建通道
  3. 创建队列声明
  4. 发送消息
  5. 关闭队列

<code>public class Producer {

private static final String QUEUE_NAME = "test_queue";

public static void main(String[] args) throws IOException, TimeoutException {

Connection newConnection = MQConnectionUtils.newConnection();

Channel channel = newConnection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String msg = "我是生产者生成的消息";

System.out.println("生产者发送消息:" + msg);

channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

channel.close();

newConnection.close();

}

}

Con

  1. 获取连接
  2. 获取通道
  3. 监听队列

public class Customer {

private static final String QUEUE_NAME = "test_queue";

public static void main(String[] args) throws IOException, TimeoutException {

System.out.println("002");

Connection newConnection = MQConnectionUtils.newConnection();

Channel channel = newConnection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)

throws IOException {

String msgString = new String(body, "UTF-8");

System.out.println("消费者获取消息:" + msgString);

}

};

channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

}

}

创建vhost

2 工作队列模式

将耗时的任务分发给多个消费者(工作者)。

主要解决:处理资源密集型任务,且还要等他完成。有了工作队列,就可将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可取出消息并完成工作。若启动了多个工作进程,则工作就可在多个进程间共享。

工作队列也称公平性队列模式,循环分发,若有两个消费者,默认RabbitMQ按序将每条消息发给下一个 Con,每个消费者获得相同数量的消息,即轮询。

Pro

创建50个消息

<code>public class Producer2 {

private static final String QUEUE_NAME = "test_queue";

public static void main(String[] args) throws IOException, TimeoutException {

Connection newConnection = MQConnectionUtils.newConnection();

Channel channel = newConnection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

/**保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */

channel.basicQos(1);

for (int i = 1; i <= 50; i++) {

String msg = "生产者消息_" + i;

System.out.println("生产者发送消息:" + msg);

channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

}

channel.close();

newConnection.close();

}

}

Con

public class Customer2_1 {

private static final String QUEUE_NAME = "test_queue";

public static void main(String[] args) throws IOException, TimeoutException {

System.out.println("001");

Connection newConnection = MQConnectionUtils.newConnection();

final Channel channel = newConnection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */

channel.basicQos(1);

DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)

throws IOException {

String msgString = new String(body, "UTF-8");

System.out.println("消费者获取消息:" + msgString);

try {

Thread.sleep(1000);

} catch (Exception e) {

} finally {

channel.basicAck(envelope.getDeliveryTag(), false);

}

}

};

channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

}

}

循环分发

启动生产者

启动两个消费者

Pro发送了50条消息进入队列,而上方消费者启动图里很明显的看到轮询的效果,就是每个消费者会分到相同的队列任务。

公平分发

由于上方模拟的是非常简单的消息队列的消费,假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。如一个1年的程序员,跟一个3年的程序员,分配相同的任务量,明显3年的程序员处理起来更加得心应手,很快就无所事事了,但是3年的程序员拿着非常高的薪资!显然3年的程序员应该承担更多的责任,咋办?

发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量,类似UDP,面向无连接。可用 basicQos,并将参数 prefetchCount 设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。

<code>final Channel channel = newConnection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */

channel.basicQos(1);

消息持久化

背景

上边我们提到的公平分发是由消费者收取消息时确认解决的,但是这里面又会出现被 kill 的情况。

当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

但是在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。咋办?

参数配置

参数配置一:生产者创建队列声明时,修改第二个参数为 true

/**3.创建队列声明 */

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN

for (int i = 1; i <= 50; i++) {

String msg = "生产者消息_" + i;

System.out.println("生产者发送消息:" + msg);

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

}

小结

  • 循环分发:消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息
  • 消息持久化:服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证RabbitMQ重启,队列和消息也不会丢失
  • 公平分发:指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题

3 发布订阅模式

工作队列模式是直接在生产者与消费者里声明好一个队列,消息就只会对应同类型的消费者。这种只处理同种类型的消息有弊端。

3.1 案例

门户网站,用户注册完后一般都会发送消息通知用户注册结果。如在一个系统中,用户注册信息有邮箱、手机号,在注册完后会向邮箱和手机号都发送注册完成信息。

利用 MQ 实现业务异步处理,若用工作队列,就声明一个注册信息队列。注册完成后生产者向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息。但实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应放在一块处理。

这时就可利用发布/订阅模式将消息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱、手机),并绑定到交换机。这样生产者只需要发布一次消息,两个队列都会接收到消息发给对应的消费者:

只需简单的将队列绑定到交换机。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列。就像子网广播,每台子网内的主机都获得一份复制的消息。

3.2 啥是发布订阅模式

可将消息发送给不同类型的消费者。即发布一次,消费多个:

X表示交换机、红色表示队列。

展示邮件、短信的例子,通过绑定到一个交换机,但是

3.3 实战

<code>public class ProducerFanout {

private static final String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws IOException, TimeoutException {

/** 1.创建新的连接 */

Connection connection = MQConnectionUtils.newConnection();

/** 2.创建通道 */

Channel channel = connection.createChannel();

/** 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 */

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

/** 4.发送消息 */

for (int i = 0; i < 10; i++)

{

String message = "用户注册消息:" + i;

System.out.println("[send]:" + message);

// 第二个参数为空类似于表示全局广播,只要绑定到该队列上的消费者理论上是都可收到

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));

try {

Thread.sleep(5 * i);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

/** 5.关闭通道、连接 */

channel.close();

connection.close();

/** 注意:如果消费没有绑定交换机和队列,则消息会丢失 */

}

}

邮件消费者

public class ConsumerEmailFanout {

private static final String QUEUE_NAME = "consumerFanout_email";

private static final String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws IOException, TimeoutException {

System.out.println("邮件消费者启动");

/* 1.创建新的连接 */

Connection connection = MQConnectionUtils.newConnection();

/* 2.创建通道 */

Channel channel = connection.createChannel();

/* 3.消费者关联队列 */

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

/* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */

// 第三个参数置为空时,可以接收到生产者所有的消息(生产者 routingKey 参数为空时)

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

DefaultConsumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)

throws IOException {

String msg = new String(body, "UTF-8");

System.out.println("消费者获取生产者消息:" + msg);

}

};

/* 5.消费者监听队列消息 */

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

短信消费者

public class ConsumerSMSFanout {

private static final String QUEUE_NAME = "ConsumerFanout_sms";

private static final String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws IOException, TimeoutException {

System.out.println("短信消费者启动");

/* 1.创建新的连接 */

Connection connection = MQConnectionUtils.newConnection();

/* 2.创建通道 */

Channel channel = connection.createChannel();

/* 3.消费者关联队列 */

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

/* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */

// 第三个参数置为空时,可接收到生产者所有的消息(生产者 routingKey 参数为空时)

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

DefaultConsumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)

throws IOException {

String msg = new String(body, "UTF-8");

System.out.println("消费者获取生产者消息:" + msg);

}

};

/* 5.消费者监听队列消息 */

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

运行

先运行两个con,再运行pro。如没有提前将队列绑定到交换机,直接运行pro,消息是不会发到任何队列里的。

生产者

短信消费者

邮件消费者

小结

相比工作模式,发布订阅模式引入了交换机,类型上更灵活。

pro不是直接操作队列,而是将数据发给交换机,由交换机将数据发给与之绑定的队列。从不加特定参数的运行结果中可以看到,两种类型的消费者(email,sms)都收到相同数量消息。

必须声明交换机,并设置模式:channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),fanout 指分发模式(将每一条消息都发送到与交换机绑定的队列)

队列必须绑定交换机:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

生产者发送消息到交换机,多个消费者声明多个队列,与交换机进行绑定,队列中的消息可以被所有消费者消费,类似QQ群消息

4 路由模式

就是发布订阅模式(Publish/Subscribe Pattern)中的直连交换机(Direct Exchange)。一种基于路由键(Routing Key)来路由消息的模式。在这种模式下,生产者发送消息时会指定一个路由键,交换机会根据这个路由键将消息路由到与之匹配的队列。

Pro

使用 <code>channel.basicPublish 方法发送消息,并指定交换机名称和路由键。交换机会根据路由键将消息路由到与之匹配的队列。

Con

在消费者代码中,我们声明了一个直接交换机(direct 类型),并绑定了一个队列。在绑定队列时,我们使用 channel.queueBind 方法,并指定交换机名称、队列名称和路由键。交换机会根据路由键将消息路由到与之匹配的队列。

特点

  • 路由键匹配:消息的路由键必须与队列绑定的路由键完全匹配,才能将消息路由到该队列。
  • 直接交换机:直接交换机根据路由键进行精确匹配,适用于需要精确控制消息路由的场景。

通过这种方式,路由模式可以实现基于路由键的精确消息路由,适用于需要将消息发送到特定队列的场景。

5 主题模式

属于发布订阅模式的TopicExchange(主题交换机)。Queue 通过 routing key 绑定到 TopicExchange,当消息到达TopicExchange后,TopicEkchange 根据消息的 routing key 将消息路由到一个或者多个Queue。

关注我,紧跟本系列专栏文章,咱们下篇再续!

作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。

各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。

负责:

  • 中央/分销预订系统性能优化
  • 活动&券等营销中台建设
  • 交易平台及数据中台等架构和开发设计
  • 车联网核心平台-物联网连接平台、大数据平台架构设计及优化
  • LLM Agent应用开发
  • 区块链应用开发
  • 大数据开发挖掘经验
  • 推荐系统项目

目前主攻市级软件项目设计、构建服务全社会的应用系统。

参考:

  • 编程严选网

本文由博客一文多发平台 OpenWrite 发布!



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。