现象

RabbitMQ的管控台有几万条消息处于ready状态,还有几百条unacked的消息。

队列阻塞-分析原因

consumer消费处理错误消息失败后没有正常进行ack, 正常的消息也不再被消费, 随即导致队里阻塞

但为什么正常消息也没有被正常消费呢?

其实这是RabbitMQ的一种保护机制。防止当消息激增的时候,海量的消息进入consumer而引发consumer宕机。

RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认的消息的前提下,限制信道上的消费者所能保持的最大未确认的数量。可以通过设置PrefetchCount实现。

举例说明:可以理解为在consumer前面加了一个缓冲容器,容器能容纳最大的消息数量就是PrefetchCount。如果容器没有满RabbitMQ就会将消息投递到容器内,如果满了就不投递了。当consumer对消息进行ack以后就会将此消息移除,从而放入新的消息。

1
2
3
4
5
6
7
8
9
10
listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: manual

prefetch参数就是PrefetchCount

通过上面的配置发现prefetch只配置了2,并且concurrency配置的只有1,所以当发送了2条错误消息以后,由于处理失败这2条消息一直没有被正常ack。将缓冲区沾满了,这个时候RabbitMQ认为这个consumer已经没有消费能力了就不继续给它推送消息了,所以就造成了队列阻塞。

判断队列是否有阻塞的风险

ack模式为manual,并且线上出现了unacked消息,这个时候不用慌。由于QOS是限制信道channel上的消费者所能保持的最大未确认的数量。所以允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。

channlCount就是由concurrency,max-concurrency决定的。

  • min = concurrency * prefetch * 节点数量
  • max = max-concurrency * prefetch * 节点数量

由此可以的出结论

  • unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。
  • unacked_msg_count >= min 可能会出现堵塞。
  • unacked_msg_count >= max 队列一定阻塞。

处理方法

其实处理的方法很简单,将解密和解析的方法放入try catch中就解决了这样不管解密正常与否,消息都会被签收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
try {

// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

// 模拟推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息签收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}

}

注意的点

unacked的消息在consumer切断连接后(重启),会自动回到队头。

事故重现-磁盘占用飙升

如果将ack模式改成auto自动,这样不管正常与否,消息都会被签收。从而解决阻塞问题.

但这样会使QOS不生效。会出现大量消息涌入consumer从而造成consumer宕机。

例子: 发送1条错误的消息

原因

RabbitMQ消息监听程序异常时,consumer会向rabbitmq server发送Basic.Reject,表示消息拒绝接受,由于Spring默认requeue-rejected配置为true,消息会重新入队,然后rabbitmq server重新投递。就相当于死循环了,所以控制台在疯狂刷错误日志造成磁盘利用率飙升的原因。

解决方法

default-requeue-rejected: false即可。

总结

  • 生产环境不建议使用自动ack,这样会QOS无法生效。
  • 在使用手动ack的时候,需要非常注意消息签收。
  • 其实在将有问题的MQ重置时,是将错误的消息给清除才没有问题了,相当于是消息丢失了。
1
2
3
4
5
6
7
try {
// 业务逻辑。
}catch (Exception e){
// 输出错误日志。
}finally {
// 消息签收。
}