转载

Spring Boot和Apache Kafka结合实现错误处理,消息转换和事务支持?

Spring为Kafka带来了熟悉的Spring编程模型。它提供了KafkaTemplate用于发布记录和用于异步执行POJO侦听器的侦听器容器。Spring Boot自动配置连接了大部分基础架构,因此您可以专注于业务逻辑。

错误恢复

考虑这个简单的POJO侦听器方法:

@KafkaListener(id = <font>"fooGroup"</font><font>, topics = </font><font>"topic1"</font><font>)
<b>public</b> <b>void</b> listen(String in) {
  logger.info(</font><font>"Received: "</font><font> + in);
  <b>if</b> (in.startsWith(</font><font>"foo"</font><font>)) {
    <b>throw</b> <b>new</b> RuntimeException(</font><font>"failed"</font><font>);
  }
}
</font>

默认情况下,只记录失败的记录,然后我们继续下一个记录。但是,我们可以在侦听器容器中配置错误处理程序以执行其他操作。为此,我们使用我们自己的方法覆盖Spring Boot的自动配置容器工厂:

@Bean
<b>public</b> ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
    ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
    ConsumerFactory<Object, Object> kafkaConsumerFactory) {
  ConcurrentKafkaListenerContainerFactory<Object, Object> factory = <b>new</b> ConcurrentKafkaListenerContainerFactory<>();
  configurer.configure(factory, kafkaConsumerFactory);
  factory.setErrorHandler(<b>new</b> SeekToCurrentErrorHandler()); <font><i>// <<<<<<</i></font><font>
  <b>return</b> factory;
}
</font>

请注意,我们仍然可以利用大部分自动配置。

SeekToCurrentErrorHandler丢弃来自poll()剩下的记录,并执行seek操作实现消费者操作偏移offset复位,使丢弃记录在下一次轮询再取出。默认情况下,错误处理程序会跟踪失败的记录,在10次传递尝试后放弃并记录失败的记录。但是,我们也可以将失败的消息发送到另一个主题。我们称之为死信主题。

下面是合在一起代码:

@Bean
<b>public</b> ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
    ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
    ConsumerFactory<Object, Object> kafkaConsumerFactory,
    KafkaTemplate<Object, Object> template) {
  ConcurrentKafkaListenerContainerFactory<Object, Object> factory = <b>new</b> ConcurrentKafkaListenerContainerFactory<>();
  configurer.configure(factory, kafkaConsumerFactory);
  factory.setErrorHandler(<b>new</b> SeekToCurrentErrorHandler(
      <b>new</b> DeadLetterPublishingRecoverer(template), 3));
  <b>return</b> factory;
}

@KafkaListener(id = <font>"fooGroup"</font><font>, topics = </font><font>"topic1"</font><font>)
<b>public</b> <b>void</b> listen(String in) {
  logger.info(</font><font>"Received: "</font><font> + in);
  <b>if</b> (in.startsWith(</font><font>"foo"</font><font>)) {
    <b>throw</b> <b>new</b> RuntimeException(</font><font>"failed"</font><font>);
  }
}

@KafkaListener(id = </font><font>"dltGroup"</font><font>, topics = </font><font>"topic1.DLT"</font><font>)
<b>public</b> <b>void</b> dltListen(String in) {
  logger.info(</font><font>"Received from DLT: "</font><font> + in);
}
</font>

反序列化错误

但是,在Spring获得记录之前发生的反序列化异常呢?使用ErrorHandlingDeserializer。此反序列化器包装委托反序列化器并捕获任何异常。然后将它们转发到侦听器容器,该容器将它们直接发送到错误处理程序。该异常包含源数据,因此您可以诊断问题。

领域对象并推断类型

请考虑以下示例:

@Bean
<b>public</b> RecordMessageConverter converter() {
  <b>return</b> <b>new</b> StringJsonMessageConverter();
}

@KafkaListener(id = <font>"fooGroup"</font><font>, topics = </font><font>"topic1"</font><font>)
<b>public</b> <b>void</b> listen(Foo2 foo) {
  logger.info(</font><font>"Received: "</font><font> + foo);
  <b>if</b> (foo.getFoo().startsWith(</font><font>"fail"</font><font>)) {
    <b>throw</b> <b>new</b> RuntimeException(</font><font>"failed"</font><font>);
  }
}

@KafkaListener(id = </font><font>"dltGroup"</font><font>, topics = </font><font>"topic1.DLT"</font><font>)
<b>public</b> <b>void</b> dltListen(Foo2 in) {
  logger.info(</font><font>"Received from DLT: "</font><font> + in);
}
</font>

请注意,我们现在正在使用类型的对象Foo2。消息转换器bean推断要转换为方法签名中的参数类型的类型。转换器自动“信任”该类型。Spring Boot自动将转换器配置到侦听器容器中。

在生产者方面,发送的对象可以是不同的类(只要它是类型兼容的):

@RestController
<b>public</b> <b>class</b> Controller {

    @Autowired
    <b>private</b> KafkaTemplate<Object, Object> template;

    @PostMapping(path = <font>"/send/foo/{what}"</font><font>)
    <b>public</b> <b>void</b> sendFoo(@PathVariable String what) {
        <b>this</b>.template.send(</font><font>"topic1"</font><font>, <b>new</b> Foo1(what));
    }

}
</font>

配置:

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

$ curl -X POST http:<font><i>//localhost:8080/send/foo/fail</i></font><font>
</font>

在这里,我们使用StringDeserializer,并在消费者端使用一个“智能”消息转换器。

多方法监听器

我们还可以使用单个侦听器容器并根据类型路由到特定方法。由于有多个方法,类型需要选择要调用的方法,因此这里我们就无法推断类型了。

相反,我们依赖于记录头中传递的类型信息来从源类型映射到目标类型。此外,由于我们不推断类型,我们需要配置消息转换器以“信任”包的映射类型。

在这种情况下,我们将在两侧使用消息转换器( StringSerializer和StringDeserializer 一起使用)。以下消费者侧转换器示例将它们放在一起:

@Bean
<b>public</b> RecordMessageConverter converter() {
  StringJsonMessageConverter converter = <b>new</b> StringJsonMessageConverter();
  DefaultJackson2JavaTypeMapper typeMapper = <b>new</b> DefaultJackson2JavaTypeMapper();
  typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
  typeMapper.addTrustedPackages(<font>"com.common"</font><font>);
  Map<String, Class<?>> mappings = <b>new</b> HashMap<>();
  mappings.put(</font><font>"foo"</font><font>, Foo2.<b>class</b>);
  mappings.put(</font><font>"bar"</font><font>, Bar2.<b>class</b>);
  typeMapper.setIdClassMapping(mappings);
  converter.setTypeMapper(typeMapper);
  <b>return</b> converter;
}
</font>

在这里,我们将“foo”映射到类Foo2,将“bar” 映射到类Bar2。请注意,我们必须告诉它使用TYPE_ID标头来确定转换的类型。同样,Spring Boot会自动将消息转换器配置到容器中。下面是application.yml文件片段中的生产者端类型映射; 格式是以冒号分隔的token:FQCN列表:

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.type.mapping: foo:com.common.Foo1,bar:com.common.Bar1

此配置将类映射Foo1到“foo”,将类映射Bar1到“bar”。

监听器:

@Component
@KafkaListener(id = <font>"multiGroup"</font><font>, topics = { </font><font>"foos"</font><font>, </font><font>"bars"</font><font> })
<b>public</b> <b>class</b> MultiMethods {

    @KafkaHandler
    <b>public</b> <b>void</b> foo(Foo1 foo) {
        System.out.println(</font><font>"Received: "</font><font> + foo);
    }

    @KafkaHandler
    <b>public</b> <b>void</b> bar(Bar bar) {
        System.out.println(</font><font>"Received: "</font><font> + bar);
    }

    @KafkaHandler(isDefault = <b>true</b>)
    <b>public</b> <b>void</b> unknown(Object object) {
        System.out.println(</font><font>"Received unknown: "</font><font> + object);
    }

}
</font>

生产者:

@RestController
<b>public</b> <b>class</b> Controller {

    @Autowired
    <b>private</b> KafkaTemplate<Object, Object> template;

    @PostMapping(path = <font>"/send/foo/{what}"</font><font>)
    <b>public</b> <b>void</b> sendFoo(@PathVariable String what) {
        <b>this</b>.template.send(<b>new</b> GenericMessage<>(<b>new</b> Foo1(what),
                Collections.singletonMap(KafkaHeaders.TOPIC, </font><font>"foos"</font><font>)));
    }

    @PostMapping(path = </font><font>"/send/bar/{what}"</font><font>)
    <b>public</b> <b>void</b> sendBar(@PathVariable String what) {
        <b>this</b>.template.send(<b>new</b> GenericMessage<>(<b>new</b> Bar(what),
                Collections.singletonMap(KafkaHeaders.TOPIC, </font><font>"bars"</font><font>)));
    }

    @PostMapping(path = </font><font>"/send/unknown/{what}"</font><font>)
    <b>public</b> <b>void</b> sendUnknown(@PathVariable String what) {
        <b>this</b>.template.send(<b>new</b> GenericMessage<>(what,
                Collections.singletonMap(KafkaHeaders.TOPIC, </font><font>"bars"</font><font>)));
    }

}
</font>

事务

通过在application.yml文件中设置transactional-id-prefix来启用事务:

spring:
    kafka:
      producer:
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        transaction-id-prefix: tx.
      consumer:
        properties:
          isolation.level: read_committed

当使用spring-kafka 1.3.x或更高版本以及支持事务的kafka-clients版本(0.11或更高版本)时,方法中KafkaTemplate执行的任何操作@KafkaListener都将参与事务,并且侦听器容器将在提交之前将偏移发送到事务它。认识到我们还为消费者设置了隔离级别,使其无法查看未提交的记录。以下示例暂停侦听器,以便我们可以看到此效果:

@KafkaListener(id = <font>"fooGroup2"</font><font>, topics = </font><font>"topic2"</font><font>)
<b>public</b> <b>void</b> listen(List foos) throws IOException {
  logger.info(</font><font>"Received: "</font><font> + foos);
  foos.forEach(f -> kafkaTemplate.send(</font><font>"topic3"</font><font>, f.getFoo().toUpperCase()));
  logger.info(</font><font>"Messages sent, hit enter to commit tx"</font><font>);
  System.in.read();
}

@KafkaListener(id = </font><font>"fooGroup3"</font><font>, topics = </font><font>"topic3"</font><font>)
<b>public</b> <b>void</b> listen(String in) {
  logger.info(</font><font>"Received: "</font><font> + in);
}
</font>

此示例的生产者在单个事务中发送多个记录:

@PostMapping(path = <font>"/send/foos/{what}"</font><font>)
<b>public</b> <b>void</b> sendFoo(@PathVariable String what) {
  <b>this</b>.template.executeInTransaction(kafkaTemplate -> {
    StringUtils.commaDelimitedListToSet(what).stream()
      .map(s -> <b>new</b> Foo1(s))
      .forEach(foo -> kafkaTemplate.send(</font><font>"topic2"</font><font>, foo));
    <b>return</b> <b>null</b>;
  });
}

curl -X POST http:</font><font><i>//localhost:8080/send/foos/a,b,c,d,e</i></font><font>

Received: [Foo2 [foo=a], Foo2 [foo=b], Foo2 [foo=c], Foo2 [foo=d], Foo2 [foo=e]]
Messages sent, hit Enter to commit tx

Received: [A, B, C, D, E]
</font>

更多Spring Cloud Stream对Kafka的支持点击#kafka标签进入

原文  https://www.jdon.com/51899
正文到此结束
Loading...