这篇博客文章展示了如何配置Spring Kafka和Spring Boot以使用JSON发送消息并以多种格式接收它们:JSON,纯字符串或字节数组。基于此配置,您还可以将Kafka生成器从发送JSON切换到其他序列化方法。
此示例应用程序还演示了同一消费组中三个Kafka消费者的使用情况,因此消息在三者之间进行负载平衡。每个消费者实现不同的反序列化方法。
您可以了解一些Kafka概念,如Consumer Group和Topic分区。
多个消费者
要更好地理解配置,请查看下图。如您所见,我们创建了一个包含三个分区的Kafka主题。在消费者方面,只有一个应用程序,但它实现了具有相同group.id 属性的三个Kafka消费者。
当我们启动应用程序时,Kafka会为每个消费者分配一个不同的分区。消费者组将以负载平衡的方式接收消息。在这篇文章的后面,如果我们让它们具有不同的组标识符,你会看到有什么区别(如果你熟悉Kafka,你可能知道结果)。
示例用例
我们要构建的逻辑很简单。每次我们调用指定REST端点hello,应用程序将生成可配置数量的消息,并使用序列号作为Kafka密钥将它们发送到同一主题,等待消费所有消息后返回Hello Kafka!
设置Kafka和Spring Boot
首先,您需要有一个正在运行的Kafka集群才能连接。对于这个应用程序,我将在单个节点中使用docker-compose和Kafka。这显然远不是一个生产配置,但它足以满足这篇文章的目标。
以下是docker-compose.yml配置
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- <font>"2181:2181"</font><font>
kafka:
image: wurstmeister/kafka
ports:
- </font><font>"9092:9092"</font><font>
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
</font>
请注意,我将Kafka配置为 不 自动创建主题(最后一行配置)。我们将在Spring Boot应用程序创建我们的主题,因为我们想要传递一些自定义配置。如果你想玩玩这些Docker图像(例如使用多个节点),请查看wurstmeister/zookeeper图像 文档
要启动Kafka和Zookeeper容器,只要在上述配置目录下运行 docker-compose up
获取SpringBoot应用程序骨架的最简单方法是到 start.spring.io ,使用使用YAML进行配置配置application.yml:
spring:
kafka:
consumer:
group-id: tpd-loggers
auto-offset-reset: earliest
# change <b>this</b> property <b>if</b> you are using your own
# Kafka cluster or your Docker IP is different
bootstrap-servers: localhost:9092
tpd:
topic-name: advice-topic
messages-per-request: 10
第一部分属性是Spring Kafka配置:
第二部分是特定于应用程序的自定义配置。我们定义Kafka主题名称以及每次执行HTTP REST请求时要发送的消息数。
Message类
这是我们将用作Kafka消息的Java类。这里没有什么复杂的,只是@JsonProperty 在构造函数参数中带有注释的不可变类, 因此Jackson可以正确地反序列化它。
<b>import</b> com.fasterxml.jackson.annotation.JsonProperty;
<b>public</b> <b>class</b> PracticalAdvice {
<b>private</b> <b>final</b> String message;
<b>private</b> <b>final</b> <b>int</b> identifier;
<b>public</b> PracticalAdvice(@JsonProperty(<font>"message"</font><font>) <b>final</b> String message,
@JsonProperty(</font><font>"identifier"</font><font>) <b>final</b> <b>int</b> identifier) {
<b>this</b>.message = message;
<b>this</b>.identifier = identifier;
}
<b>public</b> String getMessage() {
<b>return</b> message;
}
<b>public</b> <b>int</b> getIdentifier() {
<b>return</b> identifier;
}
@Override
<b>public</b> String toString() {
<b>return</b> </font><font>"PracticalAdvice::toString() {"</font><font> +
</font><font>"message='"</font><font> + message + '/'' +
</font><font>", identifier="</font><font> + identifier +
'}';
}
}
</font>
Spring Boot中的Kafka Producer配置
为了简化应用程序,我们将在Spring Boot类中添加配置。最后,我们希望在此处包含生产者和消费者配置,并使用三种不同的变体进行反序列化。请记住,您可以在 GitHub存储库中 找到完整的源代码。
首先,让我们关注Producer配置:
@SpringBootApplication
<b>public</b> <b>class</b> KafkaExampleApplication {
<b>public</b> <b>static</b> <b>void</b> main(String[] args) {
SpringApplication.run(KafkaExampleApplication.<b>class</b>, args);
}
@Autowired
<b>private</b> KafkaProperties kafkaProperties;
@Value(<font>"${tpd.topic-name}"</font><font>)
<b>private</b> String topicName;
</font><font><i>// Producer configuration</i></font><font>
@Bean
<b>public</b> Map<String, Object> producerConfigs() {
Map<String, Object> props =
<b>new</b> HashMap<>(kafkaProperties.buildProducerProperties());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.<b>class</b>);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.<b>class</b>);
<b>return</b> props;
}
@Bean
<b>public</b> ProducerFactory<String, Object> producerFactory() {
<b>return</b> <b>new</b> DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
<b>public</b> KafkaTemplate<String, Object> kafkaTemplate() {
<b>return</b> <b>new</b> KafkaTemplate<>(producerFactory());
}
@Bean
<b>public</b> NewTopic adviceTopic() {
<b>return</b> <b>new</b> NewTopic(topicName, 3, (<b>short</b>) 1);
}
}
</font>
在此配置中,我们将设置应用程序的两个部分:
关于Java的Kafka Serializers和Deserializers
Strings 的核心Kafka库( javadoc )中提供了一些基本的Serializer ,所有类型的数组类和字节数组,以及Spring Kafka( javadoc )提供的JSON类。
最重要的是,您可以通过实现Serializer or ExtendedSerializer或其相应的反序列化版本来创建自己的序列化器和反序列化器。这为您提供了很大的灵活性,可以优化通过Kafka传输的数据量。正如您在这些接口中看到的那样,Kafka使用普通字节数组,因此,无论您使用何种复杂类型,都需要将其转换为byte[]。
知道这一点,你可能想知道为什么有人想要在Kafka上使用JSON。由于您将对象转换为JSON然后转换为字节数组,因此效率非常低。但是你必须考虑这样做有两个主要优点:
另一方面,如果您担心Kafka中的流量负载,存储或(反)序列化速度,您可能需要选择字节数组,甚至可以选择自己的串行器/解串器实现。
使用Spring Boot和Kafka发送消息
我们创建一个Rest Controller,并在KafkaTemplate 请求端点时通过注入来生成一些JSON消息。
这是控制器的第一个实现,仅包含产生消息的逻辑。
@RestController
<b>public</b> <b>class</b> HelloKafkaController {
<b>private</b> <b>static</b> <b>final</b> Logger logger =
LoggerFactory.getLogger(HelloKafkaController.<b>class</b>);
<b>private</b> <b>final</b> KafkaTemplate<String, Object> template;
<b>private</b> <b>final</b> String topicName;
<b>private</b> <b>final</b> <b>int</b> messagesPerRequest;
<b>private</b> CountDownLatch latch;
<b>public</b> HelloKafkaController(
<b>final</b> KafkaTemplate<String, Object> template,
@Value(<font>"${tpd.topic-name}"</font><font>) <b>final</b> String topicName,
@Value(</font><font>"${tpd.messages-per-request}"</font><font>) <b>final</b> <b>int</b> messagesPerRequest) {
<b>this</b>.template = template;
<b>this</b>.topicName = topicName;
<b>this</b>.messagesPerRequest = messagesPerRequest;
}
@GetMapping(</font><font>"/hello"</font><font>)
<b>public</b> String hello() throws Exception {
latch = <b>new</b> CountDownLatch(messagesPerRequest);
IntStream.range(0, messagesPerRequest)
.forEach(i -> <b>this</b>.template.send(topicName, String.valueOf(i),
<b>new</b> PracticalAdvice(</font><font>"A Practical Advice"</font><font>, i))
);
latch.await(60, TimeUnit.SECONDS);
logger.info(</font><font>"All messages received"</font><font>);
<b>return</b> </font><font>"Hello Kafka!"</font><font>;
}
}
</font>
在构造函数中,我们传递一些配置参数和我们自定义的KafkaTemplate,以发送String键和JSON值。然后,当API客户端请求/hello 端点时,我们发送10条消息(这是配置值),然后我们阻止线程最多60秒。锁存器解锁后,我们将消息返回Hello Kafka! 给客户端。
这整个锁定的想法不是在实际应用程序中看到的模式,但它对于这个例子来说是好的。这样,您可以检查收到的邮件数量。如果您愿意,可以在接收消息之前删除锁存器并返回“Hello Kafka!”消息。
Kafka消费者配置
正如前面在本文中所提到的,我们希望演示使用Spring Boot和Spring Kafka进行反序列化的不同方法,同时了解当多个消费者属于同一个消费者组时,多个消费者如何以负载均衡的方式工作。
@SpringBootApplication
<b>public</b> <b>class</b> KafkaExampleApplication {
<b>public</b> <b>static</b> <b>void</b> main(String[] args) {
SpringApplication.run(KafkaExampleApplication.<b>class</b>, args);
}
@Autowired
<b>private</b> KafkaProperties kafkaProperties;
@Value(<font>"${tpd.topic-name}"</font><font>)
<b>private</b> String topicName;
</font><font><i>// Producer configuration</i></font><font>
</font><font><i>// omitted...</i></font><font>
</font><font><i>// Consumer configuration</i></font><font>
</font><font><i>// If you only need one kind of deserialization, you only need to set the</i></font><font>
</font><font><i>// Consumer configuration properties. Uncomment this and remove all others below.</i></font><font>
</font><font><i>// @Bean</i></font><font>
</font><font><i>// public Map<String, Object> consumerConfigs() {</i></font><font>
</font><font><i>// Map<String, Object> props = new HashMap<>(</i></font><font>
</font><font><i>// kafkaProperties.buildConsumerProperties()</i></font><font>
</font><font><i>// );</i></font><font>
</font><font><i>// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,</i></font><font>
</font><font><i>// StringDeserializer.class);</i></font><font>
</font><font><i>// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,</i></font><font>
</font><font><i>// JsonDeserializer.class);</i></font><font>
</font><font><i>// props.put(ConsumerConfig.GROUP_ID_CONFIG,</i></font><font>
</font><font><i>// "tpd-loggers");</i></font><font>
</font><font><i>//</i></font><font>
</font><font><i>// return props;</i></font><font>
</font><font><i>// }</i></font><font>
@Bean
<b>public</b> ConsumerFactory<String, Object> consumerFactory() {
<b>final</b> JsonDeserializer<Object> jsonDeserializer = <b>new</b> JsonDeserializer<>();
jsonDeserializer.addTrustedPackages(</font><font>"*"</font><font>);
<b>return</b> <b>new</b> DefaultKafkaConsumerFactory<>(
kafkaProperties.buildConsumerProperties(), <b>new</b> StringDeserializer(), jsonDeserializer
);
}
@Bean
<b>public</b> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
<b>new</b> ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
<b>return</b> factory;
}
</font><font><i>// String Consumer Configuration</i></font><font>
@Bean
<b>public</b> ConsumerFactory<String, String> stringConsumerFactory() {
<b>return</b> <b>new</b> DefaultKafkaConsumerFactory<>(
kafkaProperties.buildConsumerProperties(), <b>new</b> StringDeserializer(), <b>new</b> StringDeserializer()
);
}
@Bean
<b>public</b> ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerStringContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
<b>new</b> ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(stringConsumerFactory());
<b>return</b> factory;
}
</font><font><i>// Byte Array Consumer Configuration</i></font><font>
@Bean
<b>public</b> ConsumerFactory<String, byte[]> byteArrayConsumerFactory() {
<b>return</b> <b>new</b> DefaultKafkaConsumerFactory<>(
kafkaProperties.buildConsumerProperties(), <b>new</b> StringDeserializer(), <b>new</b> ByteArrayDeserializer()
);
}
@Bean
<b>public</b> ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerByteArrayContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
<b>new</b> ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(byteArrayConsumerFactory());
<b>return</b> factory;
}
}
</font>
这种配置可能看起来很繁琐,但考虑到为了演示这三种类型的反序列化,我们重复了三次创建ConsumerFactory和KafkaListenerContainerFactory实例,以便我们可以在消费者中切换它们。
配置消费者的基本步骤是:
使用先前配置的Consumer Factory构造KafkaListenerContainerFactory(并发容器工厂)。同样,我们这样做三次,每个实例使用不同的一个。
使用Spring Boot和Kafka以JSON,String和byte []格式接收消息
现在是时候展示Kafka消费者的样子了。我们将使用@KafkaListener 注释,因为它简化了过程并负责对传递的Java类型进行反序列化。
@RestController
<b>public</b> <b>class</b> HelloKafkaController {
<b>private</b> <b>static</b> <b>final</b> Logger logger =
LoggerFactory.getLogger(HelloKafkaController.<b>class</b>);
<b>private</b> <b>final</b> KafkaTemplate<String, Object> template;
<b>private</b> <b>final</b> String topicName;
<b>private</b> <b>final</b> <b>int</b> messagesPerRequest;
<b>private</b> CountDownLatch latch;
<b>public</b> HelloKafkaController(
<b>final</b> KafkaTemplate<String, Object> template,
@Value(<font>"${tpd.topic-name}"</font><font>) <b>final</b> String topicName,
@Value(</font><font>"${tpd.messages-per-request}"</font><font>) <b>final</b> <b>int</b> messagesPerRequest) {
<b>this</b>.template = template;
<b>this</b>.topicName = topicName;
<b>this</b>.messagesPerRequest = messagesPerRequest;
}
@GetMapping(</font><font>"/hello"</font><font>)
<b>public</b> String hello() throws Exception {
latch = <b>new</b> CountDownLatch(messagesPerRequest);
IntStream.range(0, messagesPerRequest)
.forEach(i -> <b>this</b>.template.send(topicName, String.valueOf(i),
<b>new</b> PracticalAdvice(</font><font>"A Practical Advice"</font><font>, i))
);
latch.await(60, TimeUnit.SECONDS);
logger.info(</font><font>"All messages received"</font><font>);
<b>return</b> </font><font>"Hello Kafka!"</font><font>;
}
@KafkaListener(topics = </font><font>"advice-topic"</font><font>, clientIdPrefix = </font><font>"json"</font><font>,
containerFactory = </font><font>"kafkaListenerContainerFactory"</font><font>)
<b>public</b> <b>void</b> listenAsObject(ConsumerRecord<String, PracticalAdvice> cr,
@Payload PracticalAdvice payload) {
logger.info(</font><font>"Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}"</font><font>, cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
latch.countDown();
}
@KafkaListener(topics = </font><font>"advice-topic"</font><font>, clientIdPrefix = </font><font>"string"</font><font>,
containerFactory = </font><font>"kafkaListenerStringContainerFactory"</font><font>)
<b>public</b> <b>void</b> listenasString(ConsumerRecord<String, String> cr,
@Payload String payload) {
logger.info(</font><font>"Logger 2 [String] received key {}: Type [{}] | Payload: {} | Record: {}"</font><font>, cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
latch.countDown();
}
@KafkaListener(topics = </font><font>"advice-topic"</font><font>, clientIdPrefix = </font><font>"bytearray"</font><font>,
containerFactory = </font><font>"kafkaListenerByteArrayContainerFactory"</font><font>)
<b>public</b> <b>void</b> listenAsByteArray(ConsumerRecord<String, byte[]> cr,
@Payload byte[] payload) {
logger.info(</font><font>"Logger 3 [ByteArray] received key {}: Type [{}] | Payload: {} | Record: {}"</font><font>, cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
latch.countDown();
}
<b>private</b> <b>static</b> String typeIdHeader(Headers headers) {
<b>return</b> StreamSupport.stream(headers.spliterator(), false)
.filter(header -> header.key().equals(</font><font>"__TypeId__"</font><font>))
.findFirst().map(header -> <b>new</b> String(header.value())).orElse(</font><font>"N/A"</font><font>);
}
}
</font>
这里有三个消费者。首先,让我们描述@KafkaListener 注释的参数:
请注意,传递给所有消费者的第一个参数是相同的:一个 ConsumerRecord和@Payload,如果我们使用第一个,则第二个 是多余的。我们可以访问ConsumerRecord的方法value() 获得Payload,但我这里写在这里,让你看到它是多么简单直接通过反序列化得到的Payload。
Kafka中的TypeId标头
标头 __TypeId__默认情况下由Kafka库自动设置。我这里使用工具方法typeIdHeader 获得字符串,因为从ConsumerRecord的toString() 方法只能看到字节组输出。
TypeId标头可用于反序列化,因此您可以找到要将数据映射到的类型。但是JSON反序列化却不需要它,因为这个特殊的反序列化器是由Spring团队制作,并且它们从方法的参数中推断出类型。
运行
现在我们完成了Kafka生产者和消费者,我们可以运行Kafka和Spring Boot应用程序:
$ docker-compose up -d
Starting kafka-example_zookeeper_1 ... done
Starting kafka-example_kafka_1 ... done
$ mvn spring-boot:run
pring Boot应用程序启动,消费者在Kafka中注册,Kafka为它们分配了一个分区。我们使用三个分区配置主题,因此每个消费者都会分配其中一个分区。
[Consumer clientId=string-0, groupId=tpd-loggers] Successfully joined group with generation 28 <p>[Consumer clientId=string-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-2] <p>[Consumer clientId=bytearray-0, groupId=tpd-loggers] Successfully joined group with generation 28 <p>[Consumer clientId=bytearray-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-0] <p>[Consumer clientId=json-0, groupId=tpd-loggers] Successfully joined group with generation 28 <p>[Consumer clientId=json-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-1] partitions assigned: [advice-topic-1] partitions assigned: [advice-topic-2] partitions assigned: [advice-topic-0]
我们现在可以尝试对服务进行HTTP调用。您可以使用浏览器curl,例如:
curl localhost:8080/hello
注意日志中的输出。
以上源码: Kafka and Spring Boot Example .
解释
Kafka对消息的key进行哈希化(key是一个简单的字符串标识符),并根据key将消息放入不同的分区。每个使用者在其分配的分区中获取消息,并使用其反序列化器将其转换为Java对象。请记住,我们的生产者总是发送JSON值。
正如您在日志中看到的,每个反序列化器都设法完成其任务,因此String消费者打印原始JSON消息,字节数组消费者显示JSON字符串的字节表示,JSON反序列化器使用Java类型映射器进行转换到原来的类PracticalAdvice。您可以查看记录的ConsumerRecord,您将看到标题,指定的分区,偏移量等。
这就是你如何使用Spring Boot和Kafka发送和接收JSON消息。我希望您发现本指南很有用,下面您有一些代码变体,以便您可以更多地了解Kafka的工作原理。
多次请求/hello
发出一些请求,然后查看消息如何跨分区分布。具有相同密钥的Kafka消息始终放在相同的分区中。当您希望确保指定用户、进程或正在处理的任何逻辑的所有消息都由同一消费者以与生成时相同的顺序接收时,此功能非常有用(在事件溯源EventSourcing时实现事件顺序,从而实现事务一致性很需要),这里就不考虑负载平衡了。
减少分区数量
首先,确保重新启动Kafka,这样您就可以放弃以前的配置。
然后,在应用程序中重新定义主题,使其只有2个分区:
@Bean
<b>public</b> NewTopic adviceTopic() {
<b>return</b> <b>new</b> NewTopic(topicName, 2, (<b>short</b>) 1);
}
现在,再次运行应用程序并向/hello 端点发出请求。
结果:其中一个消费者没有收到任何消息。这是预期的行为,因为同一消费者组中没有可用的分区(我们只设置了2个分区)。
更改一个消费者的组标识符
保留上一个案例的更改,该主题现在只有2个分区。我们现在正在改变我们的一个消费者的群组ID。
@KafkaListener(topics = <font>"advice-topic"</font><font>, clientIdPrefix = </font><font>"bytearray"</font><font>,
containerFactory = </font><font>"kafkaListenerByteArrayContainerFactory"</font><font>,
groupId = </font><font>"tpd-loggers-2"</font><font>)
<b>public</b> <b>void</b> listenAsByteArray(ConsumerRecord<String, byte[]> cr,
@Payload byte[] payload) {
logger.info(</font><font>"Logger 3 [ByteArray] received a payload with size {}"</font><font>, payload.length);
latch.countDown();
</font>
请注意,我们还更改了记录的消息。现在,这个消费者负责打印payload有效载荷的大小。此外,我们需要更改CountDownLatch 它,因此它需要两倍的消息数。
latch = new CountDownLatch(messagesPerRequest * 2);
为什么?这一次,让我们解释在运行应用程序之前会发生什么。正如我在本文开头所描述的那样,当消费者属于同一个消费者群体时,他们(在概念上)正在处理同一个任务。我们正在实现一种负载均衡机制,其中并发工作程序从会不同分区获取消息,处理的消息是彼此隔离的。
在这个例子中,我还改变了最后一个消费者的“任务”,以便更好地理解这一点:它打印的是不同的东西。由于我们更改了组ID,因此该消费者将独立工作,Kafka将为其分配两个分区。字节数组消费者将接收所有消息,与其他两个消息分开工作。