Loading... 上一节介绍了 RabbitMQ 的基本概念,这一节介绍 RabbitMQ 的安装使用,以及 SpringBoot 中如何整合使用 RabbitMQ,将介绍 direct/fanout/topic 三种类型的交换器的使用,最后介绍死信队列的概念与使用。<br /> 本节示例代码已经上传至 GitHub:[https://github.com/laolunsi/spring-boot-examples](https://github.com/laolunsi/spring-boot-examples),该仓库内容包括 spring/springboot/springcloud,希望可以对你有所帮助!<br /> 下面进入正文。 --- <a name="K2pvK"></a> ## RabbitMQ 安装 <br />从官网下载安装包:[https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html)<br /> <br />以 windows 为例,直接下载 EXE 文件: ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599987824553.png) <br />运行 EXE 文件,等待安装完成后。可以在 windows 的应用中看到 RabbitMQ Service,启动它。<br /> ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599987854463.png) <br />开启可视化管理插件,找到 RabbitMQ 的安装目录,切换到 sbin 文件夹下,打开命令行,输入: ```shell rabbitmq-plugins enable rabbitmq_management ``` ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599987882311.png) 打开浏览器,输入:http://127.0.0.1:15672,可以看到管理页面了。在这个页面,我们可以手动操作 RabbitMQ,包括新建 exchange,新建队列,设置队列绑定,查看队列信息等等。<br /> <br />注:rabbitmq 默认的账号和密码都是 guest !<br /> ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599987910183.png) <br />下面我们快速的看一下 Spring Boot 中如何整合使用 RabbitMQ --- <a name="r5mPl"></a> ## SpringBoot 中 RabbitMQ 基本使用 <br />上一节提到 RabbitMQ 中的 exchange 有四种类型,其中 headers 类型很少使用,这里分别以 direct/fanout/topic 三种 exchange 类型为例,记录 默认/广播/模式匹配 这三种常见场景中 RabbitMQ 的使用。 <br /> <br /> <br />Spring Boot 提供了快速接入 RabbitMQ 的依赖,我们只要引入这个依赖并添加很少的一些配置就可以了! ```xml <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> ``` <a name="VlJAT"></a> ### direct <a name="4ycbG"></a> #### producer 在配置文件中添加 RabbitMQ 的配置信息: ```yaml server: port: 8701 spring: rabbitmq: addresses: 127.0.0.1 username: guest password: guest port: 5672 virtual-host: / ``` <br />发送消息: ```java @RestController @RequestMapping(value = "msg") public class HelloAction { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping(value = "direct") public String sayHello(String msg) { rabbitTemplate.convertAndSend("demo", msg); return "消息发送成功"; } } ``` <br />这时候 demo 队列可能不存在,我们可以手动在 mq 中的创建,也可以通过 @Bean 来 declare: ```java @Configuration public class RabbitConfig { @Bean public Queue demo() { return new Queue("demo", true); } } ``` <br />测试:<br /> ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599987945914.png) --- <a name="DvgbP"></a> #### consumer 同样创建一个 spring boot 项目,引入 rabbitmq 依赖,并添加相关配置。<br />利用 @RabbitListener 注解接收数据: ```java @Component public class RabbitConsumer { @RabbitListener(queues = {"demo"}) public void consume(Message message, Channel channel) throws IOException { System.out.println("接收到消息:" + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } ``` <br />启动项目:<br /> ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599987973765.png) <br />这里用的是 Message 类型,我们可以通过 @Payloa 注解来直接接收 body 数据:<br /> ```java @RabbitListener(queues = {"demo"}) public void consume(@Payload String body, Channel channel) throws IOException { System.out.println("接收到消息:" + body); } ``` <br />Spring Boot RabbitMQ 中可以使用 @RabbitListener 和 @RabbitHandler 两个注解联合来从同样的接口中接收不同参数类型的数据并处理,比如 String、Object 等。<br />不过个人建议发送数据的时候直接用 String,如果是对象,可以用 JSON 工具将对象转换为字符串。接收数据时再用 JSON 工具将字符串还原为对象。 --- 注意,在这种情况下,我们使用的默认的 direct exchange,默认的 routing key 就是队列名。<br />如果需要自定义 exchange,那么就要指定 binding。比如新建一个 direct 类型的 exchange,名为 demoex,然后添加一个 binding 到 demo,这个 routing key 是完全匹配的: ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599987997265.png) --- <a name="M4iK1"></a> ### fanout <br />fanout 类型的 exchange 表示广播。<br />我们这里创建三个队列:pub_demo1, pub_demo2, pub_demo3<br />然后创建一个 exchange:demoex_fanout<br />将这三个队列绑定到这个exchange上,不需要 routing_key。这些操作都是通过 management 页面进行的。<br /> ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599988018343.png) <br />发送广播消息: ```java @GetMapping(value = "broadCast") public String broadCast(String msg) { // 广播消息到 demoex_public 这个 exchange 绑定的所有队列 rabbitTemplate.convertAndSend("demoex_fanout", "", msg); return "广播成功"; } ``` <br />测试接口,可以看到这三个队列中都有数据了: ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599988037997.png) --- <a name="FbIMe"></a> ### topic <br />topic 的模式匹配包括两个通配符:#和* _,其中 # 匹配 0 或多个单词,_ * 匹配一个单词。<br />需要注意的是,这里的匹配符是用在 exchange 的 bindling 里的,如下,将 pat_demo1,2,3 绑定到 demoex_topic 这个 topic 类型的 exchange 上:<br /> ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599988056341.png) <br />发送: ```java @GetMapping(value = "pattern") public String pattern(String msg) { rabbitTemplate.convertAndSend("demoex_topic", "demo.a.b.c", msg); return "模式传播成功"; } ``` <br />查看 mq 中的 pat_demo1,2,3 队列:<br /> ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599988079583.png) --- <a name="q7RH8"></a> ## 死信队列的使用 <br />思考一个问题,如果某个消息被消费者 nack 或者 reject 怎么办?这些消息是直接丢掉还是返回到原队列中?如果这些不能被消费的消息堆积,超过最大队列长度怎么办?<br /> <a name="cuF1O"></a> ### 死信消息与死信队列的概念 <br />首先需要理解一个概念 —— 死信消息,即不能被消费的消息。当一个消息出现以下情况之一时,消息会变成死信消息: - 消息被否认确定,使用 channel.basicNack 或 channel.basicReject,且设置了 requeue = false - 消息在队列中的存活时间超过了设置的 TTL - 消息队列的消息数量超过了最大队列长度 <br />出现死信消息时,要怎么处理呢?<br />如果消息不重要,那就直接丢掉好了,最多记个日志方便回查。如果消息比较重要呢,最好还是有个专门的程序来处理这些死信消息。在 RabbitMQ 中,可以利用死信队列来处理死信消息。<br /> <br />死信队列是怎么配置的?它又是怎么工作的呢?<br />非常简单!<br />死信队列也是普通队列,只是在原队列 A 上配置一下死信队列 D 的参数,在概念上,这个队列 D 就变成了死信队列。<br /> <br /> <a name="95wWg"></a> ### Spring Boot 使用死信队列 可以通过 RabbitMQ 的面板来绑定死信队列,也可以用程序来指定参数。<br /> 首先创建一下死信队列需要的交换器和队列,direct exchange 取名 demoex_dead,然后创建队列 demo_dead,绑定到 demoex_dead 这个交换器上。<br /> 下面分别以面板和程序来展示如何给 demo 队列绑定死信队列:<br /> 使用面板: ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599988102397.png) <br />使用程序: ```java @Configuration public class RabbitConfig { @Bean public Queue demo() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "demoex_dead"); args.put("x-dead-letter-routing-key", "demo_dead"); return new Queue("demo", true, false, false, args); } } ``` <br />绑定好死信队列后,我们来测试一下:<br /> ```java @Component public class RabbitConsumer { @RabbitListener(queues = {"demo"}) public void consume(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); if (msg.contains("dead")) { System.out.println("拒绝包含dead的消息:" + msg); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("接收到正常消息:" + msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } @RabbitListener(queues = {"demo_dead"}) public void consumeDead(Message message, Channel channel) throws IOException { System.out.println("死信队列消息:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } ``` 测试,首先推送消息 deadA,发现消息被放到死信队列了。<br />然后推送消息 da,消息接收正常。 ![](http://zfh-public-blog.oss-cn-beijing.aliyuncs.com/image-1599988124932.png) --- 好了,到此为止,我们已经介绍了 RabbitMQ 的基础知识、如何安装 RabbitMQ ,以及 SpringBoot 如何快速接入 RabbitMQ 了。 希望我的文章对你有所帮助!thanks for your reading... --- <a name="apkcp"></a> ## 写在最后 <br />看了一下,大概有一个月没更新了,最近比较忙,心情也比较累,困于生活,找不到理想的方向。很难用言语去表达吧,身边也没什么可以倾诉的人,而越是这种糟糕的情况,越导致个人工作学习效率低下,很多时候时候都像是在假装忙碌。<br /> <br />有时候挺惆怅的,感觉看不到方向。有时候做了决定之后,又在怀疑自己的选择是错的,但不选择,就不会有新的机会吧。在漫无边际的荒野里,到底是向目标前进,还是在远离呢?<br /> <br />无论如何,希望我们都能在满是泥泞的日子里,孤独与寂寥的日子里,保持斗志,稳定心态,默默积攒力量,不说什么茧化蝶之类的玄乎话,就简简单单,能够早一点过上自己想要的生活吧!<br /> <br />与君共勉!与君共期!<br /> Last modification:September 14, 2020 © Allow specification reprint Support Appreciate the author AliPayWeChat Like 0 请作者喝杯肥宅快乐水吧!