RabbitMQ第四弹 死信队列

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!
本文介绍RabbitMQ的一些特性,主要有:过期时间、死信队列、延迟队列和优先级队列。
1、过期时间

RabbitMQ可以对消息和队列设置过期时间。

1.1、设置队列的TTL

如果对一个队列设置了过期时间,那么在队列既没有任何的消费者,又没有重新声明队列的情况下,超过过期时间时该队列就会被删除。RabbitMQ重启时,持久化的队列的过期时间会被重新计算。


下面声明一个过期时间为30分钟的队列。

Map<String,Object> args =new HashMap<String,Object>() ;
args.put("x-expires",1800000);
channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);

1.2、设置消息的TTL

方式1:设置队列的TTL,那么队列中的所有消息具有同样的TTL。

方式2:生产者发送消息时对消息本身设置TTL,如下代码所示。

channel.basicPublish("bobo-exchange-a","bindingkey-a",true,false,new AMQP.BasicProperties.Builder().expiration("30000").build(),("hello,world:").getBytes());


消息过期删除时机:

  • 对于方式1,一旦消息过期,就会立马从队列中抹去,这是因为队列中所有消息的TTL都一样,那么越早进入队列(队头)消息就越可能过期,因此只需要从队头开始扫描即可找到过期消息。

  • 对于方式2,消息过期不会马上从队列中抹去,每条消息是否过期是在即将投递到消费者之前判定的(惰性删除),如果不是惰性删除,由于队列中消息的TTL不一致,则需要顺序扫描一遍队列才能找出所有的过期消息,影响性能。


如果消息的TTL为0,除非此时消息可以立马投递给消费者,否则会丢失。这个特性可以用来替代immediate参数。

如果方式1和方式2一起用,则以TTL短的那个为准。

当消息过期,就会变成死信,无法再被消费者消费。

2、DLX与死信队列

DLX,全称为Dead-Letter-Exchange,也叫死信交换器或死信邮箱。

消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX 的队列就称之为死信队列。


消息变成死信的几种情况:

  • 消息被拒绝,井且设置requeue参数为false;

  • 消息过期(如果是惰性的则不会自动进行DLX,而是等消费到这个过期消息时才路由到DLX);

  • 队列达到最大长度;


DLX是一个普通的交换器,可以被任何队列指定,当该队列存在死信时,就会自动路由给DLX,进而再路由给死信队列。通过如下代码指定DLX。

//创建DLX
channel.exchangeDeclare("dlx_exchange","direct");
Map<String, Object> args = new HashMap();
args.put("x-dead-letter-exchange","dlx_exchange");

//可以为这个DLX指定路由键,如果没有特殊指定,则使用原队列的路由键
args.put("x-dead-letter-routing-key","dlx-routing-key");

//为队列myqueue添加DLX
channel.queueDeclare("myqueue",false,false,false,args);


对于RabbitMQ来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者由于某种原因拒绝消息 )而被置入死信队列中的情况,后续可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。

3、延迟队列

利用TTL和DLX,可以实现延迟队列。

假设TTL为10秒,那当消息过了10秒后还没有被消费,就会进入DLX,进而进入死信队列。这里的死信队列就相当于延迟时间为10秒的延迟队列。

4、优先级队列

在声明队列时可以传入x-max-priority参数设置队列的最大优先级(最小优先级默认为0),该队列就成了一个优先级队列。

生产者发送消息时,可以指定消息的优先级,但优先级只有当队列中的消息有堆积才有意义,如果消息能马上被消费,那么即使它的优先级是最低的也能立马被消费掉而不是等高优先级的消息在它之前消费。

优先级队列相关代码如下所示。

Map<String, Object> args = new HashMap();
// 最大优先级设置为10,最低优先级默认为0。
args.put("x-max-priority", 10);
channel.queueDeclare("queue.priority",true,false,false,args) ;

// 生产者发消息时指定消息的优先级
channel.basicPublish("exchange_priority","queue.priority",new AMQP.BasicProperties.Builder().priority(5).build(),"message".getBytes());


RabbitMQ第四弹 死信队列

原文始发于微信公众号(初心JAVA):RabbitMQ第四弹 死信队列