转载

RabbitMQ之springboot版详解

前言

花了一周多的时间(周末去掉..捂脸)在工作之余写了两篇关于rabbitMq的内容,一篇是原生版的,一篇是springboot版的。初学者最好是看一下原声版更清晰一点,如果急于应用也可以直接看本文。本文内容较多,看完了五大消息模型的应用后还有进阶篇连着在一起,研究不太彻底请多多指教,好了,不打扰你们看了!

原生版传送门

rabbitMQ-springboot版

官方参考文档

核心基础概念

Server:又称之为Broker,接受客户端的连接,实现AMQP实体服务。

Connection:连接,应用程序与Broker的网络连接。

Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

Message:消息,服务器和应用程序之间传送的数据,由Message Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body就是消息体内容。

Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue。

Exchange:交换机,只有转发能力不具备存储消息能力,根据路由键转发消息到绑定的队列。

Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key。

Routing key:一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。

Queue:也可以称之为Message Queue(消息队列),保存消息并将它们转发到消费者。

引入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </dependency>
    </dependencies>
复制代码

配置

spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin123
    password: 123456
    virtual-host: /test

复制代码

消息模型代码示例

关于一些方法的使用,参数属性说明都在代码中有注释

简单队列模型

示例图

RabbitMQ之springboot版详解

P(producer/ publisher):生产者,如寄快递

C(consumer):消费者,如收快递

红色区域:队列,如快递区,等待消费者拿快递

一句话总结

生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

初始化队列

package com.ao.springbootamqp.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RabbitMqConfig {
    /*队列*/
    public static final String TEST_QUEUE = "simple-amqp_queue";

    /**声明队列
     * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
     *         this(name, durable, exclusive, autoDelete, (Map)null);
     *     }
     * String name: 队列名
     * boolean durable: 持久化消息队列,rabbitmq 重启的时候不需要创建新的队列,默认为 true
     * boolean exclusive: 表示该消息队列是否只在当前的connection生效,默认为 false
     * boolean autoDelete: 表示消息队列在没有使用时将自动被删除,默认为 false*/
   @Bean(TEST_QUEUE)
    public Queue testQueue() {
        return new Queue(TEST_QUEUE, true);
    }

复制代码

发送消息类

package com.ao.springbootamqp.service;

import com.ao.springbootamqp.config.RabbitMqConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.UUID;

@Component
@Slf4j
public class RabbitMqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /*发送消息到队列*/
    public String sendQueue(Object payload){
        return baseSend("", RabbitMqConfig.TEST_QUEUE, payload, null, null);
    }

    /**
     * MQ 公用发送方法
     *
     * @param exchange  交换机
     * @param routingKey  队列
     * @param payload 消息体
     * @param messageId  消息id(唯一性)
     * @param messageExpirationTime  持久化时间
     * @return 消息编号
     */
    public String baseSend(String exchange, String routingKey, Object payload, String messageId, Long messageExpirationTime) {
        /*若为空,则自动生成*/
        if (messageId == null) {
            messageId = UUID.randomUUID().toString();
        }
        String finalMessageId = messageId;
        /*设置消息属性*/
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                /*消息属性中写入消息id*/
                message.getMessageProperties().setMessageId(finalMessageId);
                /*设置消息持久化时间*/
                if (!StringUtils.isEmpty(messageExpirationTime)){
                    message.getMessageProperties().setExpiration(messageExpirationTime.toString());
                }
                /*设置消息持久化*/
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }
        };

        /*构造消息体,转换json数据格式*/
        Message message = null;
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            String json = objectMapper.writeValueAsString(payload);
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentEncoding(MessageProperties.CONTENT_TYPE_JSON);
            message = new Message(json.getBytes(), messageProperties);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        /*表示当前消息唯一性*/
        CorrelationData correlationData = new CorrelationData(finalMessageId);

        /**
         * public void convertAndSend(String exchange, String routingKey, Object message,
         * MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException
         * exchange: 路由
         * routingKey: 绑定key
         * message: 消息体
         * messagePostProcessor: 消息属性处理器
         * correlationData: 表示当前消息唯一性
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);

        return finalMessageId;
    }
}

复制代码

测试

发送消息

@SpringBootTest
class RabbitMqTest {
    @Autowired
    private RabbitMqService rabbitMqService;

    @Test
    public void tt(){
        String s = "顺丰快递";
        rabbitMqService.sendQueue(s);
    }

}
复制代码

查看管理界面

RabbitMQ之springboot版详解

可以看到,消息已经成功发送到服务器上啦,里面消息的属性也正是我们设置好的。因为消息已经发送到服务器上啦,所以待会启动消费者便可以消费了

消费者

@Component
public class RecService {
  /*队列*/
    public static final String TEST_QUEUE = "simple-amqp_queue";

    @RabbitListener(queues = TEST_QUEUE)
    public void t2(Message message){
        try {
            String msg = new String(message.getBody());
            if (msg == null) {
                System.out.println("消息为空");
            }
            System.out.println("我收到了=-=" + msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
复制代码

启动并查看

RabbitMQ之springboot版详解

work消息模型

示例图

RabbitMQ之springboot版详解

P(producer/ publisher):生产者,如寄快递

C1、C2(consumer):消费者,如收快递

红色区域:队列,如快递区,等待消费者拿快递

循环发送10条消息

 @SpringBootTest
class RabbitMqTest {
    @Autowired
    private RabbitMqService rabbitMqService;

    @Test
    public void tt(){
        for (int i = 0;i < 10; i++){
            String s = "消息" + i;
            rabbitMqService.sendQueue(s);
        }
    }
}
复制代码

消费者1/2

@Component
public class RecService1 {
    /*队列*/
    public static final String TEST_QUEUE = "work-amqp-queue";

    @RabbitListener(queues = TEST_QUEUE)
    public void t2(Message message){
        try {
            String msg = new String(message.getBody());
            if (msg == null) {
                System.out.println("消息为空");
            }
            System.out.println("消费者1收到=-=" + msg);
        
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
复制代码

查看控制台

RabbitMQ之springboot版详解
RabbitMQ之springboot版详解

可以看到,消费一样多,如果想能者多劳模式,添加配置如下:

    #指定一个请求能够处理多少个消息
    listener:
      simple:
#测试消费者1值为3,消费者2值为1
        prefetch: 1
复制代码

或者在消费者添加channel.basicQos(1)即可。这就告诉RabbitMq不要一直向消费者发送消息,而是要等待消费者的确认了前一个消息

@Component
public class RecService1 {
    /*队列*/
    public static final String TEST_QUEUE = "work-amqp-queue";

    @RabbitListener(queues = TEST_QUEUE)
    public void t2(Message message,Channel channel){
        try {
            String msg = new String(message.getBody());
            if (msg == null) {
                System.out.println("消息为空");
            }
            System.out.println("消费者1收到=-=" + msg);
         channel.basicQos(1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
复制代码

重新启动两个消费者,再循环发送10条消息,查看控制台如下:

RabbitMQ之springboot版详解
RabbitMQ之springboot版详解

可以看到消费者1消费的多

订阅模型-Fanout(广播模式)

在这种订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。

示例图

RabbitMQ之springboot版详解

P:生产者,如寄快递

X: 交换机,相当于快递公司

红色区域:队列,如快递区,等待消费者拿快递

C1、C2:消费者,如收快递

在RabbitMqConfig修改如下配置,声明队列1和队列2,并把交换机与这两个队列进行绑定

  /*交换机*/
    public static final String TEST_EXCHANGE = "fanout_amqp_exchange";

    /*声明一个fanout交换机*/
    @Bean(TEST_EXCHANGE)
    public Exchange testExchange() {
        // durable(true)持久化,mq重启之后,交换机还在
        return ExchangeBuilder.fanoutExchange(TEST_EXCHANGE).durable(true).build();
    }


    /*队列1*/
    public static final String TEST_QUEUE_1 = "fanout_amqp_queue_1";
    /*队列2*/
    public static final String TEST_QUEUE_2 = "fanout_amqp_queue_2";

    /**声明队列1
     * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
     *         this(name, durable, exclusive, autoDelete, (Map)null);
     *     }
     * String name: 队列名
     * boolean durable: 持久化消息队列,rabbitmq 重启的时候不需要创建新的队列,默认为 true
     * boolean exclusive: 表示该消息队列是否只在当前的connection生效,默认为 false
     * boolean autoDelete: 表示消息队列在没有使用时将自动被删除,默认为 false*/
    @Bean(TEST_QUEUE_1)
    public Queue testQueue1() {
        return new Queue(TEST_QUEUE_1, true);
    }
  /*声明队列2*/
    @Bean(TEST_QUEUE_2)
    public Queue testQueue2() {
        return new Queue(TEST_QUEUE_2, true);
    }


    /*队列1与路由进行绑定*/
    @Bean
    Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
                          @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("")
                .noargs();
    }

    /*队列2与路由进行绑定*/
    @Bean
    Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
                        @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("")
                .noargs();
    }
复制代码

RabbitMqService 添加发送方式: 发送到交换机

  /*发送到交换器*/
    public String sendExchange(Object payload,String routingKey){
        return baseSend(RabbitMqConfig.TEST_EXCHANGE, routingKey, payload, null, null);
    }
复制代码

发送消息

 @Test
    public void t1(){
            String s = "广播快递";
            rabbitMqService.sendExchange(s,"");
    }
复制代码

查看交换机绑定关系

RabbitMQ之springboot版详解

查看消息发送是否成功

可以看到已经发送成功 RabbitMQ之springboot版详解

启动消费者

改一下相应的队列名再启动

 @RabbitListener(queues = TEST_QUEUE)
复制代码

RabbitMQ之springboot版详解 RabbitMQ之springboot版详解

订阅模型-Direct(路由模式)

在这种订阅模式中,生产者发布消息,消费者有选择性的接收消息。队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)。消息的发送方在向Exchange发送消息时,也必须指定消息的routing key

示例图

RabbitMQ之springboot版详解 P:生产者,如寄快递

X: 交换机,相当于快递公司

红色区域:队列,如快递区,等待消费者拿快递

C1、C2:消费者,如收快递

error、info这些就是我们讲的RoutingKey

修改RabbitMqConfig配置, 主要是在交换机与这两个队列进行绑定时候指定routingkey,队列1只接收顺丰快递,队列2只接收京东快递

  /*交换机*/
    public static final String TEST_EXCHANGE = "direct_amqp_exchange";

    /*声明一个direct交换机*/
    @Bean(TEST_EXCHANGE)
    public Exchange testExchange() {
        // durable(true)持久化,mq重启之后,交换机还在
        return ExchangeBuilder.directExchange(TEST_EXCHANGE).durable(true).build();
    }

    /*队列1*/
    public static final String TEST_QUEUE_1 = "direct_amqp_queue_1";
    /*队列2*/
    public static final String TEST_QUEUE_2 = "direct_amqp_queue_2";

    /**声明队列
     * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
     *         this(name, durable, exclusive, autoDelete, (Map)null);
     *     }
     * String name: 队列名
     * boolean durable: 持久化消息队列,rabbitmq 重启的时候不需要创建新的队列,默认为 true
     * boolean exclusive: 表示该消息队列是否只在当前的connection生效,默认为 false
     * boolean autoDelete: 表示消息队列在没有使用时将自动被删除,默认为 false*/
    @Bean(TEST_QUEUE_1)
    public Queue testQueue1() {
        return new Queue(TEST_QUEUE_1, true);
    }

    @Bean(TEST_QUEUE_2)
    public Queue testQueue2() {
        return new Queue(TEST_QUEUE_2, true);
    }

    /*队列1路由进行绑定*/
    @Bean
    Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
                          @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("SF")
                .noargs();
    }

    /*队列2路由进行绑定*/
    @Bean
    Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
                        @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("JD")
                .noargs();
    }
复制代码

发送消息

@Test
public void t2(){
        String s = "京东快递";
        String s1 = "顺丰快递";
        rabbitMqService.sendExchange(s,"JD");
        rabbitMqService.sendExchange(s1,"SF");
}
复制代码

查看交换机绑定关系

RabbitMQ之springboot版详解

查看消息是否发送成功

RabbitMQ之springboot版详解

启动消费者

改一下相应的队列名再启动,按道理来说消费者1应该收到顺丰快递,消费者2应该收到京东快递,结果如下: RabbitMQ之springboot版详解 RabbitMQ之springboot版详解

结果符合预期。

订阅模型-Topic(通配符模式)

示例图

RabbitMQ之springboot版详解

Topic 类型的 ExchangeDirect 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用 通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割

通配符规则:

# :匹配一个或多个词

* :匹配不多不少恰好1个词

修改RabbitMqConfig与direct基本一样,只修改了一下队列名和交换机,routingkey改成 队列1只接收顺丰快递,队列2任何快递都接收

  
  /*声明一个direct交换机*/
    @Bean(TEST_EXCHANGE)
    public Exchange testExchange() {
        // durable(true)持久化,mq重启之后,交换机还在
        return ExchangeBuilder.topicExchange(TEST_EXCHANGE).durable(true).build();
    }

 /*队列1路由进行绑定*/
    @Bean
    Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
                          @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("SF.kd")
                .noargs();
    }

    /*队列2路由进行绑定*/
    @Bean
    Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
                        @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("#.kd")
                .noargs();
    }
复制代码

发送消息

  @Test
    public void t2(){
            String s = "EMS快递";
            String s1 = "顺丰快递";
            String s2 = "京东快递";
            rabbitMqService.sendExchange(s,"EMS.kd");
            rabbitMqService.sendExchange(s1,"SF.kd");
            rabbitMqService.sendExchange(s2,"JD.kd");
    }
复制代码

查看交换机绑定关系

RabbitMQ之springboot版详解

查看消息是否发送成功

RabbitMQ之springboot版详解

启动消费者

结果如下,符合预期!

RabbitMQ之springboot版详解
RabbitMQ之springboot版详解

进阶

温馨提示:以下代码示例都以路由模式进行演示。

消息可靠性投递

实现RabbitMQ消息的可靠要保证以下3点:

  • RabbitMQ消息确认机制:RabbitMQ消息确认有2种:消息发送确认,消费接收确认。消息发送确认是确认生产者将消息发送到Exchange,Exchange分发消息至Queue的过程中,消息是否可靠投递。第一步是否到达Exchange,第二步确认是否到达Queue。

  • 交换机,队列,消息进行持久化:防止消息发送到了broker,还没等到消费者消费 ,broker就挂掉了

  • 消费者确认机制: 模式有3种: none(没有任何的应答会被发送) , auto(自动应答) , manual(手动应答) 。为了保证消息可靠性,我们设置手动应答,这是为什么呢?采用自动应答的方式,每次消费端收到消息后,不管是否处理完成,Broker都会把这条消息置为完成,然后从Queue中删除。如果消费端消费时,抛出异常,消费端没有成功消费该消息,从而造成消息丢失。手动应答方式可以调用basicAck、basicNack、basicReject方法,只有在消息得到正确处理下,再发送ACK。

RabbitMQ消息确认机制

修改配置

spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin123
    password: 123456
    virtual-host: /test
    # 确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
    publisher-confirms: true
    # 实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发
    publisher-returns: true
    listener:
    # 消息消费确认,可以手动确认
      simple:
        acknowledge-mode: manual
复制代码

修改RabbitMqService

增加实现ConfirmCallBack接口和实现ReturnCallback接口代码

 // 消息发送到交换器Exchange后触发回调
    private final RabbitTemplate.ConfirmCallback confirmCallback =
            new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if (ack) {
                        //成功业务逻辑
                        log.info("消息投递到及交换机成功啦!!!");
                    } else {
                        //失败业务逻辑
                        log.info("消息投递到及交换机失败啦!!");
                    }
                }
            };

    // 如果消息从交换器发送到对应队列失败时触发
    private final RabbitTemplate.ReturnCallback returnCallback =
            new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    //失败业务逻辑
                    log.info("message=" + message.toString());
                    log.info("replyCode=" + replyCode);
                    log.info("replyText=" + replyText);
                    log.info("exchange=" + exchange);
                    log.info("routingKey=" + routingKey);
                }
            };
复制代码

在rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData)之前增加如下代码:

    rabbitTemplate.setConfirmCallback(this.confirmCallback);
        rabbitTemplate.setReturnCallback(this.returnCallback);
复制代码

测试消息到交换机是否成功

为了方便测试,用controller发送消息。消息路由不到合适的Exchange,Confirm机制回送的ACK会返回false,走异常处理,进行一些业务逻辑,如重试或者补偿等手段

@RestController
public class TestController {

    @Autowired
    private RabbitMqService sender;

    @PostMapping("/tt")
    public String sendMsg(String msg){
        sender.sendExchange(msg,"");
        return "ok";
    }
}
复制代码

查看控制台

RabbitMQ之springboot版详解

测试消息到队列是否成功

这里在发送消息的时候,指定一个不存在的routingkey,模拟失败回调

 sender.sendExchange(msg,"XXX");
复制代码

查看控制台

RabbitMQ之springboot版详解

交换机,队列,消息进行持久化

这个在上文的代码中有提到,略。

消费者确认机制

前面提到有这3种手动应答方式basicAck、basicNack、basicReject,那么先了解一下。

basicAck

当multiple为false,只确认当前的消息。当multiple为true,批量确认所有比当前deliveryTag小的消息。deliveryTag是用来标识Channel中投递的消息。RabbitMQ保证在每个Channel中,消息的deliveryTag是从1递增。

   public void basicAck(long deliveryTag, boolean multiple) throws IOException {
        this.transmit(new Ack(deliveryTag, multiple));
        this.metricsCollector.basicAck(this, deliveryTag, multiple);
    }
复制代码

basicNack

当消费者消费消息时出现异常了,那么可以使用这种方式。当requeue为true,失败消息会重新进入Queue,一般结合重试机制使用,当重试次数超过最大值,丢弃该消息)或者是死信队列+重试队列。当requeue为false,丢弃该消息。

public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
        this.transmit(new Nack(deliveryTag, multiple, requeue));
        this.metricsCollector.basicNack(this, deliveryTag);
    }
复制代码

basicReject

和basicNack用法一样。

测试

先把手动确定注释掉

 @RabbitListener(queues = TEST_QUEUE)
    public void t2(Message message, Channel channel) throws IOException {
            String msg = new String(message.getBody());
            System.out.println("消费者1收到=-=" + msg);
//            long deliveryTag = message.getMessageProperties().getDeliveryTag();
//            channel.basicAck(deliveryTag,false);
    }
复制代码

查看管理界面

消息变成unacked RabbitMQ之springboot版详解

停止消费者程序,消息又变成ready,这是因为虽然我们设置了手动ACK,但是代码中并没有进行消息确认!所以消息并未被真正消费掉。当我们关掉这个消费者,消息的状态再次称为Ready

RabbitMQ之springboot版详解

消费者重试机制

配置

加入如下配置,消费者重试是在listener下配置retry相关参数,生产者重试是在template下配置retry相关参数,别搞混了

    listener:
    # 消息消费确认,可以手动确认
      simple:
        acknowledge-mode: manual
        #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
        retry:
          enabled: true
           #初始重试间隔为1s
          initial-interval: 1000
          #重试的最大次数
          max-attempts: 3
          #重试间隔最多1s
          max-interval: 1000
          #每次重试的因子是1.0 等差
          multiplier: 1.0
复制代码

测试

模拟消费者消费出异常啦,加入int i=1/0;

查看控制台

可以看到,重试了3次消费 RabbitMQ之springboot版详解

重试次数用完怎么办

如果listener.retry次数尝试完并还是抛出异常,那该怎么办?可以通过配置MessageRecoverer对异常消息进行处理,默认有两个实现:

  • RepublishMessageRecoverer:将消息重新发送到指定队列,需手动配置。测试一下:

在RabbitMqConfig增加如下:先声明一个重试的交换机(RETRY_EXCHANGE)和一个声明重试队列(RETRY_QUEUE),然后进行绑定,routingkey为:retry

 @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, "retry");
    }
复制代码

增加一个消费者,如下:

  @RabbitListener(queues = RETRY_QUEUE)
    public void t3(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("重试消费者收到了=-=" + msg);
      long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,false);
    }
复制代码

重试次数用完了(因重试的最大次数配置为3),测试结果如下:

RabbitMQ之springboot版详解
  • RejectAndDontRequeueRecoverer:如果不手动配置MessageRecoverer,会默认使用这个,实现仅仅是将异常打印抛出,源码如下(测试略):

public class RejectAndDontRequeueRecoverer implements MessageRecoverer {

    protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class);

    @Override
    public void recover(Message message, Throwable cause) {
     if (this.logger.isWarnEnabled()) {
            this.logger.warn("Retries exhausted for message " + message, cause);
     }
     throw new ListenerExecutionFailedException("Retry Policy Exhausted", new AmqpRejectAndDontRequeueException(cause), message);
    }
}
复制代码

消息重复消费

由重试机制可能会造成延迟,从而造成重复消费的问题,比如说支付,推送短信,邮件等。

解决办法-全局唯一ID

  • 消息发送方在发送时在消息头加入唯一ID,比如UUID,订单号,时间戳,traceId等。在上文对消息的封装已经处理过: RabbitMQ之springboot版详解

  • 接收方接受消息后先获取消息头的唯一ID,判断redis内是否已经包含唯一ID,如果包含说明已经消费成功,直接不处理消息。如果redis内不包含唯一ID,处理消息,成功后把唯一ID存入缓存

死信队列

死信队列是什么

死信,顾名思义就是无法被消费的消息,如消费者出现某种异常导致消息没有被消费,就会将消息重新投递到另一个Exchange(Dead Letter Exchanges),该Exchange再根据routingKey重定向到另一个队列,在这个队列重新处理该消息。

死信队列常见来源

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.

  • 消息TTL过期

  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

死信的处理方式

  • 消息被拒绝 (basic.reject or basic.nack) 且带 requeue=false不重新入队参数或达到了retry重新入队的上限次数

  • 消息的TTL(Time To Live)-存活时间已经过期

  • 队列长度限制被超越(队列满,queue的" x-max-length "参数)

本例使用第三种。

代码示例

初始化死信队列、交换机及绑定

声明一个死信交换机(DL_EXCHANGE)和死信队列(DL_QUEUE),然后进行绑定,并且声明业务队列(TEST_QUEUE_1)时加入 x-dead-letter-exchangex-dead-letter-routing-key 的参数,代码如下:

   /*业务交换机*/
    public static final String TEST_EXCHANGE = "test_amqp_exchange";

 /*声明业务交换机*/
    @Bean(TEST_EXCHANGE)
    public Exchange testExchange() {
        // durable(true)持久化,mq重启之后,交换机还在
        return ExchangeBuilder.directExchange(TEST_EXCHANGE).durable(true).build();
    }

 /*队列1*/
    public static final String TEST_QUEUE_1 = "test_amqp_queue_1";

    @Bean(TEST_QUEUE_1)
    public Queue testQueue1() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    声明  死信交换机
        args.put("x-dead-letter-exchange", DL_EXCHANGE);
//       x-dead-letter-routing-key    声明 死信路由键
        args.put("x-dead-letter-routing-key", "dlk");
        return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
    }

   /*队列1路由进行绑定*/
    @Bean
    Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
                          @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("SF")
                .noargs();
    }

    /*死信交换机*/
    public static final String DL_EXCHANGE = "deadLetterExchange";

    /*声明死信交换机*/
    @Bean(DL_EXCHANGE)
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange(DL_EXCHANGE).durable(true).build();
    }

    /*死信队列*/
    public static final String DL_QUEUE = "deadLetterQueue";
    /*声明死信队列*/
    @Bean(DL_QUEUE)
    public Queue deadLetterQueue() {
        return new Queue(DL_QUEUE,true);
    }

    /*死信队列绑定死信交换机*/
    @Bean
    Binding bindingDead(@Qualifier(DL_QUEUE) Queue queue,
                         @Qualifier(DL_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("dlk")
                .noargs();
    }

复制代码

消费者

  //业务消费者
@RabbitListener(queues = TEST_QUEUE)
    public void t2(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        try {
            int i = 1/0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e){
            System.out.println("消费者1出错啦");
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }}

//死信消费者
 @RabbitListener(queues = DL_QUEUE)
    public void t3( Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("死信队列收到了=-=" + msg);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,false);
    }
复制代码

查看控制台

RabbitMQ之springboot版详解

大概流程

大概流程就是消息被业务消费者消费,此时业务消费者挂掉了,就走catch代码basicNack,mq收到了nack就会把消息重新投递到业务队列x-dead-letter-exchange绑定的死信交换机,然后根据业务队列x-dead-letter-routing-key绑定的死信路由键匹配到死信队列,然后最终被死信消费者消费了。

延迟队列

延时队列是什么

延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。

延时队列应用场景

  • 订单在十分钟之内未支付则自动取消。

  • 用户进行退款,卖家在三天内没有进行处理,则短信通知卖家或通知所驻的平台。

实现延时队列方式

方式一:TTL(Time To Live)+DLX(Dead Letter Exchanges)

死信(DLX)上文已经了解过了,那么什么是TTL呢?RabbitMQ可以针对Queue设置x-message-ttl 或者 针对Message设置setExpiration ,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)。

设置TTL有两种方式

  • 通过队列属性设置,队列中所有消息都有相同的过期时间。

    缺点:如果使用这种方式设置消息的TTL,当延时时间梯度比较多的话,比如1分钟,2分钟,5分钟,12分钟……需要创建很多交换机和队列来路由消息。

   @Bean(TEST_QUEUE_1)
    public Queue testQueue1() {
        Map<String, Object> args = new HashMap<>(2);
        //声明过期时间5秒
        args.put("x-message-ttl", 5000);
        // x-dead-letter-exchange    声明  死信交换机
        args.put("x-dead-letter-exchange", DL_EXCHANGE);
  //x-dead-letter-routing-key    声明 死信路由键
        args.put("x-dead-letter-routing-key", "dlk");
        return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
    }
复制代码
  • 对消息进行单独设置,每条消息TTL可以不同。

    缺点:如果单独设置消息的TTL,则可能会造成队列中的消息阻塞,因为队列是先进先出的,前一条消息没有出队(没有被消费),后面的消息无法投递。消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。

RabbitMQ之springboot版详解

方式二:rabbitmq-delayed-message-exchange插件

官网下载

可解决单独对消息设置TTL,延时时长短的优先处理

代码示例

下面演示的是死信+TTL,代码还是以上文死信队列的为主

队列设置过期时间

在业务队列增加x-message-ttl配置,设置一秒;消费者删除业务消费者(模拟消息没被消费而过期),只留下死信消费者;其余不变。

  @Bean(TEST_QUEUE_1)
    public Queue testQueue1() {
        Map<String, Object> args = new HashMap<>(2);
        //声明过期时间5秒
        args.put("x-message-ttl", 1000);
        // x-dead-letter-exchange    声明  死信交换机
        args.put("x-dead-letter-exchange", DL_EXCHANGE);
  //x-dead-letter-routing-key    声明 死信路由键
        args.put("x-dead-letter-routing-key", "dlk");
        return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
    }
复制代码

查看控制台

可以看到,时间 在1秒后被死信消费者消费

RabbitMQ之springboot版详解

消息设置过期时间

注释掉队列的过期时间,然后修改一下发送方法,如下;

/*发送到交换器*/
    public String sendExchange(Object payload,String routingKey,Long messageExpirationTime){
        return baseSend(RabbitMqConfig.TEST_EXCHANGE, routingKey, payload, null, messageExpirationTime);
    }

复制代码

controller如下,消费者不变

  @Autowired
    private RabbitMqService sender;

    @PostMapping("/tt")
    public String sendMsg(String msg){
        sender.sendExchange(msg,"SF",5000L);
        System.out.println("【5秒过期时间测试】发送时间是:"+LocalDateTime.now());
        return "ok";
    }
复制代码

查看控制台

可以看到消息5秒后被死信消费者消费

RabbitMQ之springboot版详解
原文  https://juejin.im/post/5f12b98a6fb9a07ec07b5104
正文到此结束
Loading...