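I read a lot of material online about integrating Spring with RabbitMQ and found most of it lacking in detail. Since I recently used this in a project, here is a detailed walkthrough of the process.
1. Start with a working Spring MVC demo. I won't cover that here, so prepare one in advance. Make sure the Spring version is not too old, otherwise you will get errors (I fell into this trap myself). I use Spring 4.2.3.RELEASE.
2. Install the RabbitMQ server. I wrote a detailed tutorial in an earlier post: http://blog.csdn.net/l18637220680/article/details/75258280 .
3. Add the required jars (Maven dependencies):
<!-- RabbitMQ dependencies -->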
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.1.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.0</version>
</dependency>
Create two configuration files, rabbitmq-context.xml and rabbitmq.properties. The rabbitmq.properties file looks like this:
rabbit.hosts=192.168.1.239
rabbit.username=admin
rabbit.password=admin123
rabbit.port=5672
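To make the role of these four keys concrete, here is a minimal sketch that parses the same key/value pairs with `java.util.Properties` and checks them before they would be handed to a connection factory. It assumes each key sits on its own line, which is what the properties format expects. The class name `RabbitSettings` and the validation rules are assumptions for illustration only; in the actual setup Spring's `PropertyPlaceholderConfigurer` resolves the `${...}` placeholders.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical holder for the four settings in rabbitmq.properties. */
final class RabbitSettings {
    final String host;
    final int port;
    final String username;
    final String password;

    private RabbitSettings(String host, int port, String username, String password) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
    }

    /** Parses properties text and fails fast if a key is missing or the port is invalid. */
    static RabbitSettings parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalArgumentException("Unreadable properties text", e);
        }
        String host = require(props, "rabbit.hosts");
        int port = Integer.parseInt(require(props, "rabbit.port"));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return new RabbitSettings(host, port,
                require(props, "rabbit.username"),
                require(props, "rabbit.password"));
    }

    /** Returns the trimmed value for key, or throws if it is absent or blank. */
    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }
}
```

If all four pairs are written on a single line, `Properties` treats everything after the first `=` as the value of `rabbit.hosts`, so the other keys are reported as missing.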
Reference rabbitmq-context.xml and rabbitmq.properties from the main Spring configuration file:
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:/rabbitmq.properties</value>
</list>
</property>
</bean>
<import resource="classpath:/rabbitmq-context.xml" />
rabbitmq-context.xml is shown below. Pay attention to the XML namespace declarations and make sure none are missing.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.0.xsd">
<!-- Connection factory, used to create connections -->
<rabbit:connection-factory id="connectionFactory"
username="${rabbit.username}" password="${rabbit.password}" host="${rabbit.hosts}"
port="${rabbit.port}" />
<rabbit:admin connection-factory="connectionFactory" />
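<!-- Queue declaration
durable: true or false; when true, the queue survives a broker restart
exclusive: whether the queue is exclusive to the current connection; an exclusive queue is deleted automatically when that connection closes
auto-delete: delete the queue automatically once no consumers are using it -->
<!-- Queue for saving external-interface logs -->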
<rabbit:queue id="save_out_log" durable="true" auto-delete="false"
exclusive="false" name="save_out_log" />
</beans>
The Java code follows. ProducerDao is the producer interface:
/**
* MQ producer
*/
public interface ProducerDao {
/**
* Sends a message.
* @param key routing key
* @param obj message payload
*/
public void sendData(String key,Object obj);
}
ProducerDaoImpl, the producer implementation:
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
/**
* MQ producer implementation
*/
@Repository("producerDao")
public class ProducerDaoImpl implements ProducerDao{
@Autowired
private AmqpTemplate amqpTemplate;
public void sendData(String key,Object obj) {
amqpTemplate.convertAndSend(key, obj);
}
}
Add the following to rabbitmq-context.xml.
<!-- Bind the exchange to the queue with a binding key -->
<rabbit:direct-exchange name="my-mq-exchange"
durable="true" auto-delete="false" id="mq-exchange" >
<rabbit:bindings>
<rabbit:binding queue="save_out_log" key="save_out_log_key" />
</rabbit:bindings>
</rabbit:direct-exchange>
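<!-- The JSON message converters bundled with Spring AMQP are based on Jackson; a JSON converter serializes the producer's payload to JSON before it is put on the queue. Because I find fastjson faster than Jackson, a fastjson-based implementation is used here instead -->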
<bean id="jsonMessageConverter" class="com.baicaiqiche.core.queue.dao.FastJsonMessageConverter"></bean>
<!-- Spring AMQP template declaration -->
<rabbit:template exchange="my-mq-exchange" id="amqpTemplate"
connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
<!-- Listener -->
<bean id="saveOutLogListenter" class="com.baicaiqiche.core.queue.dao.SaveOutLogListenter"></bean>
<!-- Listen on the external-interface log queue
acknowledge="manual" switches message acknowledgement to manual mode -->
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="manual" transaction-size="5" >
<rabbit:listener queues="save_out_log" ref="saveOutLogListenter" />
</rabbit:listener-container>
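A direct exchange delivers a message only to queues whose binding key equals the message's routing key. The following is a small in-memory sketch of that rule, not RabbitMQ or Spring AMQP code; `DirectExchangeModel` and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of direct-exchange routing (not the RabbitMQ client API). */
final class DirectExchangeModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<String, List<String>>();

    /** Records that the queue is bound to this exchange with the given binding key. */
    void bind(String queue, String bindingKey) {
        List<String> queues = bindings.get(bindingKey);
        if (queues == null) {
            queues = new ArrayList<String>();
            bindings.put(bindingKey, queues);
        }
        queues.add(queue);
    }

    /** Returns the queues that would receive a message published with routingKey. */
    List<String> route(String routingKey) {
        List<String> queues = bindings.get(routingKey);
        if (queues == null) {
            return Collections.<String>emptyList();
        }
        return Collections.unmodifiableList(queues);
    }
}
```

With the binding from the configuration, `route("save_out_log_key")` returns the `save_out_log` queue, while a routing key that has no binding returns an empty list. On a real broker such an unroutable message is discarded by default, so a typo in the key fails silently.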
The FastJsonMessageConverter class:
import java.io.UnsupportedEncodingException;
import net.sf.ezmorph.object.DateMorpher;
import net.sf.json.JSONObject;
import net.sf.json.util.JSONUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
/**
* JSON conversion for RabbitMQ messages
*/
public class FastJsonMessageConverter extends AbstractMessageConverter {
public static final String DEFAULT_CHARSET = "UTF-8";
private volatile String defaultCharset = DEFAULT_CHARSET;
public FastJsonMessageConverter() {
super();
}
public void setDefaultCharset(String defaultCharset) {
this.defaultCharset = (defaultCharset != null) ? defaultCharset
: DEFAULT_CHARSET;
}
public Object fromMessage(Message message)
throws MessageConversionException {
// Not implemented: the listener in this article decodes the message body itself
return null;
}
@SuppressWarnings("unchecked")
public <T> T fromMessage(Message message,T t) {
String json = "";
try {
json = new String(message.getBody(),defaultCharset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
JSONUtils.getMorpherRegistry().registerMorpher(new DateMorpher(new String[] {"yyyy-MM-dd", "yyyy-MM-dd HH:mm:ss","yyyy/MM/dd","yyyy/MM/dd HH:mm:ss"}));
return (T) JSONObject.toBean((JSONObject.fromObject(json)), t.getClass());
}
protected Message createMessage(Object objectToConvert,
MessageProperties messageProperties)
throws MessageConversionException {
byte[] bytes = null;
try {
String jsonString = com.alibaba.fastjson.JSONObject.toJSONString(objectToConvert);
bytes = jsonString.getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(this.defaultCharset);
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);
}
}
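Apart from JSON serialization, the converter comes down to turning a string into bytes with a known charset on the way out and back into a string on the way in. Below is a stdlib-only sketch of that byte round trip. It uses `StandardCharsets.UTF_8`, so no `UnsupportedEncodingException` has to be handled. `JsonBytes` is a hypothetical helper, and the JSON step itself (fastjson in the class above) is deliberately left out.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing the charset handling of a message body. */
final class JsonBytes {
    private final Charset charset;

    /** Uses UTF-8, the same default as the converter above. */
    JsonBytes() {
        this(StandardCharsets.UTF_8);
    }

    JsonBytes(Charset charset) {
        if (charset == null) {
            throw new IllegalArgumentException("charset must not be null");
        }
        this.charset = charset;
    }

    /** Encodes an already serialized JSON string into a message body. */
    byte[] encode(String json) {
        return json.getBytes(charset);
    }

    /** Decodes a message body back into the JSON string using the same charset. */
    String decode(byte[] body) {
        return new String(body, charset);
    }
}
```

Passing the charset explicitly on both sides matters because `new String(bytes)` without a charset falls back to the platform default, which may differ between the producer and consumer machines.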
The queue listener, which implements ChannelAwareMessageListener:
import javax.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import com.alibaba.fastjson.JSONObject;
import com.baicaiqiche.core.credit.domain.OutputServiceLog;
import com.baicaiqiche.core.credit.service.OutputServiceLogService;
import com.rabbitmq.client.Channel;
/**
* Queue listener
*/
public class SaveOutLogListenter implements ChannelAwareMessageListener {
@Resource
private OutputServiceLogService outputServiceLogService;
@Override
public void onMessage(Message msg, Channel channel) {
try{
System.out.println("-------------MQ--------------");
System.out.println(msg.toString());
// Parse the received message
JSONObject str = JSONObject.parseObject(new String(msg.getBody()));
// Business logic
OutputServiceLog outputServiceLog = JSONObject.toJavaObject(str, OutputServiceLog.class);
outputServiceLogService.save(outputServiceLog);
// Acknowledge the message
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);
}catch(Exception e){
// The message is not acknowledged on failure, so it stays unacked on this channel
e.printStackTrace();
}
}
}
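The second argument of `basicAck` is the `multiple` flag: when it is `true`, every unacknowledged delivery on the channel up to and including the given delivery tag is acknowledged. Here is an in-memory sketch of that bookkeeping, not the RabbitMQ client itself; `UnackedTracker` is a hypothetical class written only to illustrate the semantics.

```java
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical model of the unacknowledged delivery tags on one channel. */
final class UnackedTracker {
    private final TreeSet<Long> unacked = new TreeSet<Long>();

    /** Records a delivery that has not been acknowledged yet. */
    void delivered(long tag) {
        unacked.add(tag);
    }

    /** Mirrors basicAck(tag, multiple): one tag, or all tags up to and including it. */
    void ack(long tag, boolean multiple) {
        if (multiple) {
            // headSet is a live view, so clearing it removes the tags from the set
            unacked.headSet(tag, true).clear();
        } else {
            unacked.remove(tag);
        }
    }

    /** Returns a copy of the tags that are still waiting for an acknowledgement. */
    SortedSet<Long> pending() {
        return new TreeSet<Long>(unacked);
    }
}
```

In this model, acknowledging tag 3 with `multiple=true` after deliveries 1 to 5 leaves only 4 and 5 pending. In the listener above this means one successful ack also confirms any earlier deliveries on the same channel that were left unacknowledged, for example ones whose processing threw an exception.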
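Inject producerDao into a class and call sendData from one of its methods. The routing key must match a binding key on the exchange; zx_black_risk_key below belongs to another queue (zx_black_risk) in my project, whereas the configuration above binds save_out_log_key.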
@Resource
private ProducerDao producerDao;
@Test
public String test(String content) {
// bm and ret are not defined in this excerpt; they come from the surrounding project code
Map<String,String> map = new HashMap<String, String>();
map.put("phone", bm.get("phone").toString());
map.put("idNo", bm.get("idNo").toString());
producerDao.sendData("zx_black_risk_key", map);
logger.info(">>>>>>>>> Returned data: " + ret);
return ret;
}
}
Log output:
-------------MQ--------------
(Body:'{"idNo":"34112519930802095x","phone":" "}' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=my-mq-exchange, receivedRoutingKey=zx_black_risk_key, receivedDelay=null, deliveryTag=5, messageCount=0, consumerTag=amq.ctag-ivOWDpOkyIhOtinz9ICGPA, consumerQueue=zx_black_risk])
This code was cut out of a real project, so feel free to point out any problems you find.