下载ActiveMq的tar安装包, 解压到响应目录下, 使用bin目录下的./activemq start启动, ./activemq stop停止
配置类
@EnableJms
@Configuration
public class ActiveMQ4Config {
@Bean
public Queue queue(){
return new ActiveMQQueue("queue1");
}
@Bean
public RedeliveryPolicy redeliveryPolicy(){
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
//是否在每次尝试重新发送失败后,增长这个等待时间
redeliveryPolicy.setUseExponentialBackOff(true);
//重发次数,默认为6次 这里设置为10次
redeliveryPolicy.setMaximumRedeliveries(10);
//重发时间间隔,默认为1秒
redeliveryPolicy.setInitialRedeliveryDelay(1);
//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 /* 2毫秒,这里的2就是 value
redeliveryPolicy.setBackOffMultiplier(2);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(false);
//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
return redeliveryPolicy;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){
ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(
"admin",
"admin",
url);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
@Bean(name="jmsQueueTemplate")
public JmsTemplate jmsQueueTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
//设置创建连接的工厂
//JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory);
//优化连接工厂,这里应用缓存池 连接工厂就即可
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
//设置默认消费topic
//jmsTemplate.setDefaultDestination(topic());
//设置P2P队列消息类型
jmsTemplate.setPubSubDomain(isPubSubDomain);
DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique();
if (destinationResolver != null) {
jmsTemplate.setDestinationResolver(destinationResolver);
}
MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique();
if (messageConverter != null) {
jmsTemplate.setMessageConverter(messageConverter);
}
//deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
jmsTemplate.setExplicitQosEnabled(true);
//DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
//定义持久化后节点挂掉以后,重启可以继续消费.
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
//默认不开启事务
System.out.println("默认是否开启事务:"+jmsTemplate.isSessionTransacted());
//如果不启用事务,则会导致XA事务失效;
//作为生产者如果需要支持事务,则需要配置SessionTransacted为true
//jmsTemplate.setSessionTransacted(true);
//消息的应答方式,需要手动确认,此时SessionTransacted必须被设置为false,且为Session.CLIENT_ACKNOWLEDGE模式
//Session.AUTO_ACKNOWLEDGE 消息自动签收
//Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
//Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return jmsTemplate;
}
@Bean(name="jmsTopicTemplate")
public JmsTemplate jmsTopicTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
//设置创建连接的工厂
//JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
//优化连接工厂,这里应用缓存池 连接工厂就即可
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
//设置默认消费topic
//jmsTemplate.setDefaultDestination(topic());
//设置发布订阅消息类型
jmsTemplate.setPubSubDomain(isPubSubDomain);
//deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
jmsTemplate.setExplicitQosEnabled(true);
//DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
//默认不开启事务
System.out.println("是否开启事务"+jmsTemplate.isSessionTransacted());
//如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。
//jmsTemplate.setSessionTransacted(true);
//不带事务的session的签收方式,取决于session的配置。
//默认消息确认方式为1,即AUTO_ACKNOWLEDGE
System.out.println("是否消息确认方式"+jmsTemplate.getSessionAcknowledgeMode());
//消息的应答方式,需要手动确认,此时SessionTransacted必须被设置为false,且为Session.CLIENT_ACKNOWLEDGE模式
//Session.AUTO_ACKNOWLEDGE 消息自动签收
//Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
//Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return jmsTemplate;
}
//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory);
//设置连接数
factory.setConcurrency("1-10");
//重连间隔时间
factory.setRecoveryInterval(1000L);
factory.setSessionAcknowledgeMode(4);
return factory;
}
}
消费者
@Component
public class Consumer {
private final static Logger logger = LoggerFactory.getLogger(Consumer.class);
@JmsListener(destination = "queue1", containerFactory = "jmsQueueListener")
public void receiveQueue(final TextMessage text, Session session)throws JMSException {
try {
logger.debug("Consumer收到的报文为:" + text.getText());
text.acknowledge();// 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
} catch (Exception e) {
session.recover();// 此不可省略 重发信息使用
}
}
}
生产者(不同的设置, 生产者和消费者要进行签收或者提交操作)
@Component
public class Producter {
@Autowired("..")//这里根据消息发布类型不同注入
private JmsTemplate jmsTemplate;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
//发送queue类型消息
public void sendQueueMsg(String msg){
jmsTemplate.convertAndSend(queue, msg);
}
//发送topic类型消息
public void sendTopicMsg(String msg){
jmsTemplate.convertAndSend(topic, msg);
}
}
延时投递的实现(其余高级特性实现方式类似)
broker配置文件schedulerSupport修改为true
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >
@Service
public class Producer {
public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");
@Autowired
private JmsMessagingTemplate template;
/**
* 延时发送
*
* @param destination 发送的队列
* @param data 发送的消息
* @param time 延迟时间
*/
public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
// 获取连接工厂
ConnectionFactory connectionFactory = template.getConnectionFactory();
try {
// 获取连接
connection = connectionFactory.createConnection();
connection.start();
// 获取session,true开启事务,false关闭事务
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列
producer = session.createProducer(destination);
producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
ObjectMessage message = session.createObjectMessage(data);
//设置延迟时间
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
// 发送消息
producer.send(message);
log.info("发送消息:{}", data);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
默认是异步发送消息, 这种消息效率更高, 但是会出现消息丢失, 但是有以下情况会发送同步消息
1.指定使用同步发送消息
2.在没有事务的前提下发送持久化消息
需要接收回调
// 创建一个消息队列
ActiveMqMessageProducer producer = (ActiveMqMessageProducer)session.createProducer(destination);
ObjectMessage message = session.createObjectMessage(data);
// 发送消息
producer.send(message, new AsyncCallback() {
...
});
1. 什么情况下会导致消息的重试
. 客户端在使用事务的前提下, rollBack()或者没有commit()消息;
. 未使用事务的前提下, 使用ACKNOWLEDGE模式, 进行了session.recover()
2. 重试多少次, 每次间隔
默认是6次, 间隔为1s
3. 超过重发的次数, 消息会被放入死信队列中
可以通过individualDeadLetterStrategy来设置各自的死信队列, 也可以设置过期
可以根据messageId来做校验, 可以使用redis来做