RabbitMQ第三弹 发送消息与接收消息

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!
上一篇文章 RabbitMQ第二弹 交换器和队列 中介绍了如何使用Java API创建连接、信道,以及通过信道声明交换器与队列,最后将交换器与队列进行绑定,关于第二弹中的这些代码,我在本文中将不再列出,如有疑问请看第二弹。
本文将介绍生产者与消息者,利用Java API实现基本的消息发送与接收。

1、发送消息
发送消息用channel对象的basicPublish方法,向交换器bobo-exchange-a发送10条消息的代码如下。
for (int i = 0; i < 10; i++) {
    channel.basicPublish("bobo-exchange-a","bindingkey-a",null,("hello,world:"+i).getBytes());
}


如果消息正确路由到相应的队列中,则我们在RabbitMQ的管理界面中可以看到这10条消息处于ready状态,如下图所示。

RabbitMQ第三弹 发送消息与接收消息

由于这10条消息存储在队列中,还没有消费者消费它,所以是ready状态。


basicPublish方法的参数如下:

  • exchange:交换器名称,如果为空字符串则消息会被发送到RabbitMQ默认的交换器中。

  • routingKey:路由键。

  • props:消息的基本属性集,包含14个属性成员:contentType、contentEncoding、headers、deliveryMode、priority、correlationid、replyTo、expiration、messageid、timestamp、type、userId、appId、clusterid;可通过如下代码创建props。

new AMQP.BasicProperties.Builder().contentType().contentEncoding().build();
  • body(byte[]):消息体payload。

  • mandatory(强制性的):

  • 设置为true时:当交换器无法根据自身类型和路由键找到一个符合条件的队列时,RabbitMQ会将消息返回给生产者,可通过channel.addReturnListener监听返回的消息,如下代码所示。

channel.addReturnListener((replyCode, replyText, exchange, routingKey, basicProperties, body) -> {
    System.out.println("返回的消息是:"+new String(body));
});

如果不想用ReturnListener,则可以使用备份(备胎)交换器将未被路由的消息存储在RabbitMQ中,将B交换器声明为A交换器的备份交换器如下代码所示。

Map args=new HashMap();
args.put("alternate-exchange","B");
channel.exchangeDeclare("A","direct",args);

备份交换器和普通交换器没区别,不过建议设置为fanout类型方便使用,如果为直连类型,则当路由键与绑定键不匹配时消息会丢失。

  • 设置为false时,消息直接丢弃。

  • immediate立即的:

设置为true时,如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

RabbitMQ3.0去掉了对immediate参数的支持(如果使用会报异常),RabbitMQ官方解释是:immediate参数会影响队列的性能,增加了代码复杂性,建议采用TTL和DLX的方法替代。


由于我将basicPublish方法的参数讲到比较细,因此提到了一些新的概念比如:备份交换器、TTL(过期时间)、DLX(死信交换器)等,这些概念我会在后面的文章中介绍。

2、接收消息

RabbitMQ消费者有推和拉两种模式,推模式采用consume,拉模式采用get。
2.1、推模式
推模式用channel对象的basicConsume方法,basicConsume方法参数如下:
  • queue:队列名称。

  • autoAck:是否自动确认

当autoAck为false时,RabbitMQ会先给消息打上删除标记,然后等待消费者显式地回复确认信号后才从内存(或磁盘)中移除消息,因此不用担心处理消息过程中消费者进程挂掉后消息丢失的问题。

如果RabbitMQ一直没有收到消费者的确认信号,则有两种情况:

  • 消费者断开连接:RabbitMQ会安排该消息重新进入队列(位置不变,仅仅去除了删除标记),等待投递给下一个消费者;

  • 消费者未断开连接:RabbitMQ会一直等待,不会为未确认的消息设置过期时间;

  • consumerTag:消费者标签,用来区分多个消费者。

  • noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者。

  • exclusive:是否排他。

  • arguments:设置消费者的其他参数。

  • callback:设置消费者的回调函数,用来处理RabbitMQ推送过来的消息。


一般我们会将autoAck设为false,然后进行手动确认,避免消息丢失。
确认消息用channel对象的basicAck方法,该方法有两个参数:
  • deliveryTag:可以看做消息的编号。

  • multiple:如果为true,则确认deliveryTag编号之前所有未被当前消费者确认的消息,否则只确认该编号对应的一条消息。


callback参数为com.rabbitmq.client.Consumer类型,因此我们实现Consumer接口或继承com.rabbitmq.client.DefaultConsumer来实现一个callback,如下代码所示。
DefaultConsumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(consumerTag+"收到消息:"+new String(body));
        String routingKey = envelope.getRoutingKey();
        String contentType = properties.getContentType();
        // 确认消息,deliveryTag可以看作消息的编号
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
};
然后,我们可以创建一个消费者进行消费,如下代码所示。
channel.basicConsume("bobo-queue-a",false,"consumer-tag-a",consumer);
运行结果如下所示。
consumer-tag-a收到消息:hello,world:0
consumer-tag-a收到消息:hello,world:1
consumer-tag-a收到消息:hello,world:2
consumer-tag-a收到消息:hello,world:3
consumer-tag-a收到消息:hello,world:4
consumer-tag-a收到消息:hello,world:5
consumer-tag-a收到消息:hello,world:6
consumer-tag-a收到消息:hello,world:7
consumer-tag-a收到消息:hello,world:8
consumer-tag-a收到消息:hello,world:9
2.2、拉模式
拉模式用channel对象的basicGet方法,如下代码所示。
GetResponse response= channel.basicGet("queueName", false);
System.out.println("收到的消息是:"+new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);


basicConsume方法会将信道置为推模式,直到取消队列的订阅为止,在推模式期间,RabbitMQ会不断地推送消息给消费者。
而拉模式只会拉取一条消息,如果只想获取单条消息则使用basicGet方法,切记不要用basicGet结合循环来代替basicConsume,这样会严重影响性能。
2.3、拒绝消息

如果消息不是消费者想要的,那么消费者不会确认消息,而是拒绝消息。


消费者拒绝消息用channel对象的basicReject方法,该方法有两个参数:

  • deliveryTag:消息编号。

  • requeue:如果为true,则RabbitMQ会将这条消息重新存入队列,以便可以发送给下一个订阅的消费者;如果为false,则RabbitMQ立即会把消息从队列中移除。

消费者批量拒绝消息用channel对象的basicNack方法,该方法有两个参数:

  • deliveryTag:消息编号。

  • multiple参数:如果为true,则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息,如果为false,与basicReject方法一样。



RabbitMQ第三弹 发送消息与接收消息

原文始发于微信公众号(初心JAVA):RabbitMQ第三弹 发送消息与接收消息