Skip to main content

rabbitmq消息处理

消息接收确认(应答方式):

消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 已经处理了,RabbitMQ 就可以把该消息删除,主要分为三种形式进行消息应答:

手动应答可以进行批量处理(减少网络拥堵),但批量处理过程中出现异常将可能导致该批量处理中的所有消息丢失,因此实际开发中不应启用批量处理功能

否认处理还可参考《死信队列》章节的消息否定应答案例,且一般不开启重回队列功能,如果重回队列将排在队列末尾

类型代码说明
自动应答basicConsume中 autoAck 设置为 true如果应答后的代码出现异常导致回滚,该消息由于已被消费无法找回
手动应答Channel.basicAck确认处理消息
手动应答Channel.basicNack否认处理消息(支持批量)
手动应答Channel.basicReject否认处理消息(不支持批量)如果队列配置了死信交换机将会发送到死信队列中,未配置则进行丢弃操作

重新入队:

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样即使某个消费者偶尔死亡,也可以确保不会丢失任何消息

持久化:

队列持久化:

RabbitMQ 突发崩溃的时候,如果队列没有做持久化将会导致队列和队列内消息丢失。在队列创建时指定第二哥参数为 true 即可 . 独占队列(设置是否只能有一个消费者使用).autoelete 是否在消费完成后是否自动删除队列

临时队列:

当这个队列上最后一个消费者断开连接才会执行删除,一般为独占队列

如果队列没有被持久化,在RabbitMQ重启后将被销毁

消息持久化:

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是

这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强。最好的方式为MQ集群+发布确认+消息存入Redis(开启AOF)保证100%不丢失

不公平分发:

比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,采用默认轮训分发的方式就使得处理速度快的消费者大部分时间处于空闲状态,而处理慢的那个消费者一直在干活,因此实际开发中应该改为不公平分发,让处理速度较快的消费者能够多处理数据

预取值:

消费者最多能积压在信道的待处理消息就是预取值

例如有7条消息,消费者C2的预取值是5,意为最多有5个消息可以被积压在信道中待处理,消费者C1的预取值是2,意为最多有2个消息可以被积压在信道中待处理

在RabbitMQ管理页中可能会遇到 积压数 = 预取值 +1 后迅速复原为相等的情况,原因是恰好消息被MQ应答的瞬间又进入了一条消息,由于统计的速度>应答的速度,因此造成+1

消息发送确认(确认消息):

消息已经正确到达目的队列后将通知发送给生产者,该操作叫 Confirm确认消息,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,可看做是消息已持久化的确认,该操作默认关闭,需要手动开启

单确认模式:

只有当消息被确认发布,后续的消息才能继续发布。缺点是运行效率低,每秒只能确认百条消息

批量确认模式:

使多个消息一起进行确认,相比单确认可以缩短时间。缺点是当某批确认发生故障导致发布出现问题时,将无法得知具体是哪个消息出现问题

异步处理模式:

生产者将消息转换为Map(key为消息标记,value为消息内容)的发送到Channel中。MQ对消息持久化后, broker会通知生产者消息是否确认,生产者只需要通过监听器确认消息确认情况即可

确认模式为单条确认或者批量确认具体选择哪种由MQ自行判断,使用该方案需要对确认回调函数进行详细设计

由于消息发布失败没有兜底方法,无法得知具体为哪个消息发布失败,需要添加JUC容器或引用NoSQL数据库对所有消息进行保存,在消息发布确认函数的回调逻辑中剔除成功的数据即可

这里以 ConcurrentSkipListMap 为例,也可以使用 Redis 的Zset形式存储数据(Score为序号,value为消息)

发布/订阅

同一消息被多为消费者获取需要使用发布订阅模式,交换机通过消息的RoutingKey来判断需要将消息转发到哪些绑定队列上

交换机:

实际情况中,生产者生产的消息从不会直接发送到队列,只会先发送到交换机中,再由交换机通过RoutingKey将消息发送到对应的绑定队列上

交换机的类型包括:直接(direct)、主题(topic) 、标题(headers) 、扇出(fanout),其中 headers 类型已基本弃用

通常情况下生产者产生消息后才会执行消费者代码,因此交换机的创建都由生产者执行,消费者不需要再次创建交换机(交换机只需要被声明一次即可)

在发布订阅模式中,只要有任意一个消费者的代码中进行了其他消费者交换机和 routingKey 的绑定,则这些消费者可以直接进行消息消费,而不再需要再次声明交换机和队列绑定

如果整合SpringBoot,所有的交换机、队列声明将会放在一个配置文件,*无需像非SpringBoot案例中如此麻烦*

在《轮训分发》章节中虽然发送消息没有使用任何交换机,但实际上底层走的是 AMQP default 交换机进行消息发送

Fanout模式:

又称广播模式,生产者将消息发送到交换机,消费者将队列绑定到交换机来获取信息,该模式下指定RoutingKey 无意义

Direct模式:

生产消息时指定 routingKey ,交换机根据不同的 routingKey 将消息进行转发到对应的 channel 上供不同的消费者进行消费

Topic模型:

相当于Direct模式添加了模糊匹配,可以让消费者在队列绑定时对 Routingkey 使用通配符,Routingkey 一般都是由一个或多个单词组成,多个单词之间以**“.”**分割

如果一个消息满足同一队列的多个绑定,只会被消费一次

当一个队列绑定键是#,那么这个队列将接收所有数据,就有点类似 fanout 了;如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

匹配规则:

# 统配符
* (star) can substitute for exactly one word. 匹配不多不少恰好1个词
# (hash) can substitute for zero or more words. 匹配一个或多个词
# 如:
orange.# 匹配 orange 开头的词
audit.* 只能匹配 audit 后面只有一个词语的
#.audit.# 匹配中间夹杂audit的词语

案例一:

如果消费者Q1绑定的 RoutingKey 为 *.orange.* ,消费者Q2绑定的 RoutingKey 为 ***.*.rabbit 和 lazy.#,**那么各自能接受到的对应消息如下图所示

Routingkey结果
quick.orange.rabbit被队列 Q1Q2 接收到
lazy.orange.elephant被队列 Q1Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
lazy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit是四个单词但匹配 Q2

案例2:

image.png

交换机持久化:

交换机创建默认没有进行持久化设置,在服务器重启之后,交换机会消失。

Return消息机制:

在发送消息的时候,当前的 exchange、queue 不存在或者指定的路由Key路由不到,此时需要使用 ReturnListener 对这类不可路由消息进行捕获,捕获后可将消息记录到日志表、存储到Redis进行后期定时推送等其他操作