- 队列,交换器的声明创建、删除、清空
- 创建:
@Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory ){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; }
- 简单使用:创建一个direct交换器,绑定了一个队列
@Test
public void test1(){
DirectExchange direct_1 = new DirectExchange("direct_1", true, false);
rabbitAdmin.declareExchange(direct_1);
Queue direct_q_1 = new Queue("direct_q_1", true);
rabbitAdmin.declareQueue(direct_q_1);
Binding binding = BindingBuilder.bind(direct_q_1).to(direct_1).with("direct");
rabbitAdmin.declareBinding(binding);
}
MessageConvert
- 消息转换器,在发送消息和接收消息的时候将消息内容转换成指定的格式。
- 默认的消息转换器是SimpleMessageConverter,此转换器的功能就是将发送的消息体转换成字节数组(Object,String,Serializable),rabbitTemplate中会用到消息转换器的方法如下:
void convertAndSend(Object message)throws AmqpException; void convertAndSend(String routingKey, Object message)throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException; void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException; void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException; ObjectreceiveAndConvert()throws AmqpException; ObjectreceiveAndConvert(String queueName)throws AmqpException;
-
实现自己的消息转换器后调用rabbitTemplate的API(
public void setMessageConverter(MessageConverter messageConverter)
)设置即可。 -
在与SpringBoot整合时,可以注入自己的消息转换器,amqp提供了Jackson2JsonMessageConverter,使用JackSon将消息内容转换为json字符串,配置如下:
/** * 注入JackSon的MessageConverter,用于收发消息的格式化成json数据 * @param ObjectMapper 这个是jackson自动注入的,详情请看JacksonAutoConfiguration */ @Bean public Jackson2JsonMessageConverter messageConverter(ObjectMapper ){ return new Jackson2JsonMessageConverter(objectMapper); } /** * 重新注入RabbitTemplate,并且设置相关属性 */ @Bean @Primary public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter, CustomConfirmCallBack confirmCallBack, CustomReturnCallBack returnCallBack){ RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMandatory(true); //设置消息转换器 template.setMessageConverter(messageConverter); template.setReturnCallback(returnCallBack); template.setConfirmCallback(confirmCallBack); return template; }
生产者
TTL(消息或者队列)
消息TTL
-
在发送消息的时候指定的TTL,(MessageProperties)API如下:
-
public void setExpiration(String expiration)
:单位毫秒
-
队列TTL
-
在创建队列的时候指定过期时间,在创建Queue的时候需要指定过期时间(
x-message-ttl
),设置在arguments
。
消息ack和nack
-
消息确认机制,生产者发送消息可能因为网络、交换机不存在等其他问题导致消息投递失败,消息ack机制可以在消息投递之后针对失败或者成功做一些业务的处理。
-
只要消息发送到exchange,ConfirmCallback回调的ack=true,而returncallback是能否发送到队列的回调函数
-
监听步骤:
-
设置connectionFacotry的发布确认模式为
ConfirmType.CORRELATED
,代码如下:
//设置消息发送ack,默认none connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
-
自定义
RabbitTemplate.ConfirmCallback
的实现类,重写其中的方法,如下:
/** * @Description 消息确认回调,在消息发出之后回调 * @Author CJB * @Date 2020/2/21 15:36 */ @Component public class MyConfirmCallbackimplements RabbitTemplate.ConfirmCallback{ /** * * @param correlationData 发送消息时携带的参数,在业务上能够唯一识别,比如主键id等 * @param ack 消息是否发送成功的标志,true成功,false失败 * @param cause 消息发送失败的原因 */ @Override public void confirm(CorrelationData correlationData,boolean ack, String cause){ System.err.println(correlationData.getId()+"---->"+ack+"--->"+cause); //消息投递失败执行逻辑,比如消息入库,设置失败标记等操作 if (!ack){ System.err.println("消息投递失败"); } } }
- 在RabbitTemplate中设置
template.setConfirmCallback(myConfirmCallback);
-
设置connectionFacotry的发布确认模式为
消息Return
-
用于处理一些路由不可达的消息,比如发送消息时指定的路由投递不到相应的队列,此时Return Listener就会监听到这些消息进行处理
-
实现步骤:
-
设置
ConnectionFactory
的publisherReturns
为true
//设置开启发布消息的Return监听 connectionFactory.setPublisherReturns(true);
-
设置RabbitTemplate的
mandatory
为true,或者mandatory-expression
执行的结果为true
template.setMandatory(true);
-
自定义实现
RabbitTemplate.ReturnCallback
的类,重写其中的方法,如下:
/** * @Description ReturnListener的监听,处理发送消息时路由不可达的消息 * @Author CJB * @Date 2020/2/21 17:04 */ @Component public class MyReturnCallBackimplements RabbitTemplate.ReturnCallback{ /** * 在消息路由不可达会回调此方法,用于处理这些消息,比如记录日志,消息补偿等等操作 * @param message 投递的消息 * @param replyCode 响应的状态吗 * @param replyText 响应的文本 * @param exchange 交换机 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey){ System.err.println("message:"+new String(message.getBody())); System.err.println("replyCode:"+replyCode); System.err.println("replyText:"+replyText); System.err.println("exchange:"+exchange); System.err.println("routingKey:"+routingKey); } }
- 在RabbitTemplate中设置
template.setReturnCallback(myReturnCallBack);
-
设置
消费者
消息异步监听
-
异步监听消息需要设置一个监听器,一旦监听的队列中有消息发送,此监听器将会起作用。
-
步骤如下:
-
注入
SimpleMessageListenerContainer
@Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //添加监听的队列 container.addQueueNames("queue1"); //设置消费者ack消息的模式,默认是自动,此处设置为手动 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置消费者的并发数量 container.setConcurrentConsumers(1); //设置单个消费请求能够处理的消息条数,默认250 container.setPrefetchCount(250); //设置最大的并发数量 container.setMaxConcurrentConsumers(10); //设置消费者的tag的生成策略,队列的名字+"_"+UUID container.setConsumerTagStrategy(queue -> queue+"_"+ UUID.randomUUID().toString()); //设置消息监听器 container.setMessageListener(customMessageListener1()); return container; }
-
自定义一个消息监听器MessageListener的实现类,此处有两个接口:
MessageListener ChannelAwareMessageListener
/** * 自定义Message监听器 * @return */ @Bean public MessageListener customMessageListener(){ return msg-> System.err.println("消费者:"+new String(msg.getBody())); } @Bean public ChannelAwareMessageListener customMessageListener1(){ return (msg,chanel)->{ long deliveryTag = msg.getMessageProperties().getDeliveryTag(); try{ System.err.println("message:"+new String(msg.getBody())); System.err.println("properties:"+ deliveryTag); //.....执行系列的逻辑 //逻辑顺利执行完成之后执行ack chanel.basicAck(deliveryTag,false); }catch (Exception ex){ //记录日志等操作 //消息执行出现异常,nack,设置不重回队列,如果设置了死信队列,那么将会到死信队列中 chanel.basicNack(deliveryTag,false,false); } }; }
-
注入
消费端的并发
- 默认一个队列只有一个消费者监听,但是我们可以同时设置多个消费者监听这个消息,提高消息消费的效率。
-
SimpleMessageListenerContainer
中的两个属性可以完成设置,如下:concurrentConsumers maxConcurrentConsumers
消费端限流(流量削峰)
- 假设rabbitmq服务器有上万条信息没有处理,当开启一个消费端的话,那么就有可能出现服务器卡死的情况。
- Rabbitmq提供了一种qos(服务质量保证)功能,即在非确认消息的前提下(手动确认消息),如果一定数目的消息(基于consumer或者channel的qos的设置)未被确认前(没有ack或者nack),不进行消费新的消息。
- amqp实现如下:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
消息ack
-
默认是自动ack的,即是在接收到这条消息之后无论有没有正确消费,这条消息都会从队列中删除。当然可以设置手动ack,即是在消费者接收消息,正确处理完成之后,手动确认ack,那么此条消息才会从队列中删除。
-
API(Channel类):
-
void basicAck(long deliveryTag, boolean multiple)
:ack消息deliveryTag multiple
-
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
:nack消息-
requeue
:是否重回队列,如果设置了重回队列,那么这条消息会被重新进入队列中的最后一条消息,如果设置了false并且此队列设置了死信队列,那么将会被放入死信队列中。
-
-
-
实现步骤:
-
设置消费者的确认模式为手动确认,使用的是
SimpleMessageListenerContainer
的API
//设置消费者ack消息的模式,默认是自动,此处设置为手动 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
-
消息异步监听的实现类是
ChannelAwareMessageListener
,通过自己的业务逻辑判断何时需要ack何时需要nack
@Bean public ChannelAwareMessageListener customMessageListener1(){ return (msg,chanel)->{ long deliveryTag = msg.getMessageProperties().getDeliveryTag(); try{ System.err.println("message:"+new String(msg.getBody())); System.err.println("properties:"+ deliveryTag); //.....执行系列的逻辑 //逻辑顺利执行完成之后执行ack chanel.basicAck(deliveryTag,false); }catch (Exception ex){ //记录日志等操作 //消息执行出现异常,nack,设置不重回队列,如果设置了死信队列,那么将会到死信队列中 chanel.basicNack(deliveryTag,false,false); } }; }
-
设置消费者的确认模式为手动确认,使用的是
消息重回队列
-
重回队列的机制即是消息在nack之后如果设置了重回队列,那么此条消息将会被重新放入到此队列中的最后一条,之后将会被重新投递到消费端消费。
-
重回队列的机制并不支持使用,如果是业务逻辑上的异常导致消息重回队列,那么重新消费也是没有多大意义。在实际的工作上可以采用补偿机制解决。
-
设置重回队列如下:
-
SimpleMessageListenerContainer
中设置默认的行为如下:
//设置不重回队列,默认为true,即是消息被拒绝或者nack或者监听器抛出异常之后会重新返回队列 container.setDefaultRequeueRejected(false);
- 在nack消息的时候有一个requeue的属性设置,如下:
//消息执行出现异常,nack,requeue=false设置不重回队列,如果设置了死信队列,那么将会到死信队列中 chanel.basicNack(deliveryTag,false,false);
-
死信队列
-
消息变成死信的情况如下( 前提:消息所在队列设置了死信队列
):- 消息被拒绝(nack/reject)并且requeue=false(不设置重回队列)
- 消息的TTL过期
- 队列达到最大长度
- 死信队列在rabbitmq中其实是一个exchange,只是普通的交换机和队列。
-
想要消息被拒绝或者过期之后能够回到死信队列中,需要在队列声明的时候添加一个
x-dead-letter-exchange
,指定死信的交换机
String exchange="a_exchange"; String queueName="a_queue"; TopicExchange topicExchange = new TopicExchange(exchange, true, true); Map<String,Object> arguments=new HashMap<>(); //指定死信队列,dlx-exchange是死信交换机 arguments.put("x-dead-letter-exchange", "dlx-exchange"); //设置死信队列的路由键,需要根据这个路由键找到对应的队列 arguments.put("x-dead-letter-routing-key", "dlx-key"); Queue queue = new Queue(queueName, true, false, false, arguments); Binding binding = BindingBuilder.bind(queue).to(topicExchange).with("test.#"); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareExchange(topicExchange); rabbitAdmin.declareBinding(binding);
事务【不推荐】
-
rabbitmq默认是没有开启事务的,提交和发送消息甚至业务逻辑中间涉及到数据库操作都不在同一个事务中。
-
amqp如何设置事务:
- 关闭生产的消息确认(ack),当然默认是不开启的,投递消息的确认和事务是不能同时存在的
-
设置RabbitTemplate中的
setChannelTransacted
方法为true,表示使用事务。 -
定义事务管理器
RabbitTransactionManager
,实现了PlatformTransactionManager
,这个事务管理器的事务只针对rabbitmq消息的发送和获取,对数据库的事务无效
@Bean public PlatformTransactionManager transactionManager(){ RabbitTransactionManager manager = new RabbitTransactionManager(); manager.setConnectionFactory(connectionFactory()); return manager; }
-
同步发送和消费消息的事务,使用@Transactional注解( 无需声明RabbitTransactionManager,直接使用数据源的事务即可完成数据和mq消息的事务
),如下:
@Transactional public void sendMsg(String msg){ //接收消息 Message message1 = rabbitTemplate.receive("a_queue"); System.err.println(new String(message1.getBody())); String queueName="direc_q_1"; String exchangeName="direct_1"; String routingKey="direct"; Message message = MessageBuilder.withBody(msg.getBytes()).andProperties(new MessageProperties()).build(); rabbitTemplate.send(exchangeName,routingKey,message,new CorrelationData(UUID.randomUUID().toString())); //此处出现异常,事务将会回滚 System.out.println(1/0); }
-
异步消费消息使用的是监听器,此时就需要在SimpleMessageListenerContainer中设置,如下:
//开启事务 container.setChannelTransacted(true); //设置事务管理器 container.setTransactionManager(transactionManager());
原文
https://chenjiabing666.github.io/2020/03/09/一文搞懂Spring-AMQP/
本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 一文搞懂Spring-AMQP