前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >微服务架构之Spring Boot(五十七)

微服务架构之Spring Boot(五十七)

作者头像
用户1289394
发布2022-05-23 15:38:25
8750
发布2022-05-23 15:38:25
举报
文章被收录于专栏:Java学习网Java学习网

33.3 Apache Kafka支持

通过提供 spring-kafka 项目的自动配置来支持Apache Kafka。

Kafka配置由 spring.kafka.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:

spring.kafka.bootstrap-servers=localhost:9092

spring.kafka.consumer.group-id=myGroup

要在启动时创建主题,请添加bean类型 NewTopic 。如果主题已存在,则忽略bean。

有关 KafkaProperties 更多支持选项,请参阅

33.3.1发送消息

Spring的 KafkaTemplate 是自动配置的,您可以直接在自己的beans中自动装配它,如下例所示:

@Component

public class MyBean {

private final KafkaTemplate kafkaTemplate;

@Autowired

public MyBean(KafkaTemplate kafkaTemplate) {

this.kafkaTemplate = kafkaTemplate;

}

// ...

}

如果定义了属性 spring.kafka.producer.transaction-id-prefix ,则会自动配置 KafkaTransactionManager 。此外,如果

定义了 RecordMessageConverter bean,它将自动与自动配置的 KafkaTemplate 相关联。

33.3.2接收消息

当存在Apache Kafka基础结构时,可以使用 @KafkaListener 注释任何bean以创建侦听器端点。如果未定

义 KafkaListenerContainerFactory ,则会使用 spring.kafka.listener.* 中定义的键自动配置默认值。

以下组件在 someTopic 主题上创建一个侦听器端点:

@Component

public class MyBean {

@KafkaListener(topics = "someTopic")

public void processMessage(String content) {

// ...

}

}

如果定义了 KafkaTransactionManager bean,它将自动关联到容器工厂。同样,如果定义了 RecordMessageConverter , ErrorHandler

或 AfterRollbackProcessor bean,它将自动与默认工厂相关联。

自定义 ChainedKafkaTransactionManager 必须标记为 @Primary ,因为它通常会引用自动配置

的 KafkaTransactionManager bean。

33.3.3卡夫卡流

Apache Kafka的Spring提供了一个工厂bean来创建一个 StreamsBuilder 对象并管理其流的生命周期。Spring Boot只要 kafka-streams 在

类路径上,并且通过 @EnableKafkaStreams 注释启用Kafka Streams,就会自动配置所需的 KafkaStreamsConfiguration bean。

启用Kafka Streams意味着必须设置应用程序ID和引导程序服务器。可以使用 spring.kafka.streams.application-id 配置前者,如果未设

置,则默认为 spring.application.name 。后者可以全局设置或专门为流而重写。

使用专用属性可以使用其他几个属性; 可以使用 spring.kafka.streams.properties 命名空间设置其他任意Kafka属性。有关更多信息,另请

参见第33.3.4节“其他Kafka属性”。

要使用工厂bean,只需将 StreamsBuilder 连接到 @Bean ,如下例所示:

@Configuration

@EnableKafkaStreams

static class KafkaStreamsExampleConfiguration {

@Bean

public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {

KStream<Integer, String> stream = streamsBuilder.stream("ks1In");

stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",

Produced.with(Serdes.Integer(), new JsonSerde<>()));

return stream;

}

}

默认情况下,由其创建的 StreamBuilder 对象管理的流将自动启动。您可以使用 spring.kafka.streams.auto-startup 属性自定义此行为。

33.3.4附加Kafka属性

自动配置支持的属性显示在 附录A,常见应用程序属性中。请注意,在大多数情况下,这些属性(连字符或camelCase)直接映射到Apache

Kafka点状属性。有关详细信息,请参阅Apache Kafka文档。

这些属性中的前几个适用于所有组件(生产者,使用者,管理员和流),但如果您希望使用不同的值,则可以在组件级别指定。Apache Kafka

指定重要性为HIGH,MEDIUM或LOW的属性。Spring Boot auto-configuration支持所有HIGH重要性属性,一些选定的MEDIUM和LOW属

性,以及任何没有默认值的属性。

只有Kafka支持的属性的一部分可以通过 KafkaProperties 类直接获得。如果您希望使用不直接支持的其他属性配置生产者或使用者,请使用以

下属性:

spring.kafka.properties.prop.one=first

spring.kafka.admin.properties.prop.two=second

spring.kafka.consumer.properties.prop.three=third

spring.kafka.producer.properties.prop.four=fourth

spring.kafka.streams.properties.prop.five=fifth

这将常见的 prop.one Kafka属性设置为 first (适用于生产者,消费者和管理员),将 prop.two admin属性设置为 second ,

将 prop.three 使用者属性设置为 third , prop.four 生产者属性为 fourth , prop.five 流属性为 fifth 。

您还可以按如下方式配置Spring Kafka JsonDeserializer :

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice

spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

同样,您可以禁用在标头中发送类型信息的 JsonSerializer 默认行为:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.producer.properties.spring.json.add.type.headers=false

重要

以这种方式设置的属性会覆盖Spring Boot明确支持的任何配置项。

34.使用 RestTemplate 调用REST服务

如果需要从应用程序调用远程REST服务,可以使用Spring Framework的 RestTemplate 类。由于 RestTemplate 实例在使用之前通常需要进行

自定义,因此Spring Boot不提供任何单个自动配置 RestTemplate bean。但是,它会自动配置 RestTemplateBuilder ,可在需要时用于创

建 RestTemplate 实例。自动配置的 RestTemplateBuilder 确保将合理的 HttpMessageConverters 应用于 RestTemplate 实例。

以下代码显示了一个典型示例:

@Service

public class MyService {

private final RestTemplate restTemplate;

public MyService(RestTemplateBuilder restTemplateBuilder) {

this.restTemplate = restTemplateBuilder.build();

}

public Details someRestCall(String name) {

return this.restTemplate.getForObject("/{name}/details", Details.class, name);

}

}

RestTemplateBuilder 包含许多可用于快速配置 RestTemplate 的有用方法。例如,要添加BASIC auth支持,可以使

用 builder.basicAuthentication("user", "password").build() 。

34.1 RestTemplate自定义

RestTemplate 自定义有三种主要方法,具体取决于您希望自定义应用的广泛程度。

要使任何自定义的范围尽可能窄,请注入自动配置的 RestTemplateBuilder ,然后根据需要调用其方法。每个方法调用都返回一个新

的 RestTemplateBuilder 实例,因此自定义只会影响构建器的这种使用。

要进行应用程序范围的附加自定义,请使用 RestTemplateCustomizer bean。所有这些beans都会自动注册到自动配置

的 RestTemplateBuilder ,并应用于使用它构建的任何模板。

以下示例显示了一个自定义程序,它为除 192.168.0.5 之外的所有主机配置代理的使用:

static class ProxyCustomizer implements RestTemplateCustomizer {

@Override

public void customize(RestTemplate restTemplate) {

HttpHost proxy = new HttpHost("proxy.example.com");

HttpClient httpClient = HttpClientBuilder.create()

setRoutePlanner(new DefaultProxyRoutePlanner(proxy) {

@Override

public HttpHost determineProxy(HttpHost target,

HttpRequest request, HttpContext context)

throws HttpException {

if (target.getHostName().equals("192.168.0.5")) {

return null;

}

return super.determineProxy(target, request, context);

}

}).build();

restTemplate.setRequestFactory(

new HttpComponentsClientHttpRequestFactory(httpClient));

}

}

最后,最极端(也很少使用)的选项是创建自己的 RestTemplateBuilder bean。这样做会关闭 RestTemplateBuilder 的自动配置,并阻止

使用任何 RestTemplateCustomizer beans。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-04-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java学习网 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档