RabbitMQ第三弹 发送消息与接收消息
for (int i = 0; i < 10; i++) {
channel.basicPublish("bobo-exchange-a","bindingkey-a",null,("hello,world:"+i).getBytes());
}
如果消息正确路由到相应的队列中,则我们在RabbitMQ的管理界面中可以看到这10条消息处于ready状态,如下图所示。
由于这10条消息存储在队列中,还没有消费者消费它,所以是ready状态。
basicPublish方法的参数如下:
-
exchange:交换器名称,如果为空字符串则消息会被发送到RabbitMQ默认的交换器中。
-
routingKey:路由键。
-
props:消息的基本属性集,包含14个属性成员:contentType、contentEncoding、headers、deliveryMode、priority、correlationid、replyTo、expiration、messageid、timestamp、type、userId、appId、clusterid;可通过如下代码创建props。
new AMQP.BasicProperties.Builder().contentType().contentEncoding().build();
-
body(byte[]):消息体payload。
-
mandatory(强制性的):
-
设置为true时:当交换器无法根据自身类型和路由键找到一个符合条件的队列时,RabbitMQ会将消息返回给生产者,可通过channel.addReturnListener监听返回的消息,如下代码所示。
channel.addReturnListener((replyCode, replyText, exchange, routingKey, basicProperties, body) -> {
System.out.println("返回的消息是:"+new String(body));
});
如果不想用ReturnListener,则可以使用备份(备胎)交换器将未被路由的消息存储在RabbitMQ中,将B交换器声明为A交换器的备份交换器如下代码所示。
Map args=new HashMap();
args.put("alternate-exchange","B");
channel.exchangeDeclare("A","direct",args);
备份交换器和普通交换器没区别,不过建议设置为fanout类型方便使用,如果为直连类型,则当路由键与绑定键不匹配时消息会丢失。
-
设置为false时,消息直接丢弃。
-
immediate立即的:
设置为true时,如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
RabbitMQ3.0去掉了对immediate参数的支持(如果使用会报异常),RabbitMQ官方解释是:immediate参数会影响队列的性能,增加了代码复杂性,建议采用TTL和DLX的方法替代。
由于我将basicPublish方法的参数讲到比较细,因此提到了一些新的概念比如:备份交换器、TTL(过期时间)、DLX(死信交换器)等,这些概念我会在后面的文章中介绍。
2、接收消息
-
queue:队列名称。
-
autoAck:是否自动确认
当autoAck为false时,RabbitMQ会先给消息打上删除标记,然后等待消费者显式地回复确认信号后才从内存(或磁盘)中移除消息,因此不用担心处理消息过程中消费者进程挂掉后消息丢失的问题。
如果RabbitMQ一直没有收到消费者的确认信号,则有两种情况:
-
消费者断开连接:RabbitMQ会安排该消息重新进入队列(位置不变,仅仅去除了删除标记),等待投递给下一个消费者;
-
消费者未断开连接:RabbitMQ会一直等待,不会为未确认的消息设置过期时间;
-
consumerTag:消费者标签,用来区分多个消费者。
-
noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者。
-
exclusive:是否排他。
-
arguments:设置消费者的其他参数。
-
callback:设置消费者的回调函数,用来处理RabbitMQ推送过来的消息。
-
deliveryTag:可以看做消息的编号。
-
multiple:如果为true,则确认deliveryTag编号之前所有未被当前消费者确认的消息,否则只确认该编号对应的一条消息。
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag+"收到消息:"+new String(body));
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
// 确认消息,deliveryTag可以看作消息的编号
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("bobo-queue-a",false,"consumer-tag-a",consumer);
consumer-tag-a收到消息:hello,world:0
consumer-tag-a收到消息:hello,world:1
consumer-tag-a收到消息:hello,world:2
consumer-tag-a收到消息:hello,world:3
consumer-tag-a收到消息:hello,world:4
consumer-tag-a收到消息:hello,world:5
consumer-tag-a收到消息:hello,world:6
consumer-tag-a收到消息:hello,world:7
consumer-tag-a收到消息:hello,world:8
consumer-tag-a收到消息:hello,world:9
GetResponse response= channel.basicGet("queueName", false);
System.out.println("收到的消息是:"+new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
如果消息不是消费者想要的,那么消费者不会确认消息,而是拒绝消息。
消费者拒绝消息用channel对象的basicReject方法,该方法有两个参数:
-
deliveryTag:消息编号。
-
requeue:如果为true,则RabbitMQ会将这条消息重新存入队列,以便可以发送给下一个订阅的消费者;如果为false,则RabbitMQ立即会把消息从队列中移除。
消费者批量拒绝消息用channel对象的basicNack方法,该方法有两个参数:
-
deliveryTag:消息编号。
-
multiple参数:如果为true,则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息,如果为false,与basicReject方法一样。
原文始发于微信公众号(初心JAVA):RabbitMQ第三弹 发送消息与接收消息