上一节介绍了 RabbitMQ 的基本概念,这一节介绍 RabbitMQ 的安装使用,以及 SpringBoot 中如何整合使用 RabbitMQ,将介绍 direct/fanout/topic 三种类型的交换器的使用,最后介绍死信队列的概念与使用。
本节示例代码已经上传至 GitHub:https://github.com/laolunsi/spring-boot-examples,该仓库内容包括 spring/springboot/springcloud,希望可以对你有所帮助!
下面进入正文。
RabbitMQ 安装
从官网下载安装包:https://www.rabbitmq.com/download.html
以 windows 为例,直接下载 EXE 文件:
运行 EXE 文件,等待安装完成后。可以在 windows 的应用中看到 RabbitMQ Service,启动它。
开启可视化管理插件,找到 RabbitMQ 的安装目录,切换到 sbin 文件夹下,打开命令行,输入:
rabbitmq-plugins enable rabbitmq_management
打开浏览器,输入:http://127.0.0.1:15672,可以看到管理页面了。在这个页面,我们可以手动操作 RabbitMQ,包括新建 exchange,新建队列,设置队列绑定,查看队列信息等等。
注:rabbitmq 默认的账号和密码都是 guest !
下面我们快速的看一下 Spring Boot 中如何整合使用 RabbitMQ
SpringBoot 中 RabbitMQ 基本使用
上一节提到 RabbitMQ 中的 exchange 有四种类型,其中 headers 类型很少使用,这里分别以 direct/fanout/topic 三种 exchange 类型为例,记录 默认/广播/模式匹配 这三种常见场景中 RabbitMQ 的使用。
Spring Boot 提供了快速接入 RabbitMQ 的依赖,我们只要引入这个依赖并添加很少的一些配置就可以了!
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
direct
producer
在配置文件中添加 RabbitMQ 的配置信息:
server:
port: 8701
spring:
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
port: 5672
virtual-host: /
发送消息:
@RestController
@RequestMapping(value = "msg")
public class HelloAction {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value = "direct")
public String sayHello(String msg) {
rabbitTemplate.convertAndSend("demo", msg);
return "消息发送成功";
}
}
这时候 demo 队列可能不存在,我们可以手动在 mq 中的创建,也可以通过 @Bean 来 declare:
@Configuration
public class RabbitConfig {
@Bean
public Queue demo() {
return new Queue("demo", true);
}
}
测试:
consumer
同样创建一个 spring boot 项目,引入 rabbitmq 依赖,并添加相关配置。
利用 @RabbitListener 注解接收数据:
@Component
public class RabbitConsumer {
@RabbitListener(queues = {"demo"})
public void consume(Message message, Channel channel) throws IOException {
System.out.println("接收到消息:" + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
启动项目:
这里用的是 Message 类型,我们可以通过 @Payloa 注解来直接接收 body 数据:
@RabbitListener(queues = {"demo"})
public void consume(@Payload String body, Channel channel) throws IOException {
System.out.println("接收到消息:" + body);
}
Spring Boot RabbitMQ 中可以使用 @RabbitListener 和 @RabbitHandler 两个注解联合来从同样的接口中接收不同参数类型的数据并处理,比如 String、Object 等。
不过个人建议发送数据的时候直接用 String,如果是对象,可以用 JSON 工具将对象转换为字符串。接收数据时再用 JSON 工具将字符串还原为对象。
注意,在这种情况下,我们使用的默认的 direct exchange,默认的 routing key 就是队列名。
如果需要自定义 exchange,那么就要指定 binding。比如新建一个 direct 类型的 exchange,名为 demoex,然后添加一个 binding 到 demo,这个 routing key 是完全匹配的:
fanout
fanout 类型的 exchange 表示广播。
我们这里创建三个队列:pub_demo1, pub_demo2, pub_demo3
然后创建一个 exchange:demoex_fanout
将这三个队列绑定到这个exchange上,不需要 routing_key。这些操作都是通过 management 页面进行的。
发送广播消息:
@GetMapping(value = "broadCast")
public String broadCast(String msg) {
// 广播消息到 demoex_public 这个 exchange 绑定的所有队列
rabbitTemplate.convertAndSend("demoex_fanout", "", msg);
return "广播成功";
}
测试接口,可以看到这三个队列中都有数据了:
topic
topic 的模式匹配包括两个通配符:#和 ,其中 # 匹配 0 或多个单词, 匹配一个单词。
需要注意的是,这里的匹配符是用在 exchange 的 bindling 里的,如下,将 pat_demo1,2,3 绑定到 demoex_topic 这个 topic 类型的 exchange 上:
发送:
@GetMapping(value = "pattern")
public String pattern(String msg) {
rabbitTemplate.convertAndSend("demoex_topic", "demo.a.b.c", msg);
return "模式传播成功";
}
查看 mq 中的 pat_demo1,2,3 队列:
死信队列的使用
思考一个问题,如果某个消息被消费者 nack 或者 reject 怎么办?这些消息是直接丢掉还是返回到原队列中?如果这些不能被消费的消息堆积,超过最大队列长度怎么办?
死信消息与死信队列的概念
首先需要理解一个概念 —— 死信消息,即不能被消费的消息。当一个消息出现以下情况之一时,消息会变成死信消息:
- 消息被否认确定,使用 channel.basicNack 或 channel.basicReject,且设置了 requeue = false
- 消息在队列中的存活时间超过了设置的 TTL
- 消息队列的消息数量超过了最大队列长度
出现死信消息时,要怎么处理呢?
如果消息不重要,那就直接丢掉好了,最多记个日志方便回查。如果消息比较重要呢,最好还是有个专门的程序来处理这些死信消息。在 RabbitMQ 中,可以利用死信队列来处理死信消息。
死信队列是怎么配置的?它又是怎么工作的呢?
非常简单!
死信队列也是普通队列,只是在原队列 A 上配置一下死信队列 D 的参数,在概念上,这个队列 D 就变成了死信队列。
Spring Boot 使用死信队列
可以通过 RabbitMQ 的面板来绑定死信队列,也可以用程序来指定参数。
首先创建一下死信队列需要的交换器和队列,direct exchange 取名 demoex_dead,然后创建队列 demo_dead,绑定到 demoex_dead 这个交换器上。
下面分别以面板和程序来展示如何给 demo 队列绑定死信队列:
使用面板:
使用程序:
@Configuration
public class RabbitConfig {
@Bean
public Queue demo() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "demoex_dead");
args.put("x-dead-letter-routing-key", "demo_dead");
return new Queue("demo", true, false, false, args);
}
}
绑定好死信队列后,我们来测试一下:
@Component
public class RabbitConsumer {
@RabbitListener(queues = {"demo"})
public void consume(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
if (msg.contains("dead")) {
System.out.println("拒绝包含dead的消息:" + msg);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("接收到正常消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
@RabbitListener(queues = {"demo_dead"})
public void consumeDead(Message message, Channel channel) throws IOException {
System.out.println("死信队列消息:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
测试,首先推送消息 deadA,发现消息被放到死信队列了。
然后推送消息 da,消息接收正常。
好了,到此为止,我们已经介绍了 RabbitMQ 的基础知识、如何安装 RabbitMQ ,以及 SpringBoot 如何快速接入 RabbitMQ 了。
希望我的文章对你有所帮助!thanks for your reading...
写在最后
看了一下,大概有一个月没更新了,最近比较忙,心情也比较累,困于生活,找不到理想的方向。很难用言语去表达吧,身边也没什么可以倾诉的人,而越是这种糟糕的情况,越导致个人工作学习效率低下,很多时候时候都像是在假装忙碌。
有时候挺惆怅的,感觉看不到方向。有时候做了决定之后,又在怀疑自己的选择是错的,但不选择,就不会有新的机会吧。在漫无边际的荒野里,到底是向目标前进,还是在远离呢?
无论如何,希望我们都能在满是泥泞的日子里,孤独与寂寥的日子里,保持斗志,稳定心态,默默积攒力量,不说什么茧化蝶之类的玄乎话,就简简单单,能够早一点过上自己想要的生活吧!
与君共勉!与君共期!