跳到主要内容

消费端限流

​ 消费者宕机过程中MQ上囤积大量消息,重启消费者服务后消息瞬间涌入,造成消息消费服务压力剧增,因此大流量下消息消费端需要进行限流设置 在非自动确认消息的前提下,如果一定数量的消息(基于Consumer和Channel设置QOS的值)没有被确认,将不进行消费新的消息,具体API与《轮训分发-不公平分发》类似,原理类似令牌桶算法

死信队列

由于特定的原因导致 Queue 中的某些消息无法被消费,这类消费异常的数据将会保存在死信队列中防止消息丢失,例如用户在商城下单成功并点击支付后,在指定时间未支付时的订单自动失效

死信队列只不过是绑定在死信交换机上的队列。死信交换机只不过是用来接受死信的交换机,可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。

来源:

TTL(存活时间)过期

队列积压消息达到最大长度,后续消息无法再添加到队列中

消息被拒(消费方返回 basicNack 进行否定应答)且不放回队列中(requeue=false)

TTL超时时间案例:

意为消息具有过期时间限制

在生产者方进行指定为当前发送消息的过期时间,缺点是消息即使过期也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的( 如果当前队列有严重的消息积压情况,已过期的消息依旧会被积压在队列中,如果队列**配置了消息积压上限,**将导致后续应当正常消费的消息全部进入死信队列 )

在队列指定为所有到达该队列的消息的过期时间,时间从消息入队列开始计算,只要超过了队列的超时时间配置,消息会自动清除

该案例为在生产者方进行指定,队列指定TTL过期时间案例参考**《延时队列》或消费者注释代码**

延时队列

方案一:基于死信队列中的消息TTL过期模式的进行改造,不监听对应队列,使消息过期后全部进入死信队列以达成延时效果,主要有队列TTL和消息TTL两种

方案二:使用延时队列插件,让交换机管理消息延时时间(常用)

使用场景:

订单在十分钟之内未支付则自动取消

用户注册成功后,如果三天内没有登陆则进行短信提醒。

用户发起退款,如果三天内没有得到处理则通知相关店铺人员

预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

推送某些数据的定时任务

消息优先级

正常情况下,消费者将会按照消息进入队列顺序对消息进行消费,如果需要使排在后面的某些特定消息先进行消费,需要对队列和消息设置优先级,没有设置优先级的消息依旧按照进入队列的顺序消费,消费者需要在消息进入队列排序完成后消费才能体现优先级 优先级范围为 0~255,值越高优先级越高,且消息优先级不能超过队列优先级

不可路由消息处理

​ RabbitMQ原生API中提供了 ReturnListener 对不可路由消息进行监听,在SpringBoot中,RabbitTemplate对原本的 ReturnListener 进行细分,划分出 ConfirmCallback 和 ReturnCallback 两种消息回调方式对不可路由消息进行处理

ConfirmCallback(交换机回调):

只确认是否正确到达 Exchange 中,无论消息是否成功发送到交换机都会触发

同一条消息的 correlationData 的 ID 必须保证一致以便后续消息补偿,默认为UUID,具体可参考《外卖项目实战》

备份交换机:

ReturnCallBack获取到的消息没有机会进入到队列,因此无法使用死信队列来保存消息,应当使用备份交换机方案进行处理,使用备份交换机后,ReturnCallBack方法将会被覆盖

备用交换机用于**处理无法被正确路由到队列的消息(没有匹配的路由key,目标交换机没有绑定队列)****,**当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中(使用其他类型交换机需要指定路由key,不指定时)

然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。还可以建立一个报警队列,用独立的消费者来进行监测和报警

案例说明:

实现 ConfirmCallbackReturnCallBack 方法,模拟三种不同情况的消息发送,由于备份交换机只处理ReturnCallBack获取到的消息,因此 warning conusmer 应当只输出一条消息