首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

实战经验 | Flume中同时使用Kafka SourceKafka Sink的Topic覆盖问题

作者:lxw的大数据田地 By 暴走大数据 场景描述:如果在一个Flume Agent中同时使用Kafka SourceKafka Sink来处理events,便会遇到Kafka Topic覆盖问题,...具体表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,数据仍然会被写入到Source中指定的Topic中。...关键词:Flume Kafka 问题发现 如果在一个Flume Agent中同时使用Kafka SourceKafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为...,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,数据仍然会被写入到Source中指定的Topic中。...在Kafka Source中 源码:org.apache.flume.source.kafka.KafkaSource.process() // Add headers to event (topic,

1.8K30
您找到你想要的搜索结果了吗?
是的
没有找到

FlumeKafka、Storm如何结合使用

flumekafka的整合 复制flume要用到的kafka相关jar到flume目录下的lib里面。...-1.5.0.jar,flume-ng-core-1.5.0.jar,zkclient-0.3.jar,commons-logging-1.1.1.jar,在flume目录中,可以找到这几个jar文件,...,Flume已经向kafka发送了消息 在刚才s1机器上打开的kafka消费端,同样可以看到从Flume中发出的信息,说明flumekafka已经调试成功了 kafka和storm的整合 我们先在eclipse...和storm的结合 打开两个窗口(也可以在两台机器上分别打开),分别m2上运行kafka的producer,在s1上运行kafka的consumer(如果刚才打开了就不用再打开),先测试kafka自运行是否正常...flumekafka、storm的整合 从上面两个例子我们可以看到,flumekafka之前已经完成了通讯和部署,kafka和storm之间可以正常通讯,只差把storm的相关文件打包成jar部署到

91120

如何在Kerberos环境下使用Flume采集Kafka数据写入HBase

在前面的文章Fayson也介绍了一些关于Flume的文章《非Kerberos环境下Kafka数据到Flume进Hive表》、《如何使用Flume准实时建立Solr的全文索引》、《如何在Kerberos环境使用...Flume采集Kafka数据并写入HDFS》、《如何使用Flume采集Kafka数据写入Kudu》和《如何使用Flume采集Kafka数据写入HBase》。....type = org.apache.flume.source.kafka.KafkaSource kafka.sources.source1.kafka.bootstrap.servers = cdh01...注:配置与Fayson前面讲的非Kerberos环境下有些不一样,增加了Kerberos的配置,这里的HBaseSink还是使用的Fayson自定义的Sink,具体可以参考前一篇文章《如何使用Flume...Flume使用的HBaseSink是Fayson前面一篇文章中将的自定义HBaseSink,可以指定HBase表的rowkey及支持Kerberos认证。

1K20

Flume

2 FlumeKafka的选取   采集层主要可以使用FlumeKafka两种技术。   FlumeFlume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API。   ...所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用使用Flume。...FlumeKafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置KafkaSource读取数据也是可行的:你没有必要实现自己的消费者。...你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。...events) 处理多个Event,在这个方法中调用Event intercept(Event event)   close方法 (3)静态内部类,实现Interceptor.Builder 9.3 拦截器可以不用

20220

flume安装及配置介绍(二)

下载完成之后,使用tar进行解压 tar -zvxf apache-flume-1.6..0-bin.tar....进入flume的conf配置包中,使用命令touch flume.conf,然后cp flume-conf.properties.template flume.conf  使vim/gedit flume.conf...在Flume配置文件中,我们需要      1. 需要命名当前使用的Agent的名称.      2. 命名Agent下的source的名字.      3....Sink  以上的类型,你可以根据自己的需求来搭配组合使用,当然如果你愿意,你可以为所欲为的搭配.比如我们使用Avro source类型,采用Memory channel,使用HDFS sink存储,...下面我们来逐一的细说; Source的配置 注: 需要特别说明,在Agent中对于存在的N(N>1)个source,其中的每一个source都需要单独进行配置,首先我们需要对source的type进行设置

825110

记录一下互联网日志实时收集和实时计算的简单方案

Flume拦截器的使用 在整个流程中,有两个地方用到了同一个Flume拦截器(Regex Extractor Interceptor),就是在Flume Source中从消息中提取数据,并加入到Header...,供Sink使用; 一处是在LogServer上部署的Flume Source,它从原始日志中提取出用户ID,然后加入到Header中,Flume Sink(Kafka Sink)再入Kafka之前,从...Flume消费者的负载均衡和容错 在北京部署的Flume使用Kafka SourceKafka中读取数据流向北京Hadoop集群,西安的也一样,在消费同一Topic的消息时候,我们都是在两台机器上启动了两个...其它实时数据消费者 如果需要实时统计一小段时间(比如十分钟、一小时)之内的PV、UV等指标,那么可以使用SparkStreaming来完成,比较简单。...如果单独使用Spark Streaming来完成一天内海量数据的累计去重统计,我还不太清楚有什么好的解决办法。 另外,实时OLAP也可能作为Kafka的实时消费者应用,比如:Druid。

55140

记录一下互联网日志实时收集和实时计算的简单方案

Flume拦截器的使用 在整个流程中,有两个地方用到了同一个Flume拦截器(Regex Extractor Interceptor),就是在Flume Source中从消息中提取数据,并加入到Header...,供Sink使用; 一处是在LogServer上部署的Flume Source,它从原始日志中提取出用户ID,然后加入到Header中,Flume Sink(Kafka Sink)再入Kafka之前,从...Flume消费者的负载均衡和容错 在北京部署的Flume使用Kafka SourceKafka中读取数据流向北京Hadoop集群,西安的也一样,在消费同一Topic的消息时候,我们都是在两台机器上启动了两个...其它实时数据消费者 如果需要实时统计一小段时间(比如十分钟、一小时)之内的PV、UV等指标,那么可以使用SparkStreaming来完成,比较简单。...如果单独使用Spark Streaming来完成一天内海量数据的累计去重统计,我还不太清楚有什么好的解决办法。 另外,实时OLAP也可能作为Kafka的实时消费者应用,比如:Druid。

66320

记录一下互联网日志实时收集和实时计算的简单方案

Flume拦截器的使用 在整个流程中,有两个地方用到了同一个Flume拦截器(Regex Extractor Interceptor),就是在Flume Source中从消息中提取数据,并加入到Header...,供Sink使用; 一处是在LogServer上部署的Flume Source,它从原始日志中提取出用户ID,然后加入到Header中,Flume Sink(Kafka Sink)再入Kafka之前,从...Flume消费者的负载均衡和容错 在北京部署的Flume使用Kafka SourceKafka中读取数据流向北京Hadoop集群,西安的也一样,在消费同一Topic的消息时候,我们都是在两台机器上启动了两个...其它实时数据消费者 如果需要实时统计一小段时间(比如十分钟、一小时)之内的PV、UV等指标,那么可以使用SparkStreaming来完成,比较简单。...如果单独使用Spark Streaming来完成一天内海量数据的累计去重统计,我还不太清楚有什么好的解决办法。 另外,实时OLAP也可能作为Kafka的实时消费者应用,比如:Druid。

86420

flume应该思考的问题

基于这样的结论,Hadoop 开发商 Cloudera 推荐如果数据需要被多个应用程序消费的话,推荐使用 Kafka,如果数据只是面向 Hadoop 的,可以使用 Flume。...如果你的数据来源已经确定,不需要额外的编码,那你可以使用 Flume 提供的 sources 和 sinks,反之,如果你需要准备自己的生产者和消费者,那你需要使用 Kafka。...使用 Kafka 的管道特性不会有这样的问题。 FlumeKafka 可以一起工作的。...如果你需要把流式数据从 Kafka 转移到 Hadoop,可以使用 Flume 代理 (agent),将 kafka 当作一个来源 (source),这样可以Kafka 读取数据到 Hadoop。...你不需要去开发自己的消费者,你可以使用 Flume 与 Hadoop、HBase 相结合的特性,使用 Cloudera Manager 平台监控消费者,并且通过增加过滤器的方式处理数据。

1.3K110

【最全的大数据面试系列】Flume面试题大全

3.FlumeKafka的选取 采集层主要可以使用 FlumeKafka 两种技术。FlumeFlume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展 API。...如果已经存在的 Flume Sources 和 Sinks 满足你的需求,并且你更喜欢不需要任何开发的系统,请使用 FlumeFlume 可以使用拦截器实时处理数据。...FlumeKafka 可以很好地结合起来使用。...如果你的设计需要从 Kafka 到Hadoop 的流数据,使用 Flume 代理并配置 KafkaSource 读取数据也是 可行的:你没有必要实现自己的消费者。...你可以直接利用 Flume 与 HDFS 及HBase 的结合的所有好处。你可以使用 Cloudera Manager 对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。

87120

大数据采集架构

扇入就是Source可以接受多个输入,扇出就是Sink可以将数据输出多个目的地。...Flume实际上是一个分布式的管道架构,可以看做在数据源和目的地之间有一个agent的网络,并支持数据路由 数据路由 Flume Agent包括Source、Channel、Sink组成。...例如:Sink可以将数据写到下一个Agent的Source中 为保证Flume的可靠性,FlumeSource和Channel之间采用Interceptors拦截器用来更改或者检查Flume的events...Topics 数据源可以使用Kafka按主题发布信息给订阅者 Topics是消息的分类名。Kafka集群或Broker为每一个主题都会维护一个分区日志。...Consumers Kafka提供一种单独的消费者抽象,此抽象具有两种模式的特征消费组,Queuing和Publish-SubScribe。消费者使用相同的消费组名字来标识。

77940
领券