转载

一文搞懂Spring-AMQP

  • 队列,交换器的声明创建、删除、清空
  • 创建:
@Bean
   public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory ){
       RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
       rabbitAdmin.setAutoStartup(true);
       return rabbitAdmin;
   }
  • 简单使用:创建一个direct交换器,绑定了一个队列
@Test
   public void test1(){
       DirectExchange direct_1 = new DirectExchange("direct_1", true, false);
       rabbitAdmin.declareExchange(direct_1);
       Queue direct_q_1 = new Queue("direct_q_1", true);
       rabbitAdmin.declareQueue(direct_q_1);
       Binding binding = BindingBuilder.bind(direct_q_1).to(direct_1).with("direct");
       rabbitAdmin.declareBinding(binding);
   }

MessageConvert

  • 消息转换器,在发送消息和接收消息的时候将消息内容转换成指定的格式。
  • 默认的消息转换器是SimpleMessageConverter,此转换器的功能就是将发送的消息体转换成字节数组(Object,String,Serializable),rabbitTemplate中会用到消息转换器的方法如下:
void convertAndSend(Object message)throws AmqpException;

void convertAndSend(String routingKey, Object message)throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message)
    throws AmqpException;

void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
    throws AmqpException;

void convertAndSend(String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

ObjectreceiveAndConvert()throws AmqpException;

ObjectreceiveAndConvert(String queueName)throws AmqpException;
  • 实现自己的消息转换器后调用rabbitTemplate的API( public void setMessageConverter(MessageConverter messageConverter) )设置即可。

  • 在与SpringBoot整合时,可以注入自己的消息转换器,amqp提供了Jackson2JsonMessageConverter,使用JackSon将消息内容转换为json字符串,配置如下:

/**
    * 注入JackSon的MessageConverter,用于收发消息的格式化成json数据
    * @param ObjectMapper 这个是jackson自动注入的,详情请看JacksonAutoConfiguration
    */
   @Bean
   public Jackson2JsonMessageConverter messageConverter(ObjectMapper ){
       return new Jackson2JsonMessageConverter(objectMapper);
   }

/**
* 重新注入RabbitTemplate,并且设置相关属性
*/
@Bean
   @Primary
   public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter, CustomConfirmCallBack confirmCallBack, CustomReturnCallBack returnCallBack){
       RabbitTemplate template = new RabbitTemplate(connectionFactory);
       template.setMandatory(true);
       //设置消息转换器
    	template.setMessageConverter(messageConverter);
       template.setReturnCallback(returnCallBack);
       template.setConfirmCallback(confirmCallBack);
       return template;
   }

生产者

TTL(消息或者队列)

  • TTL表示消息或者队列的生命周期,在消息发送或者队列创建的时候可以设置消息的存活时间,如果此条消息或者队列中的所有消息到达指定的时候还是没有被消费,那么消息将会被清空或者存入 死信队列 中。

消息TTL

  • 在发送消息的时候指定的TTL,(MessageProperties)API如下:
    • public void setExpiration(String expiration) :单位毫秒

队列TTL

  • 在创建队列的时候指定过期时间,在创建Queue的时候需要指定过期时间( x-message-ttl ),设置在 arguments

消息ack和nack

  • 消息确认机制,生产者发送消息可能因为网络、交换机不存在等其他问题导致消息投递失败,消息ack机制可以在消息投递之后针对失败或者成功做一些业务的处理。

  • 只要消息发送到exchange,ConfirmCallback回调的ack=true,而returncallback是能否发送到队列的回调函数

  • 监听步骤:

    • 设置connectionFacotry的发布确认模式为 ConfirmType.CORRELATED ,代码如下:
    //设置消息发送ack,默认none
    connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    
    • 自定义 RabbitTemplate.ConfirmCallback 的实现类,重写其中的方法,如下:
    /**
     * @Description 消息确认回调,在消息发出之后回调
     * @Author CJB
     * @Date 2020/2/21 15:36
     */
    @Component
    public class MyConfirmCallbackimplements RabbitTemplate.ConfirmCallback{
    
        /**
         *
         * @param correlationData  发送消息时携带的参数,在业务上能够唯一识别,比如主键id等
         * @param ack  消息是否发送成功的标志,true成功,false失败
         * @param cause 消息发送失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData,boolean ack, String cause){
            System.err.println(correlationData.getId()+"---->"+ack+"--->"+cause);
            //消息投递失败执行逻辑,比如消息入库,设置失败标记等操作
            if (!ack){
                System.err.println("消息投递失败");
            }
        }
    }
    
    • 在RabbitTemplate中设置
    template.setConfirmCallback(myConfirmCallback);
    

消息Return

  • 用于处理一些路由不可达的消息,比如发送消息时指定的路由投递不到相应的队列,此时Return Listener就会监听到这些消息进行处理

  • 实现步骤:

    • 设置 ConnectionFactorypublisherReturns 为true
    //设置开启发布消息的Return监听
          connectionFactory.setPublisherReturns(true);
    
    • 设置RabbitTemplate的 mandatory 为true,或者 mandatory-expression 执行的结果为true
    template.setMandatory(true);
    
    • 自定义实现 RabbitTemplate.ReturnCallback 的类,重写其中的方法,如下:
    /**
     * @Description ReturnListener的监听,处理发送消息时路由不可达的消息
     * @Author CJB
     * @Date 2020/2/21 17:04
     */
    @Component
    public class MyReturnCallBackimplements RabbitTemplate.ReturnCallback{
        /**
         * 在消息路由不可达会回调此方法,用于处理这些消息,比如记录日志,消息补偿等等操作
         * @param message  投递的消息
         * @param replyCode  响应的状态吗
         * @param replyText  响应的文本
         * @param exchange  交换机
         * @param routingKey  路由键
         */
        @Override
        public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey){
            System.err.println("message:"+new String(message.getBody()));
            System.err.println("replyCode:"+replyCode);
            System.err.println("replyText:"+replyText);
            System.err.println("exchange:"+exchange);
            System.err.println("routingKey:"+routingKey);
        }
    }
    
    • 在RabbitTemplate中设置
    template.setReturnCallback(myReturnCallBack);
    

消费者

消息异步监听

  • 异步监听消息需要设置一个监听器,一旦监听的队列中有消息发送,此监听器将会起作用。

  • 步骤如下:

    • 注入 SimpleMessageListenerContainer
    @Bean
       public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
           SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
           //添加监听的队列
           container.addQueueNames("queue1");
           //设置消费者ack消息的模式,默认是自动,此处设置为手动
           container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
           //设置消费者的并发数量
           container.setConcurrentConsumers(1);
           //设置单个消费请求能够处理的消息条数,默认250
           container.setPrefetchCount(250);
           //设置最大的并发数量
           container.setMaxConcurrentConsumers(10);
           //设置消费者的tag的生成策略,队列的名字+"_"+UUID
           container.setConsumerTagStrategy(queue -> queue+"_"+ UUID.randomUUID().toString());
           //设置消息监听器
           container.setMessageListener(customMessageListener1());
           return container;
       }
    
    • 自定义一个消息监听器MessageListener的实现类,此处有两个接口:

      MessageListener
      ChannelAwareMessageListener
      
      /**
          * 自定义Message监听器
          * @return
          */
         @Bean
         public MessageListener customMessageListener(){
             return msg-> System.err.println("消费者:"+new String(msg.getBody()));
         }
          
         @Bean
         public ChannelAwareMessageListener customMessageListener1(){
             return (msg,chanel)->{
                 long deliveryTag = msg.getMessageProperties().getDeliveryTag();
                 try{
                     System.err.println("message:"+new String(msg.getBody()));
                     System.err.println("properties:"+ deliveryTag);
                     //.....执行系列的逻辑
          
                     //逻辑顺利执行完成之后执行ack
                     chanel.basicAck(deliveryTag,false);
                 }catch (Exception ex){
                     //记录日志等操作
          
                     //消息执行出现异常,nack,设置不重回队列,如果设置了死信队列,那么将会到死信队列中
                     chanel.basicNack(deliveryTag,false,false);
                 }
             };
         }
      

消费端的并发

  • 默认一个队列只有一个消费者监听,但是我们可以同时设置多个消费者监听这个消息,提高消息消费的效率。
  • SimpleMessageListenerContainer 中的两个属性可以完成设置,如下:
    concurrentConsumers
    maxConcurrentConsumers
    

消费端限流(流量削峰)

  • 假设rabbitmq服务器有上万条信息没有处理,当开启一个消费端的话,那么就有可能出现服务器卡死的情况。
  • Rabbitmq提供了一种qos(服务质量保证)功能,即在非确认消息的前提下(手动确认消息),如果一定数目的消息(基于consumer或者channel的qos的设置)未被确认前(没有ack或者nack),不进行消费新的消息。
  • amqp实现如下:
    • SimpleMessageListener中有一个属性 prefetchCount ,该属性用来限制消费端的同时处理的请求,默认是250,使用spring AMQP直接设置即可,与SpringBoot整合,配置如下:
spring:
rabbitmq:
listener:
simple:
prefetch: 1

消息ack

  • 默认是自动ack的,即是在接收到这条消息之后无论有没有正确消费,这条消息都会从队列中删除。当然可以设置手动ack,即是在消费者接收消息,正确处理完成之后,手动确认ack,那么此条消息才会从队列中删除。

  • API(Channel类):

    • void basicAck(long deliveryTag, boolean multiple) :ack消息
      deliveryTag
      multiple
      
    • void basicNack(long deliveryTag, boolean multiple, boolean requeue) :nack消息
      • requeue :是否重回队列,如果设置了重回队列,那么这条消息会被重新进入队列中的最后一条消息,如果设置了false并且此队列设置了死信队列,那么将会被放入死信队列中。
  • 实现步骤:

    • 设置消费者的确认模式为手动确认,使用的是 SimpleMessageListenerContainer 的API
    //设置消费者ack消息的模式,默认是自动,此处设置为手动
          container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
    • 消息异步监听的实现类是 ChannelAwareMessageListener ,通过自己的业务逻辑判断何时需要ack何时需要nack
    @Bean
       public ChannelAwareMessageListener customMessageListener1(){
           return (msg,chanel)->{
               long deliveryTag = msg.getMessageProperties().getDeliveryTag();
               try{
                   System.err.println("message:"+new String(msg.getBody()));
                   System.err.println("properties:"+ deliveryTag);
                   //.....执行系列的逻辑
      
                   //逻辑顺利执行完成之后执行ack
                   chanel.basicAck(deliveryTag,false);
               }catch (Exception ex){
                   //记录日志等操作
      
                   //消息执行出现异常,nack,设置不重回队列,如果设置了死信队列,那么将会到死信队列中
                   chanel.basicNack(deliveryTag,false,false);
               }
           };
       }
    

消息重回队列

  • 重回队列的机制即是消息在nack之后如果设置了重回队列,那么此条消息将会被重新放入到此队列中的最后一条,之后将会被重新投递到消费端消费。

  • 重回队列的机制并不支持使用,如果是业务逻辑上的异常导致消息重回队列,那么重新消费也是没有多大意义。在实际的工作上可以采用补偿机制解决。

  • 设置重回队列如下:

    • SimpleMessageListenerContainer 中设置默认的行为如下:
    //设置不重回队列,默认为true,即是消息被拒绝或者nack或者监听器抛出异常之后会重新返回队列
           container.setDefaultRequeueRejected(false);
    
    • 在nack消息的时候有一个requeue的属性设置,如下:
    //消息执行出现异常,nack,requeue=false设置不重回队列,如果设置了死信队列,那么将会到死信队列中
    chanel.basicNack(deliveryTag,false,false);
    

死信队列

  • 消息变成死信的情况如下( 前提:消息所在队列设置了死信队列 ):
    • 消息被拒绝(nack/reject)并且requeue=false(不设置重回队列)
    • 消息的TTL过期
    • 队列达到最大长度
  • 死信队列在rabbitmq中其实是一个exchange,只是普通的交换机和队列。
  • 想要消息被拒绝或者过期之后能够回到死信队列中,需要在队列声明的时候添加一个 x-dead-letter-exchange ,指定死信的交换机
   String exchange="a_exchange";
      String queueName="a_queue";
      TopicExchange topicExchange = new TopicExchange(exchange, true, true);
      Map<String,Object> arguments=new HashMap<>();
      //指定死信队列,dlx-exchange是死信交换机
      arguments.put("x-dead-letter-exchange", "dlx-exchange");
//设置死信队列的路由键,需要根据这个路由键找到对应的队列
arguments.put("x-dead-letter-routing-key", "dlx-key");
      Queue queue = new Queue(queueName, true, false, false, arguments);
      Binding binding = BindingBuilder.bind(queue).to(topicExchange).with("test.#");
      rabbitAdmin.declareQueue(queue);
      rabbitAdmin.declareExchange(topicExchange);
      rabbitAdmin.declareBinding(binding);

事务【不推荐】

  • rabbitmq默认是没有开启事务的,提交和发送消息甚至业务逻辑中间涉及到数据库操作都不在同一个事务中。

  • amqp如何设置事务:

    • 关闭生产的消息确认(ack),当然默认是不开启的,投递消息的确认和事务是不能同时存在的
    • 设置RabbitTemplate中的 setChannelTransacted 方法为true,表示使用事务。
    • 定义事务管理器 RabbitTransactionManager ,实现了 PlatformTransactionManager ,这个事务管理器的事务只针对rabbitmq消息的发送和获取,对数据库的事务无效
    @Bean
       public PlatformTransactionManager transactionManager(){
           RabbitTransactionManager manager = new RabbitTransactionManager();
           manager.setConnectionFactory(connectionFactory());
           return manager;
       }
    
    • 同步发送和消费消息的事务,使用@Transactional注解( 无需声明RabbitTransactionManager,直接使用数据源的事务即可完成数据和mq消息的事务 ),如下:
    @Transactional
       public void sendMsg(String msg){
           //接收消息
           Message message1 = rabbitTemplate.receive("a_queue");
           System.err.println(new String(message1.getBody()));
      
           String queueName="direc_q_1";
           String exchangeName="direct_1";
           String  routingKey="direct";
           Message message = MessageBuilder.withBody(msg.getBytes()).andProperties(new MessageProperties()).build();
           rabbitTemplate.send(exchangeName,routingKey,message,new CorrelationData(UUID.randomUUID().toString()));
           //此处出现异常,事务将会回滚
           System.out.println(1/0);
       }
    
  • 异步消费消息使用的是监听器,此时就需要在SimpleMessageListenerContainer中设置,如下:

    //开启事务
          container.setChannelTransacted(true);
          //设置事务管理器
          container.setTransactionManager(transactionManager());
    
原文  https://chenjiabing666.github.io/2020/03/09/一文搞懂Spring-AMQP/
正文到此结束
Loading...