RabbitMQ消息监听异常问题探究

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

点击关注公众号,利用碎片时间学习

问题场景

在使用Spring RabbitMQ做消息监听时,如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常。为了更好的描述问题,下面写个简单的例子。

通过访问null对象来引发空指针异常,消息监听处理程序代码清单:

package amqp;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;


@Component
public class FooMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        String messageBody = new String(message.getBody());
        System.out.println(" [x] Received '" + messageBody + "'");
        String nullStr = null;
        nullStr.toString();
    }
}

往消息监听队列发送一条消息,控制台不停打印异常日志:

18:55:32.816 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:32.979 [pool-1-thread-10] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:32.979 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])
 [x] Received 'Hello, world!'
18:55:38.191 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
 at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
 at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
 at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
 at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
 at amqp.FooMessageListener.onMessage(FooMessageListener.java:16)
 at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:282)
 at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
 ... 10 common frames omitted
18:55:38.192 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Rejecting messages (requeue=true)
18:55:38.193 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:38.194 [pool-1-thread-3] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:38.195 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])
 [x] Received 'Hello, world!'
18:55:55.226 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
 at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
 at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
 at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
 at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
 at amqp.FooMessageListener.onMessage(FooMessageListener.java:16)
 at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:282)
 at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
 ... 10 common frames omitted
18:55:55.226 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Rejecting messages (requeue=true)
18:55:55.227 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:55.229 [pool-1-thread-4] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:55.230 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=3, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])

问题来了:为什么异常时会一直重复接收这条消息?

抓包验证

消息监听程序异常的过程到底发生了什么?为了一探究竟,笔者使用Wireshark抓包工具来查看消息处理过程。

首先,修改下监听程序,收到字符串exception时产生异常

 @Override
 public void onMessage(Message message) {
     String messageBody = new String(message.getBody());
     System.out.println(" [x] Received '" + messageBody + "'");
     if(messageBody.equals("exception")){
         String nullStr = null;
         nullStr.toString();
     }
 }

1.监听程序正常处理情况

启动监听程序,Wireshark抓包情况:

RabbitMQ消息监听异常问题探究

我们主要关注最后一列,这一列展示了请求的AMQP协议方法信息,AMQP协议方法包含类名+方法名+参数,这一列主要展示了类名和方法名,点击对应行可以查看参数信息。比如上图:

  • Connection.Start:请求服务端开始建立连接
  • Channel.Open:请求服务端建立信道
  • Queue.Declare:声明队列
  • Basic.Consume:开始一个消费者,请求指定队列的消息

AMQP协议方法更详细介绍可以查看官网

然后,通过客户端发送一条消息

RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
template.convertAndSend("ok");

抓包:

RabbitMQ消息监听异常问题探究

分析:

  • Basic.Publish:客户端发送Basic.Publish方法请求,将消息发布到exchange,rabbitmq server会根据路由规则转发到队列中
  • Basic.Deliver:服务端发送Basic.Deliver方法请求,投递消息到监听队列的客户端消费者
  • Basic.Ack:客户端发送Basic.Ack方法请求,告知rabbimq server,消息已接收处理

2.监听程序异常处理情况

通过客户端发送exception字符串,制造异常

RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
template.convertAndSend("exception");

抓包:

RabbitMQ消息监听异常问题探究

分析:

  • Basic.Reject:客户端发送Basic.Reject方法请求,表示无法处理消息,拒绝消息,此时的requeue参数为true,将消息返回原来的队列
  • Basic.Deliver:服务端调用Basic.Deliver方法,和第一次Basic.Deliver方法不同的是,此时的redeliver参数为true,表示重新投递消息到监听队列的消费者

然后这两步会一直重复下去。对于Basic.Reject方法,可以设置requeue参数为false,这样消息无法处理的时候就不会重新入队了,他会根据异常类型选择直接丢弃或加入dead-letter-exchange中。Spring RabbitMQ配置:

<!--配置监听-->
<rabbit:listener-container connection-factory="connectionFactory" requeue-rejected="false">
    <rabbit:listener ref="fooMessageListener" queue-names="directQueue" />
</rabbit:listener-container>

结论

RabbitMQ消息监听程序异常时,消费者会向rabbitmq server发送Basic.Reject,表示消息拒绝接受,由于Spring默认requeue-rejected配置为true,消息会重新入队,然后rabbitmq server重新投递,造成了程序一直异常的情况。所以说了这么多,我们通过rabbitmq监听消息的时候,程序一定要添加try…catch语句!!!当然你也可以根据实际情况,选择设置requeue-rejected为false来丢弃消息。

参考

  • AMQP协议方法
  • Wireshark抓包教程
  • http://blog.csdn.net/u013256816/article/details/55515234
  • http://fengchj.com/?p=2234
  • https://yemengying.com/2017/01/30/how-does-rabbitmq-handle-exception/

来源:blog.csdn.net/u014513883/article/details/77907898

推荐:

主流Java进阶技术(学习资料分享)

RabbitMQ消息监听异常问题探究
PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。“在看”支持我们吧!

原文始发于微信公众号(Java笔记虾):RabbitMQ消息监听异常问题探究