Spring Cloud Stream如何深度支持Apache Kafka?

Spring Cloud Stream提供了一种编程模型,可以立即连接到Apache Kafka。应用程序需要在其类路径中包含Kafka绑定器并添加一个名为的注释@EnableBinding,该注释将Kafka主题绑定到其输入或输出(或两者)。

Spring Cloud Stream提供了三个方便的接口来绑定@EnableBinding:( Source单输出),Sink(单输入)和Processor(单输入和输出)。它可以扩展到具有多个输入和输出的自定义接口。

以下代码片段显示了Spring Cloud Stream的基本编程模型:

@SpringBootApplication
@EnableBinding(Processor.<b>class</b>)
<b>public</b> <b>class</b> UppercaseProcessor {

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  <b>public</b> String process(String s) {
     <b>return</b> s.toUpperCase();
  }
}

请注意该方法已注释@StreamListener,Spring Cloud Stream提供该方法以接收来自Kafka主题的消息。同样的方法也用注释SendTo,这是一个方便的注释,用于将消息发送到输出目的地。这是一个Spring Cloud Stream Processor应用程序,它使用来自输入的消息并向输出生成消息。

前面的代码中没有提到Kafka主题Topic。此时可能出现的一个自然问题是,“此应用程序如何与Kafka通信?”答案是:使用 Spring Boot
支持的众多配置选项之一配置入站和出站主题。在这种情况下,我们使用名为YAML的配置文件application.yml,默认情况下会搜索该文件。以下是输入和输出目标的配置:

spring.cloud.stream.bindings:
  input:
    destination: topic1
  output:
    destination: topic2

Spring Cloud Stream将输入input映射到输出topic1和输出topic2。这是一组非常小的配置,但有更多选项可用于进一步自定义应用程序。默认情况下,主题是使用单个分区创建的,但可以由应用程序覆盖。有关更多信息,请参阅这些 文档

最重要的是,开发人员可以专注于编写核心业务逻辑,并让Spring Cloud Stream和Spring Boot处理基础架构问题(例如连接到Kafka,配置和调整应用程序等)。

以下示例显示了另一个简单的应用程序(消费者):

@SpringBootApplication
@EnableBinding(Sink.<b>class</b>)
<b>public</b> <b>class</b> LoggingConsumerApplication {

  @StreamListener(Sink.INPUT)
  <b>public</b> <b>void</b> handle(Person person) {
     System.out.println(<font>"Received: "</font><font> + person);
  }

  <b>public</b> <b>static</b> <b>class</b> Person {
     <b>private</b> String name;
     <b>public</b> String getName() {
        <b>return</b> name;
     }
     <b>public</b> <b>void</b> setName(String name) {
        <b>this</b>.name = name;
     }
     <b>public</b> String toString() {
        <b>return</b> <b>this</b>.name;
     }
  }
}
</font>

请注意,@EnableBinding带有一个Sink.class,表示这是一个消费者。与之前的应用程序的一个主要区别在于,注释的方法@StreamListener是将称为Person的POJO作为其参数而不是字符串。将来自Kafka主题的消息转换为此POJO!Spring Cloud Stream提供 自动内容类型转换
。默认情况下,它application/JSON用作内容类型,但也支持其他内容类型。您可以使用该属性提供内容类型spring.cloud.stream.bindings.input.contentType,然后将其设置为适当的内容类型,例如application/Avro。

Spring Cloud Stream根据此配置选择适当的消息转换器。如果应用程序想要使用Kafka提供的本机序列化和反序列化而不是使用Spring Cloud Stream提供的消息转换器,则可以设置以下属性。

对于序列化:

spring.cloud.stream.bindings.output.useNativeEncoding=<b>true</b>

对于反序列化:

spring.cloud.stream.bindings.input.useNativeDecoding=<b>true</b>

自动配置主题

Apache Kafka绑定器提供了一个配置器provisioner ,用于在启动时配置主题。如果在代理上启用了主题创建,则Spring Cloud Stream应用程序可以作为应用程序启动的一部分来创建和配置Kafka主题。

例如,可以向分配器提供分区和其他主题级配置。这些自定义可以在 binder
完成,适用于应用程序中使用的所有主题,或者适用于单个生产者和消费者级别。这在应用程序的开发和测试期间尤其方便。有关如何为多个分区配置主题的各种 示例

支持消费组和分区

消费者组和分区等众所周知的属性可通过Spring Cloud Stream进行配置。可以通过属性设置消费者组:

spring.cloud.stream.bindings.input.group=group-name

如前所述,在内部,该小组将被翻译成Kafka的消费者群体。在编写生产者应用程序时,Spring Cloud Stream提供了将数据发送到特定分区的选项。在内部,框架再次将这些职责委托给Kafka。

如果禁用消费者组的自动重新平衡,则可以限制特定应用程序实例使用来自某组分区的消息,这是一个需要覆盖的简单配置属性。有关详细信息,请参阅这些 配置选项

绑定可视化和控制

使用Spring Boot的执行器机制,我们现在可以 控制
Spring Cloud Stream中的 各个绑定

在运行时,可以使用 执行器端点
停止,暂停,恢复应用程序,这是Spring Boot的机制,用于在将应用程序推送到生产环境时监视和管理应用程序。此功能使用户可以对应用程序处理Kafka数据的方式进行更多控制。如果应用程序暂停绑定,则处理来自该特定主题的记录将被暂停,直到恢复为止。

Spring Cloud Stream还与 Micrometer
集成,可实现 更丰富的指标
,排放混乱率并提供其他与监控相关的功能。这些可以与许多其他 监控系统
进一步集成。Kafka活页夹提供了扩展的 指标功能
,可以提供有关主题的消费者滞后的其他见解。

Spring Boot 通过特殊的 健康端点
提供应用程序运行状况检查。Kafka绑定器提供了一个特殊的健康指示器实现,它考虑了与代理的连接,并检查所有分区是否健康。如果在没有领导者的情况下找到任何分区,或者无法连接代理,则运行状况检查会相应地报告状态。

下面见Spring Cloud Stream对Kafka Streams的深度支持,可点击#Kafka标签进入查看

原文 

https://www.jdon.com/51897

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » Spring Cloud Stream如何深度支持Apache Kafka?

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址