前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >「首席架构师看Event Hub」Kafka的Spring 深入挖掘 -第1部分

「首席架构师看Event Hub」Kafka的Spring 深入挖掘 -第1部分

作者头像
首席架构师智库
发布2019-10-19 10:53:53
1.4K0
发布2019-10-19 10:53:53
举报
文章被收录于专栏:超级架构师超级架构师

接下来是《如何在您的Spring启动应用程序中使用Apache Kafka》https://www.confluent.io/blog/apache-kafka-spring-boot-application ,这展示了如何开始使用Spring启动和Apache Kafka®,这里我们将更深入地挖掘Apache Kafka项目的Spring提供的一些附加功能。

Apache Kafka的Spring为Kafka带来了熟悉的Spring编程模型。它提供了用于发布记录的KafkaTemplate和用于异步执行POJO侦听器的侦听器容器。Spring引导自动配置连接了许多基础设施,因此您可以将精力集中在业务逻辑上。

错误恢复

考虑一下这个简单的POJO监听器方法:

@KafkaListener(id = "fooGroup", topics = "topic1") public void listen(String in) { logger.info("Received: " + in); if (in.startsWith("foo")) { throw new RuntimeException("failed"); } }

默认情况下,失败的记录会被简单地记录下来,然后我们继续下一个。但是,我们可以在侦听器容器中配置一个错误处理程序来执行一些其他操作。为此,我们用我们自己的来覆盖Spring Boot的自动配置容器工厂:

@Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); factory.setErrorHandler(new SeekToCurrentErrorHandler()); // <<<<<< return factory; }

注意,我们仍然可以利用大部分的自动配置。

SeekToCurrentErrorHandler丢弃轮询()中的剩余记录,并在使用者上执行查找操作来重置偏移量,以便在下一次轮询时再次获取被丢弃的记录。默认情况下,错误处理程序跟踪失败的记录,在10次提交尝试后放弃,并记录失败的记录。但是,我们也可以将失败的消息发送到另一个主题。我们称这是一个毫无意义的话题。

下面的例子把这一切放在一起:

@Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> template) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); factory.setErrorHandler(new SeekToCurrentErrorHandler( new DeadLetterPublishingRecoverer(template), 3)); return factory; } @KafkaListener(id = "fooGroup", topics = "topic1") public void listen(String in) { logger.info("Received: " + in); if (in.startsWith("foo")) { throw new RuntimeException("failed"); } } @KafkaListener(id = "dltGroup", topics = "topic1.DLT") public void dltListen(String in) { logger.info("Received from DLT: " + in); }

反序列化错误

但是,在Spring获得记录之前发生的反序列化异常又如何呢?进入ErrorHandlingDeserializer。此反序列化器包装委托反序列化器并捕获任何异常。然后将它们转发给侦听器容器,后者将它们直接发送给错误处理程序。异常包含源数据,因此可以诊断问题。

域对象并推断类型

考虑下面的例子:

@Bean public RecordMessageConverter converter() { return new StringJsonMessageConverter(); } @KafkaListener(id = "fooGroup", topics = "topic1") public void listen(Foo2 foo) { logger.info("Received: " + foo); if (foo.getFoo().startsWith("fail")) { throw new RuntimeException("failed"); } } @KafkaListener(id = "dltGroup", topics = "topic1.DLT") public void dltListen(Foo2 in) { logger.info("Received from DLT: " + in); }

注意,我们现在正在使用类型为Foo2的对象。消息转换器bean推断要转换为方法签名中的参数类型的类型。

转换器自动“信任”类型。Spring Boot自动将转换器配置到侦听器容器中。

在生产者方面,发送的对象可以是一个不同的类(只要它的类型兼容):

@RestController public class Controller { @Autowired private KafkaTemplate<Object, Object> template; @PostMapping(path = "/send/foo/{what}") public void sendFoo(@PathVariable String what) { this.template.send("topic1", new Foo1(what)); } }

和:

spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer $ curl -X POST http://localhost:8080/send/foo/fail

这里,我们在消费者端使用StringDeserializer和“智能”消息转换器。

多种监听器

我们还可以使用单个侦听器容器,并根据类型路由到特定的方法。这次我们不能推断类型,因为类型是用来选择要调用的方法的。

相反,我们依赖于在记录头中传递的类型信息来将源类型映射到目标类型。此外,由于我们没有推断类型,所以需要将消息转换器配置为“信任”映射类型的包。

在本例中,我们将在两端使用消息转换器(以及StringSerializer和StringDeserializer)。下面是消费者端转换器的例子:

@Bean public RecordMessageConverter converter() { StringJsonMessageConverter converter = new StringJsonMessageConverter(); DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID); typeMapper.addTrustedPackages("com.common"); Map<String, Class<?>> mappings = new HashMap<>(); mappings.put("foo", Foo2.class); mappings.put("bar", Bar2.class); typeMapper.setIdClassMapping(mappings); converter.setTypeMapper(typeMapper); return converter; }

在这里,我们从“foo”映射到类Foo2,从“bar”映射到类Bar2。注意,我们必须告诉它使用TYPE_ID头来确定转换的类型。同样,Spring Boot会自动将消息转换器配置到容器中。下面是应用程序片段中的生产端类型映射。yml文件;格式是一个逗号分隔的令牌列表:FQCN:

spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: spring.json.type.mapping: foo:com.common.Foo1,bar:com.common.Bar1

这个配置将类Foo1映射到“foo”,将类Bar1映射到“bar”。

监听器:

@Component @KafkaListener(id = "multiGroup", topics = { "foos", "bars" }) public class MultiMethods { @KafkaHandler public void foo(Foo1 foo) { System.out.println("Received: " + foo); } @KafkaHandler public void bar(Bar bar) { System.out.println("Received: " + bar); } @KafkaHandler(isDefault = true) public void unknown(Object object) { System.out.println("Received unknown: " + object); } }

生产者:

@RestController public class Controller { @Autowired private KafkaTemplate<Object, Object> template; @PostMapping(path = "/send/foo/{what}") public void sendFoo(@PathVariable String what) { this.template.send(new GenericMessage<>(new Foo1(what), Collections.singletonMap(KafkaHeaders.TOPIC, "foos"))); } @PostMapping(path = "/send/bar/{what}") public void sendBar(@PathVariable String what) { this.template.send(new GenericMessage<>(new Bar(what), Collections.singletonMap(KafkaHeaders.TOPIC, "bars"))); } @PostMapping(path = "/send/unknown/{what}") public void sendUnknown(@PathVariable String what) { this.template.send(new GenericMessage<>(what, Collections.singletonMap(KafkaHeaders.TOPIC, "bars"))); } }

事务

通过在应用程序中设置transactional-id前缀来启用事务。yml文件:

spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer transaction-id-prefix: tx. consumer: properties: isolation.level: read_committed

当使用spring-kafka 1.3时。x或更高版本和支持事务的kafka-clients版本(0.11或更高版本),在@KafkaListener方法中执行的任何KafkaTemplate操作都将参与事务,而侦听器容器将在提交事务之前向事务发送偏移量。请注意,我们还为使用者设置了隔离级别,使其无法看到未提交的记录。下面的例子暂停监听器,这样我们可以看到效果:

@KafkaListener(id = "fooGroup2", topics = "topic2") public void listen(List foos) throws IOException { logger.info("Received: " + foos); foos.forEach(f -> kafkaTemplate.send("topic3", f.getFoo().toUpperCase())); logger.info("Messages sent, hit enter to commit tx"); System.in.read(); } @KafkaListener(id = "fooGroup3", topics = "topic3") public void listen(String in) { logger.info("Received: " + in); }

本例中的生产者在一个事务中发送多条记录:

@PostMapping(path = "/send/foos/{what}") public void sendFoo(@PathVariable String what) { this.template.executeInTransaction(kafkaTemplate -> { StringUtils.commaDelimitedListToSet(what).stream() .map(s -> new Foo1(s)) .forEach(foo -> kafkaTemplate.send("topic2", foo)); return null; }); } curl -X POST http://localhost:8080/send/foos/a,b,c,d,e Received: [Foo2 [foo=a], Foo2 [foo=b], Foo2 [foo=c], Foo2 [foo=d], Foo2 [foo=e]] Messages sent, hit Enter to commit tx Received: [A, B, C, D, E]

结论

在Apache Kafka中使用Spring可以消除很多样板代码。它还增加了诸如错误处理、重试和记录筛选等功能——而我们只是触及了表面。

原文:https://www.confluent.io/blog/spring-for-apache-kafka-deep-dive-part-1-error-handling-message-conversion-transaction-support

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 首席架构师智库 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 错误恢复
  • 反序列化错误
  • 域对象并推断类型
  • 多种监听器
  • 事务
  • 结论
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档