转载

SpringBoot实现RabbitMQ动态创建交换机、队列、交换机及绑定关系

​ 减少重复代码的开发工作,简化RabbitMQ的接入工作,让程序员的关注点更多的集中在业务逻辑的处理上。

​ 源码请跳转至2-②

1、实现原理

动态创建核心原理

​ 在Spring启动时,利用Spring Bean管理工厂BeanFactory接口,实现动态创建交换机、队列、交换机和队列的绑定关系,让我们无需进行重复的编码工作。

广播交换机原理

​ 当发送消息发送至“fanout”类型交换机时,RabbitMQ将会忽略路由键,直接将消息发送至该交换机下所有队列。从而被每一个队列的消费者进行消费。

直连交换机原理

​ 当发送消息发送至“direct”类型交换机时,RabbitMQ将会查找与传入路由键完全匹配的队列,然后将消息放入队列当中,从被队列的消费者进行消费。

通配符交换机原理

​ 当发送消息发送至“topic”类型交换机时,RabbitMQ将会查找与传入路由键规则匹配的队列,然后将消息放入队列当中,从被队列的消费者进行消费。

头交换机原理

​ 当发送消息发送至“headers”类型交换机时,RabbitMQ将会忽略路由键,根据在绑定路由器及队列时设置的完全匹配或部分匹配作为依据,以消息内容中的headers属性对队列进行匹配,匹配成功的队列才可以收到该消息从而进一步被消费者消费。

特别提示

​manual模式(手动ACK)下,注意一定要进行消息的确认。如果忘记了ACK,那么后果很严重。当consumer退出时候,message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RabbitMQ会长时间运行,因此这个"内存泄漏"是致命的。

2、源码解析

①、文件说明

ExchangeTypeEnum

说明:RabbitMQ交换器类型枚举

作用:声明了交换器的常用类型

操作方式:无需操作

备注:无

ExchangeEnum

说明:RabbitMQ交换器枚举

作用:声明了项目需要使用到的交换机

操作方式: 按需增删

备注:由于fanout类型的交换机会忽略路由键直接下发所有的路由器,故每个不同的广播都应新建一个交换机

QueueArguments

说明:队列参数

作用:声明了在创建队列时使用到的参数Map

操作方式: 按需修改

备注:无

QueueHeaders

说明:队列头信息

作用:声明了ExchangeTypeEnum.headers类型的交换机所使用的匹配信息

操作方式: 按需修改

备注:无

QueueEnum

说明:队列配置枚举

作用:声明了项目需要使用的队列

操作方式: 无需操作

备注:所有指定为ExchangeEnum.fanout的队列,都将在队列名称后面追加“.10_87_10_*”(数字部分为服务所在IP地址)

RabbitMQConfig

说明:RabbitMQ核心配置类

作用:实现动态创建交换机、动态创建队列、动态绑定

操作方式: 无需操作

备注:该类已将所有队列名称以Map形式注册至spring bean工厂,bean名称“queuesNames”,结构:Map<String, String>(Map<队列配置枚举名称, 队列名称>) 消费者可直接使用@RabbitListener(queues = {"#{queuesNames.队列配置枚举名称}"})进行监听

DefaultMQCallback

说明:默认回调类,实现自MQCallback

作用:消息发送成功或失败的回调类

操作方式: 无需操作

备注:可手动进行实现,调用queueMessageServiceImpl.setMQCallback(mqCallback)方法即可实现修改

QueueMessageServiceImpl

说明:默认消息发送类,实现自QueueMessageService

作用:实现了消息发送的基本接口

操作方式: 无需操作

备注:无

②、核心代码解析

spring示例注册

public static <T> boolean registerBean(String beanName, T bean) {
        // 将applicationContext转换为ConfigurableApplicationContext
        ConfigurableApplicationContext context = (ConfigurableApplicationContext) 	SpringContextUtil.getApplicationContext();
        // 将bean对象注册到bean工厂
        context.getBeanFactory().registerSingleton(beanName, bean);
        log.debug("【SpringContextUtil】注册实例“" + beanName + "”到spring容器:" + bean);
        return true;
    }
复制代码

动态创建队列名称Bean

@Bean("queuesNames")
    public Map<String, String> queuesNames() {
        return QueueEnum.getQueuesNames();
    }

    // 这个方法在QueueEnum
    public static Map<String, String> getQueuesNames() {
        return Arrays.asList(QueueEnum.values()).stream().collect(Collectors.toMap(queueEnum -> queueEnum.toString(), queueEnum -> queueEnum.getName()));
    }
复制代码

动态创建交换机

/**
     * @return java.lang.Object
     * @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createExchange();
     * @description 动态创建交换机
     * @param: this is no-parameter method.
     * @author zhiqiang94@vip.qq.com
     * @create 2020/4/16 0016 9:28
     */
    @Bean("createExchange")
    public Object createExchange() {
        // 遍历交换机枚举
        ExchangeEnum.toList().forEach(exchangeEnum -> {
                    // 声明交换机
                    Exchange exchange;
                    // 根据交换机模式 生成不同的交换机
                    switch (exchangeEnum.getType()) {
                        case fanout:
                            exchange = ExchangeBuilder.fanoutExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
                            break;
                        case topic:
                            exchange = ExchangeBuilder.directExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
                            break;
                        case headers:
                            exchange = ExchangeBuilder.headersExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
                            break;
                        case direct:
                        default:
                            exchange = ExchangeBuilder.topicExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
                            break;
                    }
                    // 将交换机注册到spring bean工厂 让spring实现交换机的管理
                    if (exchange != null) {
                        SpringContextUtil.registerBean(exchangeEnum.toString() + "_exchange", exchange);
                    }
                }
        );
        // 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
        return null;
    }
复制代码

动态创建队列

/**
     * @return java.lang.Object
     * @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createQueue();
     * @description 动态创建队列
     * @param: this is no-parameter method.
     * @author zhiqiang94@vip.qq.com
     * @create 2020/4/16 0016 9:29
     */
    @Bean("createQueue")
    public Object createQueue() {
        // 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理
        QueueEnum.toList().forEach(queueEnum -> SpringContextUtil.registerBean(queueEnum.toString() + "_queue", new Queue(queueEnum.getName(), queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments())));
        // 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
        return null;
    }
复制代码

动态创建绑定关系

/**
     * @return java.lang.Object
     * @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createBinding();
     * @description 动态将交换机及队列绑定
     * @param: this is no-parameter method.
     * @author zhiqiang94@vip.qq.com
     * @create 2020/4/16 0016 9:29
     */
    @Bean("createBinding")
    public Object createBinding() {
        // 遍历队列枚举 将队列绑定到指定交换机
        QueueEnum.toList().forEach(queueEnum -> {
                    // 从spring bean工厂中获取队列对象(刚才注册的)
                    Queue queue = SpringContextUtil.getBean(queueEnum.toString() + "_queue", Queue.class);
                    // 声明绑定关系
                    Binding binding;
                    // 根据不同的交换机模式 获取不同的交换机对象(注意:刚才注册时使用的是父类Exchange,这里获取的时候将类型获取成相应的子类)生成不同的绑定规则
                    switch (queueEnum.getExchangeEnum().getType()) {
                        case fanout:
                            FanoutExchange fanoutExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", FanoutExchange.class);
                            binding = BindingBuilder.bind(queue).to(fanoutExchange);
                            break;
                        case topic:
                            TopicExchange topicExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", TopicExchange.class);
                            binding = BindingBuilder.bind(queue).to(topicExchange).with(queueEnum.getRoutingKey());
                            break;
                        case headers:
                            HeadersExchange headersExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", HeadersExchange.class);
                            if (queueEnum.isWhereAll()) {
                                // whereAll表示全部匹配
                                binding = BindingBuilder.bind(queue).to(headersExchange).whereAll(queueEnum.getHeaders()).match();
                            } else {
                                // whereAny表示部分匹配
                                binding = BindingBuilder.bind(queue).to(headersExchange).whereAny(queueEnum.getHeaders()).match();
                            }
                            break;
                        case direct:
                        default:
                            DirectExchange directExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", DirectExchange.class);
                            binding = BindingBuilder.bind(queue).to(directExchange).with(queueEnum.getRoutingKey());
                            break;
                    }
                    // 将绑定关系注册到spring bean工厂 让spring实现绑定关系的管理
                    if (binding != null) {
                        SpringContextUtil.registerBean(queueEnum.toString() + "_binding", binding);
                    }
                }
        );
        // 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
        return null;
    }
复制代码

3、使用教程

①、按需增删交换机枚举

SpringBoot实现RabbitMQ动态创建交换机、队列、交换机及绑定关系

②、按需增删队列枚举

SpringBoot实现RabbitMQ动态创建交换机、队列、交换机及绑定关系

③、实现消费者监听

SpringBoot实现RabbitMQ动态创建交换机、队列、交换机及绑定关系

④、调用生产者发送消息

SpringBoot实现RabbitMQ动态创建交换机、队列、交换机及绑定关系

⑤、查看测试结果

SpringBoot实现RabbitMQ动态创建交换机、队列、交换机及绑定关系

4、常见问题

①、调用QueueMessageService.convertSendAndReceive方法返回值为null

​ 当消费者方法为void方法时,QueueMessageService.convertSendAndReceive方法返回值为null,注意检查消费者监听方法。

②、QueueMessageService.convertSendAndReceive方法被多个消费者消费怎么办?

​ 当QueueMessageService.convertSendAndReceive发送的消息被多个消费者消费,返回的结果为第一个消费者的返回值。建议有返回值的队列使用直连模式(direct),这样能保证该消息只有一个消费者进行消费。

③、广播交换机(ExchangeTypeEnum.fanout)的使用场景有哪些

​广播交换机的角色类似于村里面的小喇叭,当有新的政策传达给这个村时,就会使用小喇叭播放,这样每个人(队列)都停到了内容。

​ 可用于发送类似公告、系统全局通知等广播需求。

④、直连交换机(ExchangeTypeEnum.direct)的使用场景有哪些

​直连交换机的角色类似于邮递员,信封上写明了收件地址,收到的消息会根据目的地的不同投入到不同的队列中,若是邮递员没有找到这个地址,这封信就被丢弃了。

​ 可用于点对点之间的通讯。

⑤、通配符交换机(ExchangeTypeEnum.topic)的使用场景有哪些

​通配符交换机类似于脑子不聪明的邮件分发员,收到有一封送往“张家镇李家村12号”的信件,分发员来到分发点,发现有两个有污渍的邮箱(队列),一个能看清*李家村*,一个能看清张家镇*,快递员干脆就将信件复制了一份,往两个地方都送了投递了一份。

​ 可用于同一个任务,需要不同模块或者分布式系统的模块同时执行时使用。

⑥、头交换机(ExchangeTypeEnum.headers)的使用场景有哪些

​头交换机类似于程序中的鉴权,管理员拥有全部权限,用户拥有查看权限,用户无法调用需要管理员权限的接口(消息无法分发到未监听该header的队列),拥有相应权限才能调用相应权限的接口(消息分发到监听该header的队列)。这里需要注意,头交换机支持完全匹配和部分匹配,完全匹配为管理员接口必须拥有所有管理员权限才可调用,即拥有所有管理员权限才显示进入管理员后台的按钮。部分匹配为拥有其中一个或部分权限即可调用,即拥有任意管理员权限就显示进入管理员后台的按钮。

​ 本文仅为自己学习笔记,由于实现原理非常简单,且核心源码已在本文中展示,故不发放源码连接了。由于本人对RabbitMQ研究并不深入,本文若有错误之处,请大佬指出,我一定虚心学习,积极改正。

​ 第一次写文章,希望大家多多鼓励,多多支持。

原文  https://juejin.im/post/5ea4df86f265da47c06ee53e
正文到此结束
Loading...