Loading... RabbitMQ 是实现了 AMQP 协议的开源消息代理软件(又叫做面向消息的中间件)。<br /> <br />这一节主要介绍 AMQP 协议的一些基本概念,包括:四种交换器(exchange)、队列、绑定等。这些概念在实现了 AMQP 协议的消息中间件中是通用的,RabbitMQ 则在此基础上做了一些扩展。<br /> --- <a name="QAwRh"></a> ## AMQP 概述 <br />AMQP:Advanced Messaging Queue Protocol,高级消息队列协议,是一个允许符合标准的客户端应用与消息中间件代理进行通信的协议。<br />AMQP 是可扩展的,如自定义 exchange 类型、声明 exchange 和 queue 的时候附加属性、服务端特定拓展、新的 AMQP 方法、利用插件扩展等等。 ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1597922560132.png) <br />消息确认:message ackowlodgements,AMQP 引入了消息确认的概念:当消息被发送给消费者时,通知 broker。当使用了消息确认时,broker 仅会在接收到消息确认时完全删除数据。<br />死信队列:如果消息不能被路由,消息可能会返回给 publisher;如果 broker 实现了扩展,那么消息会被放入 `dead letter queue` (死信队列)。publisher 通过使用特定的参数发送消息来选择如何处理这种情况。<br /> <br />AMQP 实体包括:queues、exchanges、bindings<br />AMQP 是可编程的协议。即 AMQP 的实体和路由机制主要由应用自身来定义,而不是 broker 管理员。<br /> --- <a name="P1i6Y"></a> ## 基本概念 <br /> - Broker:即 RabbitMQ Server,消息队列服务器本身 - Virtual Host:虚拟主机。每个 vhost 用于自己的 queue/exchange/binding/权限,默认的 vhost 是 / - Message: 消息,headers + body,其中消息头由一系列的可选属性组成,包括 routing-key、priority、delivery-mode (消息是否需要持久化)等 - Exchange:交换器。根据分发规则,匹配 routing key,将消息发给指定队列 - Queue:队列,用于保存消息,直到消息被发送给消费者。一个消息可以被放到一个或多个队列 - Binding:绑定,定义了交换器和队列之间的路由规则。binding 中包括 routing key。交换器中有一张 binding 的查询表,是 Message 的分发依据 - Connection:网络连接,比如一个 TCP 连接 - Channel:信道,是一个多路复用的双向数据流通道。信道是建立在真实 TCP 连接上的虚拟连接。由于系统建立和销毁 TCP 的开销是很大的,并且很难配置防火墙,所以引入信道来复用一条 TCP 连接。AMQP 的命令都是通过信道发送的,包括发布消息、订阅队列、接收消息等等,所以每个命令会携带一个 channel ID。Channel 作为轻量级的 Connection 极大降低了操作系统建立 TCP 连接的开销。 <a name="Z0NpV"></a> ### exchange 交换器是消息被发送的 AMQP 实体。交换器拿到消息然后把它路由给0或多个队列。路由算法基于交换器的类型和 bindings。<br />AMQP 有四种交换器类型。<br /> | Exchange type | Default pre-declared names | | :--- | :--- | | Direct exchange | (Empty string) and amq.direct | | Fanout exchange | amq.fanout | | Topic exchange | amq.topic | | Headers exchange | amq.match (and amq.headers in RabbitMQ) | <a name="hYWHZ"></a> #### default exchange 默认交换器是一个没有名字的 direct exchange. 每一个自动创建的 queue 会绑定到这个 exchange,默认 routing key = queue name <br /> <a name="EqftB"></a> #### direct exchange 单播,基于 routing key 转发消息到对应的 queue。(也可以用作多播)<br />机制:一个绑定到该交换器的队列,routing key = K;一个 routing key = R 的消息到达该 exchange,当 K = R 时,消息被路由到该队列。<br /> <br />direct exchange 通常被用于在同一个应用程序的多个实例中循环地分配任务。在这种情况下,消息在消费者之间,而不是队列之间,进行负载均衡的。<br /> <br /> ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1597922574786.png) <a name="RdcRn"></a> #### fanout exchange <br />广播。fanout exchange 将消息路由给所有绑定到它的队列,忽略 routing key<br /> ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1597922605181.png) <a name="FnQZz"></a> #### topic exchange 多播。基于消息的 routing key 与绑定到该交换器的队列的 pattern 进行匹配,路由消息到一个或多个队列。常用于复杂的发布/订阅场景。<br />当出现多消费者/应用的场景,消费者选择性地接收消息时,应该考虑使用 topic exchange<br /> <br />在 RabbitMQ 中,pattern 由点隔开,包括两个通配符:# 和 *,其中 # 匹配 0 或多个单词,* 匹配一个单词。<br /> <a name="yEXcl"></a> #### header exchange 使用多个 message header 而不是 routing key 来进行路由。使用 x-match 指定 any 或 all,即匹配一个即可,或全部匹配。<br />可以被用于 direct exchange 但 routing key 不是 string 的情况,比如 integer、hash等<br /> <br /> <a name="inKfH"></a> ### Queue <br />队列与交换器有相同属性,但也有一些额外的属性: - Name、Durable、Exclusive、Auto-delete、Arguments(optional) <br />队列在使用前必须被声明。如果队列还不存在,声明会使队列被创建。如果队列已经存在,且参数相同,那么没有任何影响。如果参数不同,那么会返回一个 channel-level 的异常:code 406, PRECONNECTION_FAILED<br /> <br />假如 broker 中有一个名为 hello 的队列,且 durable = true<br />然后在客户端 declare 一个 name = hello, durable = false 的队列,那么会抛出如下异常:<br /> ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1597922646401.png) <br /> <br />以 `amp.` 开头的队列名是保留内部使用的。<br /> <a name="Obi7T"></a> ### bindings bindings 是交换器用于路由消息到队列的规则。在某些类型的交换器中,binding 会使用 routing key 属性。<br />如果消息没有被路由到任何队列,根据 publisher 设置的属性,消息要么被丢弃,要么被返回。<br /> <br />[Publishers —— Unroutable Message Handling](https://www.rabbitmq.com/publishers.html#unroutable) > 不同协议有不同的处理方式。 > - AMQP 0-9-1 根据 mandatory 消息属性的值来处理不可达消息:mandatory = false (default),消息会被丢弃或重新发布到 alternate exchange 中。mandatory = true,消息被返回给发布者,发布者必须拥有返回消息处理器。 <br />AMQP 中设置 alternate exchange (ae) 的方法有两种:通过 policy 或客户端指定的参数(x-args) 来指定。推荐使用第一种方式。参考:[rabbit-ae](https://www.rabbitmq.com/ae.html)<br />通过 policy 设置 ae,可以使用命令行或者 management 插件。<br /> <a name="Ntqx9"></a> ### Consumers <br />消费者可以通过以下两种方式消费消息: - 订阅 (push api, 推荐使用) - 轮询(pull api,效率极低) <br />同一个队列可能有多个消费者,也可能只有一个独占消费者(exclusive consumer),当独占消费者消费消息时,其他消费者被排除。<br /> <br />每一个消费者有一个 id(字符串),叫做 consumer tag<br /> --- <a name="4mtSS"></a> ### Message Acknowledgements <br />又叫 acks<br />AMQP 提供了两种确认模型: - automatic acknowledge model - 服务器使用 basic.deliver 或 basic.get-ok 发送消息给应用之后,从队列移除消息 - explicit acknowledge model - 应用使用 basic.ack 返回确认消息之后,从队列移除消息 <br />如果消费者死亡,且没有发送确认,那么服务器将重新发送消息给另一个消费者。如果没有可用消费者,服务器将等待,直到有新的消费者注册到相同的 queue 上。<br /> --- <a name="V2bIk"></a> ### Message Attributes And Payload <br />一些属性由服务器使用,更多属性是给消费者使用的。<br />消息通常会有 payload(即携带的数据),也有可能不携带数据。<br />消息可以被设置为持久化,这与 exchange 和 queue 是否为 durable 无关。即消息的持久化仅由它本身决定。<br /> --- <a name="lrHkL"></a> ## 总结 <br />这一节主要是介绍了 RabbitMQ 中的一些基本概念,文章内容主要是来自 RabbitMQ 官网。下一节会介绍如何快速安装 RabbitMQ、以及 Spring Boot 中如何快速集成 RabbitMQ。<br /> <br /> --- 参考:<br /> - [AMQP 0-9-1 Model Explained](https://www.rabbitmq.com/tutorials/amqp-concepts.html) - [一文带你体验RabbitMQ 收发消息](https://juejin.im/post/6856571028496351239) - [消息队列之 RabbitMQ](https://www.jianshu.com/p/79ca08116d57)<br /> Last modification:August 22, 2020 © Allow specification reprint Support Appreciate the author AliPayWeChat Like 0 请作者喝杯肥宅快乐水吧!