转载

SpringBoot 中使用RabbitMQ(一)

交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。

交换机有四种类型:Direct, topic, Headers and Fanout

* Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  * Topic:按规则转发消息(最灵活)
  * Headers:设置 header attribute 参数类型的交换机
  * Fanout:转发消息到所有绑定队列(广播模式)
复制代码

下面介绍常用的三种模式的基础用法。

SpringBoot 整合

Pom 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码

application.properties 配置文件

# rabbitmq连接参数
spring.rabbitmq.host=  # mq ip地址
spring.rabbitmq.port=5672 # 端口 默认5672
spring.rabbitmq.username=admin # 用户名
spring.rabbitmq.password=admin # 密码
# 开启发送确认(开启此模式,生产者成功发送到交换机后执行相应的回调函数)
#spring.rabbitmq.publisher-confirms=true
# 开启发送失败退(开启此模式,交换机路由不到队列时执行相应的回调函数)
#spring.rabbitmq.publisher-returns=true
# 开启消费者手动确认 ACK 默认auto
#spring.rabbitmq.listener.direct.acknowledge-mode=manual
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
复制代码

Direct Exchange

direct类型的Exchange路由规则很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中

SpringBoot 中使用RabbitMQ(一)
  • 定义配置类,注册交换机和队列并进行绑定
/**
 * Rabbit 配置类
 * @author peng
 */
@Configuration
public class DirectRabbitConfig {

    @Bean
    DirectExchange directExchange(){
        // 注册一个 Direct 类型的交换机 默认持久化、非自动删除
        return new DirectExchange("directExchange");
    }

    @Bean
    Queue infoQueue(){
        // 注册队列
        return new Queue("infoMsgQueue");
    }
    
    @Bean
    Queue warnQueue(){
        return new Queue("warnMsgQueue");
    }
    
    @Bean
    Binding infoToExchangeBinging(Queue infoQueue, DirectExchange directExchange) {
        // 将队列以 info-msg 为绑定键绑定到交换机
        return BindingBuilder.bind(infoQueue).to(directExchange).with("info-msg");
    }
    
    @Bean
    Binding warnToExchangeBinging(Queue warnQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(warnQueue).to(directExchange).with("warn-msg");
    }
}
复制代码
  • 定义生产者
/**
 * 生产者
 * @author peng
 */
@Component
public class DirectSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendInfo() {
        String content = "I am Info msg!";
        // 将消息以info-msg绑定键发送到directExchange交换机
        this.rabbitTemplate.convertAndSend("directExchange", "info-msg", content);
        System.out.println("########### SendInfoMsg : " + content);
    }
    
    public void sendWarn() {
        String content = "I am Warn msg!";
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
    
    public void sendWarn(int i) {
        String content = "I am Warn msg! " + i;
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
    
    public void sendError() {
        String content = "I am Error msg!";
        this.rabbitTemplate.convertAndSend("directExchange", "error-msg", content);
        System.out.println("########### SendErrorMsg : " + content);
    }
}

复制代码
  • 定义消费者
消费者1
/**
 * @author peng
 */
@Component
// 标记此类为Rabbit消息监听类,监听队列infoMsgQueue
@RabbitListener(queues = "infoMsgQueue")
public class DirectReceiver1 {

    // 定义处理消息的方法
    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver1 Receive InfoMsg:" + message);
    }
}

消费者2 
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver2 Receive warnMsg:" + message);
    }
}
复制代码
  • 基础用法测试
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class DirectTest {
    @Autowired
    private DirectSender directSender;

    @Test
    public void send() {
        directSender.sendInfo();
        directSender.sendWarn();
        directSender.sendError();
    }
}

结果

    ########### SendInfoMsg : I am Info msg!
    ########### SendWarnMsg : I am Warn msg!
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg!
    ########### DirectReceiver1 Receive InfoMsg:I am Info msg!
    
    InfoMsg 以info-msg绑定键发送到directExchange交换机,交换机路由到infoMsgQueue队列,DirectReceiver1监听此队列接受消息。
    WarnMsg 同理
    ErrorMsg 由于没有队列的绑定键为 error-msg 所以消息会被丢弃

复制代码
  • 一对多测试
消费者3
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver3 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver3 Receive warnMsg:" + message);
    }
}

// 一对多
@Test
public void oneToMany() {
    for (int i = 0; i< 100 ; i++){
        directSender.sendWarn(i);
    }
}

结果
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 6
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 8
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 10
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 5
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 7
    
    消费者2 和 消费者3 均匀(条数上)的消费了消息

复制代码
  • 多对多测试
/**
 * 生产者3
 * @author peng
 */
@Component
public class DirectSender2 {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendWarn(int i) {
        String content = "I am Warn msg! " + i +" fromSend2";
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
}

// 多对多
@Test
public void manyToMany() {
    for (int i = 0; i< 10 ; i++){
        directSender.sendWarn(i);
        directSender2.sendWarn(i);
    }
}

结果

    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 0 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0 fromSend1
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 1 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1 fromSend2
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 2 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 3 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 4 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4 fromSend1
   
    消费者2和消费者3分别接受了生产者1 和生产者2的消息
复制代码

Fanout Exchang

fanout类型的Exchange路由规则非常简单,会发送给所有绑定到该交换机的队列上。会忽略路由键

SpringBoot 中使用RabbitMQ(一)

配置类

/**
 * @author peng
 */
@Configuration
public class FanoutRabbitConfig {

    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Queue queue1(){
        return new Queue("fanout.1");
    }
    @Bean
    Queue queue2(){
        return new Queue("fanout.2");
    }
    @Bean
    Queue queue3(){
        return new Queue("fanout.3");
    }

    @Bean
    Binding bindingExchange1(Queue queue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchange2(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchange3(Queue queue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue3).to(fanoutExchange);
    }
}
复制代码

生产者

@Component
public class FanoutSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, fanout msg ";
        this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
        System.out.println("######## Sender : " + context);
    }
}

复制代码

消费者

消费者1
/**
 * @author peng
 */
@Component
@RabbitListener(queues = "fanout.1")
public class FanoutReceiver1 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 1  : " + message);
    }
}

消费者2
@Component
@RabbitListener(queues = "fanout.2")
public class FanoutReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 2  : " + message);
    }
}

消费者3
@Component
@RabbitListener(queues = "fanout.3")
public class FanoutReceiver3 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 3  : " + message);
    }
}

复制代码

测试

@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class FanoutTest {
    @Autowired
    private FanoutSender fanoutSender;

    @Test
    public void send() {
        fanoutSender.send();
    }
}

结果

######## Sender : hi, fanout msg 
fanout Receiver 1  : hi, fanout msg 
fanout Receiver 2  : hi, fanout msg 
fanout Receiver 3  : hi, fanout msg
复制代码
原文  https://juejin.im/post/5cefc04251882510eb758630
正文到此结束
Loading...