在Spring启动时,利用Spring Bean管理工厂BeanFactory接口,实现动态创建交换机、队列、交换机和队列的绑定关系,让我们无需进行重复的编码工作。
当发送消息发送至“fanout”类型交换机时,RabbitMQ将会忽略路由键,直接将消息发送至该交换机下所有队列。从而被每一个队列的消费者进行消费。
当发送消息发送至“direct”类型交换机时,RabbitMQ将会查找与传入路由键完全匹配的队列,然后将消息放入队列当中,从被队列的消费者进行消费。
当发送消息发送至“topic”类型交换机时,RabbitMQ将会查找与传入路由键规则匹配的队列,然后将消息放入队列当中,从被队列的消费者进行消费。
当发送消息发送至“headers”类型交换机时,RabbitMQ将会忽略路由键,根据在绑定路由器及队列时设置的完全匹配或部分匹配作为依据,以消息内容中的headers属性对队列进行匹配,匹配成功的队列才可以收到该消息从而进一步被消费者消费。
manual模式(手动ACK)下,注意一定要进行消息的确认。如果忘记了ACK,那么后果很严重。当consumer退出时候,message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RabbitMQ会长时间运行,因此这个"内存泄漏"是致命的。
说明:RabbitMQ交换器类型枚举
作用:声明了交换器的常用类型
操作方式:无需操作
备注:无
说明:RabbitMQ交换器枚举
作用:声明了项目需要使用到的交换机
操作方式: 按需增删
备注:由于fanout类型的交换机会忽略路由键直接下发所有的路由器,故每个不同的广播都应新建一个交换机
说明:队列参数
作用:声明了在创建队列时使用到的参数Map
操作方式: 按需修改
备注:无
说明:队列头信息
作用:声明了ExchangeTypeEnum.headers类型的交换机所使用的匹配信息
操作方式: 按需修改
备注:无
说明:队列配置枚举
作用:声明了项目需要使用的队列
操作方式: 无需操作
备注:所有指定为ExchangeEnum.fanout的队列,都将在队列名称后面追加“.10_87_10_*”(数字部分为服务所在IP地址)
说明:RabbitMQ核心配置类
作用:实现动态创建交换机、动态创建队列、动态绑定
操作方式: 无需操作
备注:该类已将所有队列名称以Map形式注册至spring bean工厂,bean名称“queuesNames”,结构:Map<String, String>(Map<队列配置枚举名称, 队列名称>) 消费者可直接使用@RabbitListener(queues = {"#{queuesNames.队列配置枚举名称}"})进行监听
说明:默认回调类,实现自MQCallback
作用:消息发送成功或失败的回调类
操作方式: 无需操作
备注:可手动进行实现,调用queueMessageServiceImpl.setMQCallback(mqCallback)方法即可实现修改
说明:默认消息发送类,实现自QueueMessageService
作用:实现了消息发送的基本接口
操作方式: 无需操作
备注:无
public static <T> boolean registerBean(String beanName, T bean) {
// 将applicationContext转换为ConfigurableApplicationContext
ConfigurableApplicationContext context = (ConfigurableApplicationContext) SpringContextUtil.getApplicationContext();
// 将bean对象注册到bean工厂
context.getBeanFactory().registerSingleton(beanName, bean);
log.debug("【SpringContextUtil】注册实例“" + beanName + "”到spring容器:" + bean);
return true;
}
复制代码
@Bean("queuesNames")
public Map<String, String> queuesNames() {
return QueueEnum.getQueuesNames();
}
// 这个方法在QueueEnum
public static Map<String, String> getQueuesNames() {
return Arrays.asList(QueueEnum.values()).stream().collect(Collectors.toMap(queueEnum -> queueEnum.toString(), queueEnum -> queueEnum.getName()));
}
复制代码
/**
* @return java.lang.Object
* @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createExchange();
* @description 动态创建交换机
* @param: this is no-parameter method.
* @author zhiqiang94@vip.qq.com
* @create 2020/4/16 0016 9:28
*/
@Bean("createExchange")
public Object createExchange() {
// 遍历交换机枚举
ExchangeEnum.toList().forEach(exchangeEnum -> {
// 声明交换机
Exchange exchange;
// 根据交换机模式 生成不同的交换机
switch (exchangeEnum.getType()) {
case fanout:
exchange = ExchangeBuilder.fanoutExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
case topic:
exchange = ExchangeBuilder.directExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
case headers:
exchange = ExchangeBuilder.headersExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
case direct:
default:
exchange = ExchangeBuilder.topicExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
}
// 将交换机注册到spring bean工厂 让spring实现交换机的管理
if (exchange != null) {
SpringContextUtil.registerBean(exchangeEnum.toString() + "_exchange", exchange);
}
}
);
// 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
return null;
}
复制代码
/**
* @return java.lang.Object
* @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createQueue();
* @description 动态创建队列
* @param: this is no-parameter method.
* @author zhiqiang94@vip.qq.com
* @create 2020/4/16 0016 9:29
*/
@Bean("createQueue")
public Object createQueue() {
// 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理
QueueEnum.toList().forEach(queueEnum -> SpringContextUtil.registerBean(queueEnum.toString() + "_queue", new Queue(queueEnum.getName(), queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments())));
// 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
return null;
}
复制代码
/**
* @return java.lang.Object
* @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createBinding();
* @description 动态将交换机及队列绑定
* @param: this is no-parameter method.
* @author zhiqiang94@vip.qq.com
* @create 2020/4/16 0016 9:29
*/
@Bean("createBinding")
public Object createBinding() {
// 遍历队列枚举 将队列绑定到指定交换机
QueueEnum.toList().forEach(queueEnum -> {
// 从spring bean工厂中获取队列对象(刚才注册的)
Queue queue = SpringContextUtil.getBean(queueEnum.toString() + "_queue", Queue.class);
// 声明绑定关系
Binding binding;
// 根据不同的交换机模式 获取不同的交换机对象(注意:刚才注册时使用的是父类Exchange,这里获取的时候将类型获取成相应的子类)生成不同的绑定规则
switch (queueEnum.getExchangeEnum().getType()) {
case fanout:
FanoutExchange fanoutExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", FanoutExchange.class);
binding = BindingBuilder.bind(queue).to(fanoutExchange);
break;
case topic:
TopicExchange topicExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", TopicExchange.class);
binding = BindingBuilder.bind(queue).to(topicExchange).with(queueEnum.getRoutingKey());
break;
case headers:
HeadersExchange headersExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", HeadersExchange.class);
if (queueEnum.isWhereAll()) {
// whereAll表示全部匹配
binding = BindingBuilder.bind(queue).to(headersExchange).whereAll(queueEnum.getHeaders()).match();
} else {
// whereAny表示部分匹配
binding = BindingBuilder.bind(queue).to(headersExchange).whereAny(queueEnum.getHeaders()).match();
}
break;
case direct:
default:
DirectExchange directExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", DirectExchange.class);
binding = BindingBuilder.bind(queue).to(directExchange).with(queueEnum.getRoutingKey());
break;
}
// 将绑定关系注册到spring bean工厂 让spring实现绑定关系的管理
if (binding != null) {
SpringContextUtil.registerBean(queueEnum.toString() + "_binding", binding);
}
}
);
// 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
return null;
}
复制代码
当消费者方法为void方法时,QueueMessageService.convertSendAndReceive方法返回值为null,注意检查消费者监听方法。
当QueueMessageService.convertSendAndReceive发送的消息被多个消费者消费,返回的结果为第一个消费者的返回值。建议有返回值的队列使用直连模式(direct),这样能保证该消息只有一个消费者进行消费。
广播交换机的角色类似于村里面的小喇叭,当有新的政策传达给这个村时,就会使用小喇叭播放,这样每个人(队列)都停到了内容。
可用于发送类似公告、系统全局通知等广播需求。
直连交换机的角色类似于邮递员,信封上写明了收件地址,收到的消息会根据目的地的不同投入到不同的队列中,若是邮递员没有找到这个地址,这封信就被丢弃了。
可用于点对点之间的通讯。
通配符交换机类似于脑子不聪明的邮件分发员,收到有一封送往“张家镇李家村12号”的信件,分发员来到分发点,发现有两个有污渍的邮箱(队列),一个能看清*李家村*,一个能看清张家镇*,快递员干脆就将信件复制了一份,往两个地方都送了投递了一份。
可用于同一个任务,需要不同模块或者分布式系统的模块同时执行时使用。
头交换机类似于程序中的鉴权,管理员拥有全部权限,用户拥有查看权限,用户无法调用需要管理员权限的接口(消息无法分发到未监听该header的队列),拥有相应权限才能调用相应权限的接口(消息分发到监听该header的队列)。这里需要注意,头交换机支持完全匹配和部分匹配,完全匹配为管理员接口必须拥有所有管理员权限才可调用,即拥有所有管理员权限才显示进入管理员后台的按钮。部分匹配为拥有其中一个或部分权限即可调用,即拥有任意管理员权限就显示进入管理员后台的按钮。