前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >实战经验 | Flume中同时使用Kafka Source和Kafka Sink的Topic覆盖问题

实战经验 | Flume中同时使用Kafka Source和Kafka Sink的Topic覆盖问题

作者头像
大数据真好玩
发布2019-09-12 11:49:34
1.7K2
发布2019-09-12 11:49:34
举报
文章被收录于专栏:暴走大数据暴走大数据

作者:lxw的大数据田地

By 暴走大数据

场景描述:如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,数据仍然会被写入到Source中指定的Topic中。

关键词:Flume Kafka

问题发现

如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,数据仍然会被写入到Source中指定的Topic中。

比如:在Agent中的Kafka Source配置Topic为:

agent_myAgent.sources.kafkaSource.topic = sourceTopic

在Kafka Sink配置Topic为:

agent_myAgent.sinks.kafkaSink.topic = sinkTopic

你会发现,最后数据又被写入到sourceTopic中,而sinkTopic没有任何数据写入。

经过DEBUG和分析,原因如下:在Kafka Sink中,配置项官网文档说明如下:

属性名topic,默认值为default-flume-topic。

The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here。

如果event header中包含了key为”topic”的值,那么将会覆盖该属性配置。

在源码org.apache.flume.sink.kafka.KafkaSink.process()中,

if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
            eventTopic = topic;
 }

其中topic是从属性agent_myAgent.sinks.kafkaSink.topic = sinkTopic 中获取的属性值(如果没有配置,则使用默认topic名称)

topic = context.getString(KafkaSinkConstants.TOPIC,KafkaSinkConstants.DEFAULT_TOPIC);

即:先使用event header中key为”topic”的值作为sink的topic,如果event header中没有,才取属性中配置的topic。

在Kafka Source中

源码:org.apache.flume.source.kafka.KafkaSource.process()

// Add headers to event (topic, timestamp, and key)headers = new HashMap<String, String>();headers.put(KafkaSourceConstants.TIMESTAMP,String.valueOf(System.currentTimeMillis()));headers.put(KafkaSourceConstants.TOPIC, topic);

在event中,将Kafka Source中配置的topic加入到了header中。

因此,在Kafka Sink中,首先从event header中读取到了topic,Sink端的配置项不起作用。

解决办法

使用Flume拦截器,修改event header中key=topic的值为目标topic,拦截器使用Static interceptor,配置如下:

## Source 拦截器
agent_myAgent.sources.kafkaSource.interceptors = i1
agent_myAgent.sources.kafkaSource.interceptors.i1.type = static
agent_myAgent.sources.kafkaSource.interceptors.i1.key = topic
agent_myAgent.sources.kafkaSource.interceptors.i1.preserveExisting = false
agent_myAgent.sources.kafkaSource.interceptors.i1.value = sinkTopic

其中,要特别注意preserveExisting属性值需要设置为false,该属性设置如果event header中已经存在key=topic,是否保留原来的值,默认为true,如果为true,那么还是会保留原来的topic,你设置的将不会生效。

另外,Kafka Sink中可以不用再设置topic属性了,反正也没用。 agent_myAgent.sinks.kafkaSink.topic = sinkTopic //不需要了

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-09-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档