@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory ){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
@Test
public void test1(){
DirectExchange direct_1 = new DirectExchange("direct_1", true, false);
rabbitAdmin.declareExchange(direct_1);
Queue direct_q_1 = new Queue("direct_q_1", true);
rabbitAdmin.declareQueue(direct_q_1);
Binding binding = BindingBuilder.bind(direct_q_1).to(direct_1).with("direct");
rabbitAdmin.declareBinding(binding);
}
void convertAndSend(Object message)throws AmqpException;
void convertAndSend(String routingKey, Object message)throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
ObjectreceiveAndConvert()throws AmqpException;
ObjectreceiveAndConvert(String queueName)throws AmqpException;
实现自己的消息转换器后调用rabbitTemplate的API( public void setMessageConverter(MessageConverter messageConverter)
)设置即可。
在与SpringBoot整合时,可以注入自己的消息转换器,amqp提供了Jackson2JsonMessageConverter,使用JackSon将消息内容转换为json字符串,配置如下:
/**
* 注入JackSon的MessageConverter,用于收发消息的格式化成json数据
* @param ObjectMapper 这个是jackson自动注入的,详情请看JacksonAutoConfiguration
*/
@Bean
public Jackson2JsonMessageConverter messageConverter(ObjectMapper ){
return new Jackson2JsonMessageConverter(objectMapper);
}
/**
* 重新注入RabbitTemplate,并且设置相关属性
*/
@Bean
@Primary
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter, CustomConfirmCallBack confirmCallBack, CustomReturnCallBack returnCallBack){
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
//设置消息转换器
template.setMessageConverter(messageConverter);
template.setReturnCallback(returnCallBack);
template.setConfirmCallback(confirmCallBack);
return template;
}
public void setExpiration(String expiration)
:单位毫秒 x-message-ttl
),设置在 arguments
。 消息确认机制,生产者发送消息可能因为网络、交换机不存在等其他问题导致消息投递失败,消息ack机制可以在消息投递之后针对失败或者成功做一些业务的处理。
只要消息发送到exchange,ConfirmCallback回调的ack=true,而returncallback是能否发送到队列的回调函数
监听步骤:
ConfirmType.CORRELATED
,代码如下: //设置消息发送ack,默认none connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
RabbitTemplate.ConfirmCallback
的实现类,重写其中的方法,如下: /**
* @Description 消息确认回调,在消息发出之后回调
* @Author CJB
* @Date 2020/2/21 15:36
*/
@Component
public class MyConfirmCallbackimplements RabbitTemplate.ConfirmCallback{
/**
*
* @param correlationData 发送消息时携带的参数,在业务上能够唯一识别,比如主键id等
* @param ack 消息是否发送成功的标志,true成功,false失败
* @param cause 消息发送失败的原因
*/
@Override
public void confirm(CorrelationData correlationData,boolean ack, String cause){
System.err.println(correlationData.getId()+"---->"+ack+"--->"+cause);
//消息投递失败执行逻辑,比如消息入库,设置失败标记等操作
if (!ack){
System.err.println("消息投递失败");
}
}
}
template.setConfirmCallback(myConfirmCallback);
用于处理一些路由不可达的消息,比如发送消息时指定的路由投递不到相应的队列,此时Return Listener就会监听到这些消息进行处理
实现步骤:
ConnectionFactory
的 publisherReturns
为true //设置开启发布消息的Return监听
connectionFactory.setPublisherReturns(true);
mandatory
为true,或者 mandatory-expression
执行的结果为true template.setMandatory(true);
RabbitTemplate.ReturnCallback
的类,重写其中的方法,如下: /**
* @Description ReturnListener的监听,处理发送消息时路由不可达的消息
* @Author CJB
* @Date 2020/2/21 17:04
*/
@Component
public class MyReturnCallBackimplements RabbitTemplate.ReturnCallback{
/**
* 在消息路由不可达会回调此方法,用于处理这些消息,比如记录日志,消息补偿等等操作
* @param message 投递的消息
* @param replyCode 响应的状态吗
* @param replyText 响应的文本
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey){
System.err.println("message:"+new String(message.getBody()));
System.err.println("replyCode:"+replyCode);
System.err.println("replyText:"+replyText);
System.err.println("exchange:"+exchange);
System.err.println("routingKey:"+routingKey);
}
}
template.setReturnCallback(myReturnCallBack);
异步监听消息需要设置一个监听器,一旦监听的队列中有消息发送,此监听器将会起作用。
步骤如下:
SimpleMessageListenerContainer
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//添加监听的队列
container.addQueueNames("queue1");
//设置消费者ack消息的模式,默认是自动,此处设置为手动
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置消费者的并发数量
container.setConcurrentConsumers(1);
//设置单个消费请求能够处理的消息条数,默认250
container.setPrefetchCount(250);
//设置最大的并发数量
container.setMaxConcurrentConsumers(10);
//设置消费者的tag的生成策略,队列的名字+"_"+UUID
container.setConsumerTagStrategy(queue -> queue+"_"+ UUID.randomUUID().toString());
//设置消息监听器
container.setMessageListener(customMessageListener1());
return container;
}
自定义一个消息监听器MessageListener的实现类,此处有两个接口:
MessageListener ChannelAwareMessageListener
/**
* 自定义Message监听器
* @return
*/
@Bean
public MessageListener customMessageListener(){
return msg-> System.err.println("消费者:"+new String(msg.getBody()));
}
@Bean
public ChannelAwareMessageListener customMessageListener1(){
return (msg,chanel)->{
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
try{
System.err.println("message:"+new String(msg.getBody()));
System.err.println("properties:"+ deliveryTag);
//.....执行系列的逻辑
//逻辑顺利执行完成之后执行ack
chanel.basicAck(deliveryTag,false);
}catch (Exception ex){
//记录日志等操作
//消息执行出现异常,nack,设置不重回队列,如果设置了死信队列,那么将会到死信队列中
chanel.basicNack(deliveryTag,false,false);
}
};
}
SimpleMessageListenerContainer
中的两个属性可以完成设置,如下: concurrentConsumers maxConcurrentConsumers
prefetchCount
,该属性用来限制消费端的同时处理的请求,默认是250,使用spring AMQP直接设置即可,与SpringBoot整合,配置如下: spring: rabbitmq: listener: simple: prefetch: 1
默认是自动ack的,即是在接收到这条消息之后无论有没有正确消费,这条消息都会从队列中删除。当然可以设置手动ack,即是在消费者接收消息,正确处理完成之后,手动确认ack,那么此条消息才会从队列中删除。
API(Channel类):
void basicAck(long deliveryTag, boolean multiple)
:ack消息 deliveryTag multiple
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
:nack消息 requeue
:是否重回队列,如果设置了重回队列,那么这条消息会被重新进入队列中的最后一条消息,如果设置了false并且此队列设置了死信队列,那么将会被放入死信队列中。 实现步骤:
SimpleMessageListenerContainer
的API //设置消费者ack消息的模式,默认是自动,此处设置为手动
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
ChannelAwareMessageListener
,通过自己的业务逻辑判断何时需要ack何时需要nack @Bean
public ChannelAwareMessageListener customMessageListener1(){
return (msg,chanel)->{
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
try{
System.err.println("message:"+new String(msg.getBody()));
System.err.println("properties:"+ deliveryTag);
//.....执行系列的逻辑
//逻辑顺利执行完成之后执行ack
chanel.basicAck(deliveryTag,false);
}catch (Exception ex){
//记录日志等操作
//消息执行出现异常,nack,设置不重回队列,如果设置了死信队列,那么将会到死信队列中
chanel.basicNack(deliveryTag,false,false);
}
};
}
重回队列的机制即是消息在nack之后如果设置了重回队列,那么此条消息将会被重新放入到此队列中的最后一条,之后将会被重新投递到消费端消费。
重回队列的机制并不支持使用,如果是业务逻辑上的异常导致消息重回队列,那么重新消费也是没有多大意义。在实际的工作上可以采用补偿机制解决。
设置重回队列如下:
SimpleMessageListenerContainer
中设置默认的行为如下: //设置不重回队列,默认为true,即是消息被拒绝或者nack或者监听器抛出异常之后会重新返回队列
container.setDefaultRequeueRejected(false);
//消息执行出现异常,nack,requeue=false设置不重回队列,如果设置了死信队列,那么将会到死信队列中 chanel.basicNack(deliveryTag,false,false);
x-dead-letter-exchange
,指定死信的交换机 String exchange="a_exchange";
String queueName="a_queue";
TopicExchange topicExchange = new TopicExchange(exchange, true, true);
Map<String,Object> arguments=new HashMap<>();
//指定死信队列,dlx-exchange是死信交换机
arguments.put("x-dead-letter-exchange", "dlx-exchange");
//设置死信队列的路由键,需要根据这个路由键找到对应的队列
arguments.put("x-dead-letter-routing-key", "dlx-key");
Queue queue = new Queue(queueName, true, false, false, arguments);
Binding binding = BindingBuilder.bind(queue).to(topicExchange).with("test.#");
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(topicExchange);
rabbitAdmin.declareBinding(binding);
rabbitmq默认是没有开启事务的,提交和发送消息甚至业务逻辑中间涉及到数据库操作都不在同一个事务中。
amqp如何设置事务:
setChannelTransacted
方法为true,表示使用事务。 RabbitTransactionManager
,实现了 PlatformTransactionManager
,这个事务管理器的事务只针对rabbitmq消息的发送和获取,对数据库的事务无效 @Bean
public PlatformTransactionManager transactionManager(){
RabbitTransactionManager manager = new RabbitTransactionManager();
manager.setConnectionFactory(connectionFactory());
return manager;
}
@Transactional
public void sendMsg(String msg){
//接收消息
Message message1 = rabbitTemplate.receive("a_queue");
System.err.println(new String(message1.getBody()));
String queueName="direc_q_1";
String exchangeName="direct_1";
String routingKey="direct";
Message message = MessageBuilder.withBody(msg.getBytes()).andProperties(new MessageProperties()).build();
rabbitTemplate.send(exchangeName,routingKey,message,new CorrelationData(UUID.randomUUID().toString()));
//此处出现异常,事务将会回滚
System.out.println(1/0);
}
异步消费消息使用的是监听器,此时就需要在SimpleMessageListenerContainer中设置,如下:
//开启事务
container.setChannelTransacted(true);
//设置事务管理器
container.setTransactionManager(transactionManager());