转载

SpringCloud Stream概念入门

官方对 Spring Cloud Stream 的一段介绍: Spring Cloud St ream 是一个用于 构建基于消息的微服务应用框架 。其目的是为了简化消息在 Spring Cloud 应用程序中的开发。

老顾来翻译一下,就是现在的消息中间件比较多,如:RabbitMQ、Kafka、RocketMq等;使用方法也不一样,但是 他们的本质流程是一样 ,都有 发布/订阅、消费组以及消息分区 这三个核心概念。

所以SpringCloud就实现了 一套轻量级的消息驱动的微服务框架 ;通过使用 Spring Cloud Stream,可以 忽略消息中间件的差异 ,有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有 更多的精力关注于核心业务逻辑 的处理。 老顾先带着小伙伴们了解几个概念,这样会更方便理解。

什么是 Spring Messaging

Spring Messaging是Spring Framework中的一个模块,其作用就是 统一消息的编程模型

  • 比如消息Messaging对应的模型就 包括一个消息体Payload和消息头Header
    SpringCloud Stream概念入门
SpringCloud Stream概念入门
  • 消息通道MessageChannel用于接收消息 ,调用 send方法 可以将 消息发送至该消息通道
SpringCloud Stream概念入门
SpringCloud Stream概念入门
消息通道里的消息如何被消费呢?
  • 由消息通道的子接口可订阅的 消息通道SubscribableChannel实现 ,被 MessageHandler消息处理器所订阅
SpringCloud Stream概念入门
  • 由MessageHandler真正地 消费/处理消息
SpringCloud Stream概念入门

Spring Messaging在消息模型的基础上衍生出了其它的一些功能,如:

1、消息接收参数及返回值处理:消息接收参数处理器 HandlerMethodArgumentResolver 配合@Header, @Payload等注解使用;消息接收后的返回值处理器 HandlerMethodReturnValueHandler 配合@SendTo注解使用。

2、消息体 内容转换器MessageConverter

3、统一抽象的消息发送模板 AbstractMessageSendingTemplate

4、消息通道拦截器 ChannelInterceptor

什么是 Spring Integration

Spring Integration是一个功能强大的 EIP(Enterprise Intergration Patterns,即企业集成模式)

是对 Spring Messaging的扩展 ,它提出了不少新的概念,包括 消息的路由 MessageRoute、 消息的分发 MessageDispatcher、 消息的过滤 Filter、 消息的转换 Transformer、 消息的聚合 Aggregator、 消息的分割 Splitter等等。

总结一句话就是对 消息消费时进行额外的处理

1、消息的分割

SpringCloud Stream概念入门
2、消息的聚合
SpringCloud Stream概念入门
3、消息的过滤
SpringCloud Stream概念入门
4、消息的分发
SpringCloud Stream概念入门

我们来看一个例子

SpringCloud Stream概念入门

1、步骤一先创建一个 可订阅消息通道messageChannel

2、定义一个消息 消费者messagehandler ,去 消费通道里面的消息 ;用了lammda表达式实现了,简单的 输出一句话

3、步骤三发送一个消息

消息最终被消息 通道里的 MessageHandler 所消费 ,最后控制台打印出:

接收到: 第一条消息内容
复制代码

我们再进入DirectChannel, 内部有一个对象UnicastingDispatcher这个是消息分发器 ,会分发到对应的 消息通道MessageChannel 中;

UnicastingDispatcher 是个 单播的分发器只能选择一个消息通道 。那么如何选择呢? 内部提供了 LoadBalancingStrategy 负载均衡策略,默认只有轮询的实现,可以进行扩展。

上面的代码改动一下

SpringCloud Stream概念入门
由于DirectChannel内部的消息分发器是 UnicastingDispatcher单播的方式 ,并且 采用轮询的负载均衡策略 ,所以这里两次的消费 分别对应这两个MessageHandler

。控制台打印出:

SpringCloud Stream概念入门
如果我们要实现广播方式,也就是 BroadcastingDispatcher ,它被 PublishSubscribeChannel 这个消息通道所使用。广播消息分发器会把消息分发给 所有的MessageHandler。
SpringCloud Stream概念入门
发送两个消息, 都被所有的MessageHandler所消费

。控制台打印:

SpringCloud Stream概念入门

Spring Cloud Stream

SpringCloud Stream(以下简称SCS)在 Spring Integration 的基础上进行了封装 ,提出了 Binder, Binding , @EnableBinding, @StreamListener 等概念 。另外SCS也整合了其他模块

1、与 Spring Boot Actuator整合提供了/bindings, /channelsendpoint

2、与Spring Boot Externalized Configuration 整合,提供了 BindingProperties, BinderProperties等外部化配置类

3、SCS增强了 消息发送失败的和消费失败情况下的处理逻辑 等功能

SCS 是 Spring Integration 的加强 ,同时 与 Spring Boot 体系 进行了融合,也是 Spring Cloud Bus 的基础。 它屏蔽了底层消息中间件的实现细节 ,希望以 统一的一套API 来进行消息的发送/消费, 底层消息中间件的实现细节由各消息中间件的Binder 完成

Binder是提供与外部消息中间件集成的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们 分别用于构造生产者和消费者 。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部已经实现了RocketMQ Binder。

SpringCloud Stream概念入门
从图中可以看出, Binding是连接应用程序跟消息中间件的桥梁

,用于消息的消费和生产。

绑定器

Binder绑定器是Spring Cloud Stream中一个非常重要的概念。 在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互的时候 ,由于各消息中间件构建的初衷不同, 它们的实现细节上会有较大的差异性 ,这使得我们实现的消息交互逻辑就会非常笨重,因为 对具体的中间件实现细节有太重的依赖 ,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。 通过定义绑定器作为中间层 ,完美地实现了 应用程序与消息中间件细节之间的隔离 。通过向应用程序 暴露统一的Channel通道 ,使得应用程序 不需要再考虑各种不同的消息中间件实现 。当我们需要 升级消息中间件 ,或是更换其他消息中间件产品时,我们要做的就是 更换它们对应的Binder绑定器而不需要修改任何Spring Boot的应用逻辑

发布-订阅模式

在Spring Cloud Stream中的消息通信方式 遵循了发布-订阅模式 ,当一条消息被投递到消息中间件之后, 它会通过共享的Topic主题进行广播 ,消息 消费者在订阅的主题中收到它并触发自身的业务逻辑处理 。这里所提到的 Topic主题是Spring Cloud Stream中的一个抽象概念 ,用来代表发布共享消息给消费者的地方。

在不同的消息中间件中,Topic可能对应着不同的概念,比如:在 RabbitMQ中的它对应了Exchange 、而在 Kakfa中则对应了Kafka中的Topic

消费组

虽然Spring Cloud Stream通过发布-订阅模式将消息生产者与消费者做了很好的解耦,基于相同主题的消费者可以轻松的进行扩展, 但是这些扩展都是针对不同的应用实例而言的 ,在现实的微服务架构中, 我们每一个微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例 。很多情况下, 消息生产者发送消息给某个具体微服务时,只希望被消费一次 ,按照上面我们启动两个应用的例子, 虽然它们同属一个应用,但是这个消息出现了被重复消费两次的情况 。为了解决这个问题, 在Spring Cloud Stream中提供了消费组的概念 。 如果在同一个主题上的应用需要启动多个实例的时候, 我们可以通过spring.cloud.stream.bindings.input.group属性为应用指定一个组名 ,这样这个应用的多个实例在接收到消息的时候, 只会有一个成员真正的收到消息并进行处理 。如下图所示,我们为Service-A和Service-B分别启动了两个实例,并且 根据服务名进行了分组 ,这样当消息进入主题之后, Group-A和Group-B都会收到消息的副本,但是在两个组中都只会有一个实例对其进行消费。

SpringCloud Stream概念入门

消息分区

通过引入消费组的概念,我们已经能够在多实例的情况下, 保障每个消息只被组内一个实例进行消费 。通过上面对消费组参数设置后的实验,我们可以观察到, 消费组并无法控制消息具体被哪个实例消费 。也就是说, 对于同一条消息,它多次到达之后可能是由不同的实例进行消费的 。但是对于 一些业务场景,就需要对于一些具有相同特征的消息每次都可以被同一个消费实例处理 ,比如: 一些用于监控服务 ,为了 统计某段时间内消息生产者发送的报告内容 ,监控服务需要在 自身内容聚合这些数据 ,那么消息生产者可以为消息增加一个固有的特征ID来进行分区, 使得拥有这些ID的消息每次都能被发送到一个特定的实例上实现累计统计的效果 ,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。 分区概念的引入就是为了解决这样的问题 :当生产者将消息数据发送给多个消费者实例时, 保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理

Spring Cloud Stream 为分区提供了通用的抽象实现 ,用来在消息中间件的 上层实现分区 处理, 所以它对于消息中间件自身是否实现了消息分区并不关心 ,这使得Spring Cloud Stream为 不具备分区功能的消息中间件也增加了分区功能扩展

原文  https://juejin.im/post/5d9bff6cf265da5b5b6c5f96
正文到此结束
Loading...