转载

spring整合rabbitmq

关于spring整合rabbitmq,看了网上很多资料,感觉描述得不够详细。正好最近自己用到了这项技术,于是总结了一下详细过程,分享给大家。

准备工作

1、首先需要有一个springMVC的demo,这里不再介绍,请自行提前准备。需要注意的是spring版本不能过低,否则会报错,我就踩过这个坑。本文spring采用4.2.3.RELEASE版本。

2、安装rabbitmq服务,以前博客中详细教程: http://blog.csdn.net/l18637220680/article/details/75258280 。

3、准备jar包

<!-- RabbitMQ dependencies: Spring AMQP's RabbitMQ integration module (spring-rabbit) -->  
        <dependency>  
            <groupId>org.springframework.amqp</groupId>  
            <artifactId>spring-rabbit</artifactId>  
            <version>1.6.1.RELEASE</version>  
        </dependency>  

        <!-- RabbitMQ Java client, declared explicitly and pinned to 4.0.0 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.0</version>
        </dependency>

连接rabbitmq服务

创建rabbitmq-context.xml和rabbitmq.properties配置文件。rabbitmq.properties文件如下:

# RabbitMQ connection settings, referenced as ${rabbit.*} placeholders by the
# rabbit:connection-factory element in the Spring XML configuration.
# NOTE(review): the password is stored in plain text here - consider externalizing it.
# Broker host address
rabbit.hosts=192.168.1.239
# Credentials used to open the connection
rabbit.username=admin
rabbit.password=admin123
# Broker port
rabbit.port=5672

在spring的配置文件中引入rabbitmq-context.xml和rabbitmq.properties配置文件。

<!-- Loads rabbitmq.properties from the classpath so that ${...} placeholders
     in the bean definitions can be resolved -->
<bean
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>              
                <value>classpath:/rabbitmq.properties</value>
            </list>
        </property>
    </bean>
 <!-- Pulls the RabbitMQ bean definitions (rabbitmq-context.xml) into this application context -->
 <import resource="classpath:/rabbitmq-context.xml" />

创建生产者

rabbitmq-context.xml如下。注意xml的命名空间的配置,不要有缺漏。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans      
                        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
                        http://www.springframework.org/schema/context      
                        http://www.springframework.org/schema/context/spring-context-3.0.xsd   
                        http://www.springframework.org/schema/rabbit    
                        http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
                        http://www.springframework.org/schema/util  
                        http://www.springframework.org/schema/util/spring-util-3.0.xsd">

    <!-- Connection factory used to create connections to the broker; host, port and
         credentials come from the rabbit.* properties -->
    <rabbit:connection-factory id="connectionFactory"
        username="${rabbit.username}" password="${rabbit.password}" host="${rabbit.hosts}"
        port="${rabbit.port}" />
    <!-- Admin bean bound to the connection factory above -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queue declaration attributes:
         durable:     true or false; when true the queue survives a server restart
         exclusive:   whether the queue is private to the current connection; such a
                      queue is deleted automatically once the connection is closed
         auto-delete: delete the queue automatically when no consumer uses it any more -->
    <!-- Queue that stores logs of the externally exposed interfaces -->
    <rabbit:queue id="save_out_log" durable="true" auto-delete="false"
        exclusive="false" name="save_out_log" />
</beans>

列出java代码,ProducerDao生产者接口。

/**
 * MQ生产者
 */
public interface ProducerDao {

    /**
     * 发送消息
     * @param key
     * @param obj void
     */
    public void sendData(String key,Object obj);

}

ProducerDaoImpl生产者实现类。

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

/** 
 * MQ生产者实现
 */
@Repository("producerDao")
public class ProducerDaoImpl implements ProducerDao{
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendData(String key,Object obj) {
        amqpTemplate.convertAndSend(key, obj);
    }   
}

创建异步消费者(监听器)

在rabbitmq-context.xml中加入以下代码。

<!-- Direct exchange and its queue bindings (exchange, queue, binding key).
     The exchange name must be identical to the "exchange" attribute of the
     rabbit:template that publishes to it (my-mq-exchange); otherwise the template
     publishes to an exchange that this configuration never declares or binds. -->
    <rabbit:direct-exchange name="my-mq-exchange" 
        durable="true" auto-delete="false" id="mq-exchange" >
        <rabbit:bindings>
            <!-- Messages published with routing key save_out_log_key are routed to queue save_out_log -->
            <rabbit:binding queue="save_out_log" key="save_out_log_key" />

        </rabbit:bindings>
    </rabbit:direct-exchange>

<!-- Custom JSON message converter, used in place of a Jackson-based one (the author
     cites fastjson being faster than Jackson): it turns the objects handed to the
     producer into JSON before they are placed on the queue -->
    <bean id="jsonMessageConverter" class="com.baicaiqiche.core.queue.dao.FastJsonMessageConverter"></bean>

    <!-- Spring AMQP template: publishes to exchange my-mq-exchange through the
         connectionFactory bean and converts payloads with jsonMessageConverter.
         The exchange named here has to be declared by a rabbit:*-exchange element. -->
    <rabbit:template exchange="my-mq-exchange" id="amqpTemplate" 
        connection-factory="connectionFactory" message-converter="jsonMessageConverter" />

    <!-- Listener bean that consumes the messages -->
    <bean id="saveOutLogListenter" class="com.baicaiqiche.core.queue.dao.SaveOutLogListenter"></bean>


    <!-- Listens on the queue holding the logs of the externally exposed interfaces.
         acknowledge="manual" switches message acknowledgement to manual mode, so the
         listener itself has to acknowledge each delivery on the channel. -->
    <rabbit:listener-container
        connection-factory="connectionFactory" acknowledge="manual" transaction-size="5" >
        <rabbit:listener queues="save_out_log" ref="saveOutLogListenter" />
    </rabbit:listener-container>

FastJsonMessageConverter转换类。

import java.io.UnsupportedEncodingException;
import net.sf.ezmorph.object.DateMorpher;
import net.sf.json.JSONObject;
import net.sf.json.util.JSONUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

/**
 * rabbitmq中json转换
 */
public class FastJsonMessageConverter  extends AbstractMessageConverter {

    public static final String DEFAULT_CHARSET = "UTF-8";
    private volatile String defaultCharset = DEFAULT_CHARSET;
    public FastJsonMessageConverter() {
        super();
    }

    public void setDefaultCharset(String defaultCharset) {
        this.defaultCharset = (defaultCharset != null) ? defaultCharset
                : DEFAULT_CHARSET;
    }

    public Object fromMessage(Message message)
            throws MessageConversionException {
        return null;
    }

    @SuppressWarnings("unchecked")
    public <T> T fromMessage(Message message,T t) {
        String json = "";
        try {
            json = new String(message.getBody(),defaultCharset);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        JSONUtils.getMorpherRegistry().registerMorpher(new DateMorpher(new String[] {"yyyy-MM-dd", "yyyy-MM-dd HH:mm:ss","yyyy/MM/dd","yyyy/MM/dd HH:mm:ss"}));
        return (T) JSONObject.toBean((JSONObject.fromObject(json)), t.getClass());
    }   


    protected Message createMessage(Object objectToConvert,
            MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] bytes = null;
        try {
            String jsonString = com.alibaba.fastjson.JSONObject.toJSONString(objectToConvert);
            bytes = jsonString.getBytes(this.defaultCharset);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        } 
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(this.defaultCharset);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);

    }
}

队列监听器SaveOutLogListenter类,实现ChannelAwareMessageListener接口。

import javax.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import com.alibaba.fastjson.JSONObject;
import com.baicaiqiche.core.credit.domain.OutputServiceLog;
import com.baicaiqiche.core.credit.service.OutputServiceLogService;
import com.rabbitmq.client.Channel;

/**
 * Queue listener that persists each received message as an {@link OutputServiceLog}
 * and acknowledges it manually on the channel.
 */
public class SaveOutLogListenter implements ChannelAwareMessageListener {

    /**
     * Charset used to decode message bodies. Decoding without an explicit charset
     * depends on the platform default and can corrupt non-ASCII text.
     */
    private static final String BODY_CHARSET = "UTF-8";

    @Resource
    private OutputServiceLogService outputServiceLogService;

    /**
     * Handles one delivery: parses the JSON body, saves it, then acknowledges it.
     * If anything fails the delivery is rejected so it does not stay unacknowledged.
     *
     * @param msg     received message
     * @param channel channel the message was delivered on; used for ack / nack
     */
    @Override
    public void onMessage(Message msg, Channel channel) {
        final long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("-------------MQ--------------");
            System.out.println(msg.toString());
            // Receive the message
            JSONObject str = JSONObject.parseObject(new String(msg.getBody(), BODY_CHARSET));
            // Business logic
            OutputServiceLog outputServiceLog = JSONObject.toJavaObject(str, OutputServiceLog.class);
            outputServiceLogService.save(outputServiceLog);
            // Acknowledge only this delivery. multiple=true would also acknowledge
            // every earlier outstanding delivery on the channel, including ones whose
            // processing had failed.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            e.printStackTrace();
            try {
                // Reject without requeue so that a message which always fails is not
                // redelivered in a tight loop; configure a dead-letter exchange on the
                // queue if failed messages have to be kept.
                channel.basicNack(deliveryTag, false, false);
            } catch (Exception nackFailure) {
                nackFailure.printStackTrace();
            }
        }
    }

}

测试

在一个方法中注入producerDao,并调用sendData方法。

// Producer bean injected by name/type; used below to publish the test message.
@Resource
    private ProducerDao producerDao;

    // Builds a small map payload and publishes it through producerDao.
    // NOTE(review): `bm`, `ret` and `logger` are not defined in this excerpt -
    // presumably they are declared elsewhere in the original project; confirm before reuse.
    // NOTE(review): JUnit 4 expects @Test methods to be public void with no
    // parameters - verify this signature against the test framework actually used.
    @Test
    public String test(String content) {
            Map<String,String> map = new HashMap<String, String>();
            map.put("phone", bm.get("phone").toString());
            map.put("idNo", bm.get("idNo").toString());
            // The routing key must have a matching binding on the exchange; the XML shown
            // in this article only binds "save_out_log_key", so a binding for
            // "zx_black_risk_key" has to exist elsewhere for this message to be routed.
            producerDao.sendData("zx_black_risk_key", map);
            logger.info(">>>>>>>>>返回的数据为:" + ret);
            return ret;
        }
    }

日志显示

-------------MQ--------------
(Body:'{"idNo":"34112519930802095x","phone":" "}' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=my-mq-exchange, receivedRoutingKey=zx_black_risk_key, receivedDelay=null, deliveryTag=5, messageCount=0, consumerTag=amq.ctag-ivOWDpOkyIhOtinz9ICGPA, consumerQueue=zx_black_risk])

因为是剪切项目中的代码,有什么问题可以提出来。

原文  http://blog.csdn.net/l18637220680/article/details/77101335#t4
正文到此结束
Loading...