RabbitMQ第二弹 交换器和队列

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!
在上一篇文章 RabbitMQ第一弹 搭建环境 中,介绍了RabbitMQ环境搭建以及基础知识点,并提到了交换器和队列这两个概念,本文主要介绍RabbitMQ的交换器和队列。
1、RabbitMQ核心概念
先来看一下RabbitMQ的架构图。
RabbitMQ第二弹 交换器和队列


下面介绍RabbitMQ架构的核心概念。

a)Broker

一个RabbitMQ节点。

b)生产者Producer

消息分为消息体和消息头,消息体也称为payload,消息头用来描述这条消息,比如交换器名称和路由键。

c)消费者Consumer

只消费消息体,消息头会在消息路由的过程中被丢弃掉。

d)连接Connection与信道Channel

生产者、消费者与RabbitMQ交互时,需要打开一个TCP连接,紧接着,再在该连接上创建一个AMQP信道,每条信道会被指定一个唯一的ID,信道是虚拟连接。
RabbitMQ使用TCP连接复用,减少性能开销,但由此需要在复用的TCP连接上建立虚拟信道,具体的关系是:多个线程共享一条TCP连接,每个线程有自己的信道,当信道流量不大时,TCP连接复用可以有效节省TCP连接资源,否则,多个线程共用一个TCP连接就会产生性能瓶颈。

e)交换器Exchange

由交换器将消息路由到一个或者多个队列中,如果路由不到,或许会返回给生产者,或许直接丢弃。

f)队列Queue

队列是RabbitMQ内部对象,用于存储消息。这一点和Kafka这种消息中间件相反,Kafka将消息存储在topic(主题)这个逻辑层面,而相对应的队列逻辑只是topic实际存储文件中的位移标识。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理;
RabbitMQ 不支持队列层面的广播消费(广播是交换器层面支持的)。

g)路由键RoutingKey

生产者将消息发给交换器的时候, 一般会指定一个RoutingKey表示消息的路由规则,Routing Key 需要与交换器类型和绑定键( BindingKey )联合使用才能最终生效。

h)绑定Binding 

RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键( BindingKey )。在绑定多个队列到同一个交换器的时候, 这些绑定允许使用相同的BindingKey 。
RoutingKey与BindingKey可以看作同一个东西,绑定时使用绑定键,发送消息时使用路由键;

2、四种交换器类型

交换器的作用是将消息路由到队列中,而不同的交换器类型具有不同的路由规则。RabbitMQ常用的交换器类型有fanout 、direct、topic 、headers 这四种,AMQP 协议里还有另外两种类型:system 和自定义。

a)fanout扇形交换器

  • 将消息路由到所有与该交换器绑定的队列中。

b)direct直连交换器

  • 将消息路由到BindingKey和RoutingKey完全匹配的队列中。

c)topic主题交换器

  • BindingKey和RoutingKey可以使用点号分隔为多个单词。

  • BindingKey可以使用#和*两种特殊字符,#用于匹配一个单词,*用于匹配多个单词(可以是0个)。

d)headers交换器

  • headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配,headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

3、RabbitMQ Java客户端实战

在上面的核心概念中,介绍了RabbitMQ的交换器和队列,以及交换器与队列是存在绑定关系的,下面用RabbitMQ的Java客户端演示一下如何创建交换器、队列以及将交换器和队列绑定起来。


先引入RabbitMQ客户端的Maven依赖。
<dependency>
  <groupld>com.rabbitmq</groupld>
  <artifactld>amqp-client</artifactid>
  <version>5.7.3</version>
</dependency>


我们需要先用客户端API与RabbitMQ服务端建立起一个连接,然后再在该连接上创建一个虚拟信道,获取Connection与Channel的代码如下所示。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQClient {
    private Connection conn;
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.132");
        factory.setPort(5672);
        factory.setUsername("bobo");
        factory.setPassword("ok");
        // VirtualHost相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通;
        // 在RabbitMQ中,权限控制是以vhost为单位的,当创建一个用户时,用户通常会被指派给至少一个vhost,且只能访问被指派的vhost内的队列、交换器和绑定关系等;
        factory.setVirtualHost("/");
        try {
            // 也可以通过uri的方式连接
            // factory.setUri("amqp://bobo:ok@192.168.216.132:5672");
            this.conn = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public Channel getChannel() throws Exception {
        Channel channel = this.conn.createChannel();
        return channel;
    }
    public static void main(String[] args) throws Exception {
        RabbitMQClient mqClient = new RabbitMQClient();
        // 获取channel
        Channel channel = mqClient.getChannel();
        // 关闭连接
        channel.getConnection().close();
    }
}


然后,我们再使用客户端API依次声明交换器、队列以及将交换器与队列绑定起来,如下代码所示。

// 声明交换器
channel.exchangeDeclare("bobo-exchange-a", BuiltinExchangeType.DIRECT,true);
// 声明队列
String queueName = channel.queueDeclare("bobo-queue-a",true,false,false,null).getQueue();
// 将队列和交换器绑定
channel.queueBind(queueName,"bobo-exchange-a","bindingkey-a");


运行上面的代码,然后在RabbitMQ管理界面中可以看到创建好的交换器与队列,如下图所示。

RabbitMQ第二弹 交换器和队列


4、交换器与队列API

上面的代码中channel对象有三个重要的方法:exchangeDeclare、queueDeclare、queueBind,下面学习一下如何使用这些方法。


声明一个交换器使用exchangeDeclare方法,该参数如下:

  • exchange:交换器名称,多次声明同一名称的交换器不会覆盖,而是抛异常;

  • type:交换器类型;

  • durable:是否持久化;持久化可以将交换器存到磁盘;

  • autoDelete:是否自动删除;自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或交换器都与此解绑才会自动删除;

  • internal:是否内置的;生产者无法直接发送消息到内置交换器中,只能通过交换器路由到交换器这种方式;

  • argument:配置其它参数;


exchangeDeclareNoWait方法:

  • 不建议使用;在声明完一个交换器后,由于不需要等待服务器返回,实际服务器还并未完成交换器的创建,那么此时生产者紧接着使用这个交换器,必然会发生异常;

exchangeDeclarePassive方法:

  • 实际不创建交换器,只用于检测交换器是否存在,如果不存在则抛出异常;

exchangeDelete方法:

  • 删除交换器;


声明一个队列使用queueDeclare方法,方法参数如下:

  • queue:队列名称;

  • durable:是否持久化;

  • exclusive:是否排它(排它队列),如果是,则该队列仅对首次声明它的连接可见,并在连接断开时自动删除,即不同connection之间不能共用,但同一connection的不同channel可共用;

  • autoDelete:是否自动删除,自动删除的前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时才会自动删除;

  • arguments:队列的其它参数;

注意,生产者和消费者都能够使用queueDeclare来声明一个队列,但是如果消费者在同一个channel上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将channel置为传输模式之后才能声明队列。

如果queueDeclare方法不传任何参数,则队列的名称自动生成,且该队列为非持久、排它、自动删除的。


queueDeclareNoWait方法:

  • 类似exchangeDeclareNoWait方法;

queueDeclarePassive方法:

  • 类似exchangeDeclarePassive方法;

queueDelete(queueName)方法:

  • 删除队列;

queuePurge方法:

  • 清空队列内容而不删除队列;


最后再看一下绑定与解绑方法。

// 将队列和交换器绑定
channel.queueBind("queueName","exchangeName","bindingkey");
// 将队列和交换器解绑
channel.queueUnbind("queueName","exchangeName","bindingkey");
// 将交换器与交换器绑定
channel.exchangeBind("exchangeName-to","exchangeName-from","bindingkey");



RabbitMQ第二弹 交换器和队列

原文始发于微信公众号(初心JAVA):RabbitMQ第二弹 交换器和队列