上一节介绍了 RabbitMQ 的基本概念,这一节介绍 RabbitMQ 的安装使用,以及 SpringBoot 中如何整合使用 RabbitMQ,将介绍 direct/fanout/topic 三种类型的交换器的使用,最后介绍死信队列的概念与使用。

本节示例代码已经上传至 GitHub:https://github.com/laolunsi/spring-boot-examples,该仓库内容包括 spring/springboot/springcloud,希望可以对你有所帮助!

下面进入正文。


RabbitMQ 安装


从官网下载安装包:https://www.rabbitmq.com/download.html


以 windows 为例,直接下载 EXE 文件:


运行 EXE 文件,等待安装完成后。可以在 windows 的应用中看到 RabbitMQ Service,启动它。


开启可视化管理插件,找到 RabbitMQ 的安装目录,切换到 sbin 文件夹下,打开命令行,输入:

rabbitmq-plugins enable rabbitmq_management

打开浏览器,输入:http://127.0.0.1:15672,可以看到管理页面了。在这个页面,我们可以手动操作 RabbitMQ,包括新建 exchange,新建队列,设置队列绑定,查看队列信息等等。


注:rabbitmq 默认的账号和密码都是 guest !


下面我们快速的看一下 Spring Boot 中如何整合使用 RabbitMQ


SpringBoot 中 RabbitMQ 基本使用


上一节提到 RabbitMQ 中的 exchange 有四种类型,其中 headers 类型很少使用,这里分别以 direct/fanout/topic 三种 exchange 类型为例,记录 默认/广播/模式匹配 这三种常见场景中 RabbitMQ 的使用。  




Spring Boot 提供了快速接入 RabbitMQ 的依赖,我们只要引入这个依赖并添加很少的一些配置就可以了!

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

direct

producer

在配置文件中添加 RabbitMQ 的配置信息:

server:
  port: 8701
spring:
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
    port: 5672
    virtual-host: /


发送消息:

@RestController
@RequestMapping(value = "msg")
public class HelloAction {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "direct")
    public String sayHello(String msg) {
        rabbitTemplate.convertAndSend("demo", msg);
        return "消息发送成功";
    }
}


这时候 demo 队列可能不存在,我们可以手动在 mq 中的创建,也可以通过 @Bean 来 declare:

@Configuration
public class RabbitConfig {
    @Bean
    public Queue demo() {
        return new Queue("demo", true);
    }
}


测试:


consumer

同样创建一个 spring boot 项目,引入 rabbitmq 依赖,并添加相关配置。
利用 @RabbitListener 注解接收数据:

@Component
public class RabbitConsumer {

    @RabbitListener(queues = {"demo"})
    public void consume(Message message, Channel channel) throws IOException {
        System.out.println("接收到消息:" + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}


启动项目:


这里用的是 Message 类型,我们可以通过 @Payloa 注解来直接接收 body 数据:

    @RabbitListener(queues = {"demo"})
    public void consume(@Payload String body, Channel channel) throws IOException {
        System.out.println("接收到消息:" + body);
    }


Spring Boot RabbitMQ 中可以使用 @RabbitListener 和 @RabbitHandler 两个注解联合来从同样的接口中接收不同参数类型的数据并处理,比如 String、Object 等。
不过个人建议发送数据的时候直接用 String,如果是对象,可以用 JSON 工具将对象转换为字符串。接收数据时再用 JSON 工具将字符串还原为对象。


注意,在这种情况下,我们使用的默认的 direct exchange,默认的 routing key 就是队列名。
如果需要自定义 exchange,那么就要指定 binding。比如新建一个 direct 类型的 exchange,名为 demoex,然后添加一个 binding 到 demo,这个 routing key 是完全匹配的:


fanout


fanout 类型的 exchange 表示广播。
我们这里创建三个队列:pub_demo1, pub_demo2, pub_demo3
然后创建一个 exchange:demoex_fanout
将这三个队列绑定到这个exchange上,不需要 routing_key。这些操作都是通过 management 页面进行的。


发送广播消息:

    @GetMapping(value = "broadCast")
    public String broadCast(String msg) {
        // 广播消息到 demoex_public 这个 exchange 绑定的所有队列
        rabbitTemplate.convertAndSend("demoex_fanout", "", msg);
        return "广播成功";
    }


测试接口,可以看到这三个队列中都有数据了:


topic


topic 的模式匹配包括两个通配符:#和 ,其中 # 匹配 0 或多个单词, 匹配一个单词。
需要注意的是,这里的匹配符是用在 exchange 的 bindling 里的,如下,将 pat_demo1,2,3 绑定到 demoex_topic 这个 topic 类型的 exchange 上:


发送:

    @GetMapping(value = "pattern")
    public String pattern(String msg) {
        rabbitTemplate.convertAndSend("demoex_topic", "demo.a.b.c", msg);
        return "模式传播成功";
    }


查看 mq 中的 pat_demo1,2,3 队列:


死信队列的使用


思考一个问题,如果某个消息被消费者 nack 或者 reject 怎么办?这些消息是直接丢掉还是返回到原队列中?如果这些不能被消费的消息堆积,超过最大队列长度怎么办?

死信消息与死信队列的概念


首先需要理解一个概念 —— 死信消息,即不能被消费的消息。当一个消息出现以下情况之一时,消息会变成死信消息:

  • 消息被否认确定,使用 channel.basicNack 或 channel.basicReject,且设置了 requeue = false
  • 消息在队列中的存活时间超过了设置的 TTL 
  • 消息队列的消息数量超过了最大队列长度


出现死信消息时,要怎么处理呢?
如果消息不重要,那就直接丢掉好了,最多记个日志方便回查。如果消息比较重要呢,最好还是有个专门的程序来处理这些死信消息。在 RabbitMQ 中,可以利用死信队列来处理死信消息。


死信队列是怎么配置的?它又是怎么工作的呢?
非常简单!
死信队列也是普通队列,只是在原队列 A 上配置一下死信队列 D 的参数,在概念上,这个队列 D 就变成了死信队列。


Spring Boot 使用死信队列

可以通过 RabbitMQ 的面板来绑定死信队列,也可以用程序来指定参数。

首先创建一下死信队列需要的交换器和队列,direct exchange 取名 demoex_dead,然后创建队列 demo_dead,绑定到 demoex_dead 这个交换器上。

下面分别以面板和程序来展示如何给 demo 队列绑定死信队列:

使用面板:


使用程序:

@Configuration
public class RabbitConfig {
    
    @Bean
    public Queue demo() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "demoex_dead");
        args.put("x-dead-letter-routing-key", "demo_dead");
        return new Queue("demo", true, false, false, args);
    }
}


绑定好死信队列后,我们来测试一下:

@Component
public class RabbitConsumer {

    @RabbitListener(queues = {"demo"})
    public void consume(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        if (msg.contains("dead")) {
            System.out.println("拒绝包含dead的消息:" + msg);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            System.out.println("接收到正常消息:" + msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    @RabbitListener(queues = {"demo_dead"})
    public void consumeDead(Message message, Channel channel) throws IOException {
        System.out.println("死信队列消息:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

测试,首先推送消息 deadA,发现消息被放到死信队列了。
然后推送消息 da,消息接收正常。


好了,到此为止,我们已经介绍了 RabbitMQ 的基础知识、如何安装 RabbitMQ ,以及 SpringBoot 如何快速接入 RabbitMQ 了。
希望我的文章对你有所帮助!thanks for your reading...


写在最后


看了一下,大概有一个月没更新了,最近比较忙,心情也比较累,困于生活,找不到理想的方向。很难用言语去表达吧,身边也没什么可以倾诉的人,而越是这种糟糕的情况,越导致个人工作学习效率低下,很多时候时候都像是在假装忙碌。


有时候挺惆怅的,感觉看不到方向。有时候做了决定之后,又在怀疑自己的选择是错的,但不选择,就不会有新的机会吧。在漫无边际的荒野里,到底是向目标前进,还是在远离呢?


无论如何,希望我们都能在满是泥泞的日子里,孤独与寂寥的日子里,保持斗志,稳定心态,默默积攒力量,不说什么茧化蝶之类的玄乎话,就简简单单,能够早一点过上自己想要的生活吧!


与君共勉!与君共期!

Last modification:September 14th, 2020 at 04:15 pm
请作者喝杯肥宅快乐水吧!