作者:lxw的大数据田地 By 暴走大数据 场景描述:如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,...具体表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,数据仍然会被写入到Source中指定的Topic中。...关键词:Flume Kafka 问题发现 如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为...,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,数据仍然会被写入到Source中指定的Topic中。...在Kafka Source中 源码:org.apache.flume.source.kafka.KafkaSource.process() // Add headers to event (topic,
--topic test --from-beginning 配置flume 创建配置文件kafak.conf a1.sources=r1 a1.channels=c1 a1.sinks=k1 a1...sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=44444 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink...a1.sinks.k1.kafka.topic = test a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 a1.sinks.k1.kafka.flumeBatchSize...capacity=1000 a1.channels.c1.transactionCapacity=100 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 启动flume...flume-ng agent -n a1 -c conf/ -f conf/kafka.conf -Dflume.root.logger=INFO, console 发送消息 telnet localhost
topic test 在Flume的安装目录的conf子目录下创建一个配置文件kafka.conf,内容如下: # Name the components on this agent a1.sources...= r1 a1.sinks = k1 a1.channels = c1 # source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost...channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source...这个单词会发送给Flume,然后,Flume发送给Kafka。 打开第6个cmd窗口,执行如下命令: > cd c:\kafka_2.12-2.4.0 > ....上面命令执行以后,就可以在屏幕上看到“hadoop”,说明Kafka成功接 收到了数据。
flume和kafka的整合 复制flume要用到的kafka相关jar到flume目录下的lib里面。...-1.5.0.jar,flume-ng-core-1.5.0.jar,zkclient-0.3.jar,commons-logging-1.1.1.jar,在flume目录中,可以找到这几个jar文件,...,Flume已经向kafka发送了消息 在刚才s1机器上打开的kafka消费端,同样可以看到从Flume中发出的信息,说明flume和kafka已经调试成功了 kafka和storm的整合 我们先在eclipse...和storm的结合 打开两个窗口(也可以在两台机器上分别打开),分别m2上运行kafka的producer,在s1上运行kafka的consumer(如果刚才打开了就不用再打开),先测试kafka自运行是否正常...flume、kafka、storm的整合 从上面两个例子我们可以看到,flume和kafka之前已经完成了通讯和部署,kafka和storm之间可以正常通讯,只差把storm的相关文件打包成jar部署到
Flume 1.4.0 User Guide 地址:http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.4.0-cdh4.6.0/FlumeUserGuide.html...一 安装与环境配置 下载地址 http://archive.cloudera.com/cdh4/cdh/4/flume-ng-latest.tar.gz ,使用chd4版本。...= channel1 agent1.sinks = sink1 #Describe the source agent1.sources.source1.type = spooldir # source...#Describe the source agent1.sources.source1.type = avro # avro source 监听的地址和端口 agent1.sources.source1...运行前需要先为flume-ng赋予可执行权限:chmod 777 flume-ng。 在bin目录下运行命令 ,程序即可执行。
使用Flume实现MySQL与Kafka实时同步 一、Kafka配置 1.创建Topic ....https://github.com/keedio/flume-ng-sql-source/archive/v1.5.2.tar.gz 2.解压 tar -xivf apache-flume-1.9.0...-bin.tar.gz tar -xivf flume-ng-sql-source-1.5.2.tar.gz 3.编译flume-ng-sql-source jar包 mvn package 将编译好的...one of the sources, the type is defined a1.sources.src-1.type = org.keedio.flume.source.SQLSource #...agent -n a1 -c conf -f conf/mysql-flume.conf -Dflume.root.logger=INFO,console 注意事项 1.kafka producer
Flume的文章《非Kerberos环境下Kafka数据到Flume进Hive表》、《如何使用Flume准实时建立Solr的全文索引》、《如何在Kerberos环境使用Flume采集Kafka数据并写入...HDFS》和《如何使用Flume采集Kafka数据写入Kudu》,本篇文章Fayson主要介绍在非Kerberos的CDH集群中使用Flume采集Kafka数据写入HBase。...2.在Agent类别的“配置文件”中输入如下内容: kafka.sources = source1 kafka.channels = channel1 kafka.sinks = sink1 kafka.sources.source1....type = org.apache.flume.source.kafka.KafkaSource kafka.sources.source1.kafka.bootstrap.servers = cdh01...kafka.sources.source1.kafka.consumer.group.id = flume-consumer kafka.sources.source1.channels = channel1
Flume的文章《非Kerberos环境下Kafka数据到Flume进Hive表》、《如何使用Flume准实时建立Solr的全文索引》和《如何在Kerberos环境使用Flume采集Kafka数据并写入...HDFS》,本篇文章Fayson主要介绍在非Kerberos的CDH集群中使用Flume采集Kafka数据写入Kudu。....type = org.apache.flume.source.kafka.KafkaSource kafka.sources.source1.kafka.bootstrap.servers = cdh01...kafka.sources.source1.kafka.consumer.group.id = flume-consumer kafka.sources.source1.channels = channel1...可以看到数据已写入到Kudu表,查看表总数与发送Kafka数量一致 ?
", new UDMExternalCatalog()) tableEnv.sqlUpdate( s"""INSERT INTO `kafka.kafka-k8s.pb_sink_test..., |filedName1, |filedName2, |userId, |brandNames |from kafka...`kafka-k8s`....有图可知,主要分为4大步骤,先通过calcite分析sql,转为相应的relnode,在根据用户配置的schema和Java spi,过滤出需要的kafka produce和kafka consumer...kafka consumer对应于select部分 kafka produce对应于insert部分
在前面的文章Fayson也介绍了一些关于Flume的文章《非Kerberos环境下Kafka数据到Flume进Hive表》、《如何使用Flume准实时建立Solr的全文索引》、《如何在Kerberos环境使用...Flume采集Kafka数据并写入HDFS》、《如何使用Flume采集Kafka数据写入Kudu》和《如何使用Flume采集Kafka数据写入HBase》。....type = org.apache.flume.source.kafka.KafkaSource kafka.sources.source1.kafka.bootstrap.servers = cdh01...注:配置与Fayson前面讲的非Kerberos环境下有些不一样,增加了Kerberos的配置,这里的HBaseSink还是使用的Fayson自定义的Sink,具体可以参考前一篇文章《如何使用Flume...Flume中使用的HBaseSink是Fayson前面一篇文章中将的自定义HBaseSink,可以指定HBase表的rowkey及支持Kerberos认证。
2 Flume与Kafka的选取 采集层主要可以使用Flume、Kafka两种技术。 Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API。 ...所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。...Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。...你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。...events) 处理多个Event,在这个方法中调用Event intercept(Event event) close方法 (3)静态内部类,实现Interceptor.Builder 9.3 拦截器可以不用吗
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。...1.文档编写目的 ---- 在Kafka集群实际应用中,Kafka的消费者有很多种(如:应用程序、Flume、Spark Streaming、Storm等),本篇文章主要讲述如何在Kerberos环境使用...Flume采集Kafka数据并写入HDFS。...= k1 kafka.sources.s1.type =org.apache.flume.source.kafka.KafkaSource kafka.sources.s1.kafka.bootstrap.servers...- kafka.sinks.k1.hdfs.writeFormat = Text [3e09jb0oju.jpeg] 关于HDFS Sink的更多配置可以参考:http://flume.apache.org
下载完成之后,使用tar进行解压 tar -zvxf apache-flume-1.6..0-bin.tar....进入flume的conf配置包中,使用命令touch flume.conf,然后cp flume-conf.properties.template flume.conf 使vim/gedit flume.conf...在Flume配置文件中,我们需要 1. 需要命名当前使用的Agent的名称. 2. 命名Agent下的source的名字. 3....Sink 以上的类型,你可以根据自己的需求来搭配组合使用,当然如果你愿意,你可以为所欲为的搭配.比如我们使用Avro source类型,采用Memory channel,使用HDFS sink存储,...下面我们来逐一的细说; Source的配置 注: 需要特别说明,在Agent中对于存在的N(N>1)个source,其中的每一个source都需要单独进行配置,首先我们需要对source的type进行设置
Flume拦截器的使用 在整个流程中,有两个地方用到了同一个Flume拦截器(Regex Extractor Interceptor),就是在Flume Source中从消息中提取数据,并加入到Header...,供Sink使用; 一处是在LogServer上部署的Flume Source,它从原始日志中提取出用户ID,然后加入到Header中,Flume Sink(Kafka Sink)再入Kafka之前,从...Flume消费者的负载均衡和容错 在北京部署的Flume,使用Kafka Source从Kafka中读取数据流向北京Hadoop集群,西安的也一样,在消费同一Topic的消息时候,我们都是在两台机器上启动了两个...其它实时数据消费者 如果需要实时统计一小段时间(比如十分钟、一小时)之内的PV、UV等指标,那么可以使用SparkStreaming来完成,比较简单。...如果单独使用Spark Streaming来完成一天内海量数据的累计去重统计,我还不太清楚有什么好的解决办法。 另外,实时OLAP也可能作为Kafka的实时消费者应用,比如:Druid。
其中source有很多种可以选择,channel有很多种可以选择,sink也同样有多种可以选择,并且都支持自定义。...不同于exec Source,该source是可靠的并且不会丢失数据,即使flume被重启或者杀死。为了交换这种可靠性,只有不可变的,唯一命名的文件可以放入监控目录。...sink flume sink可以将数据发布到kafka一个topic。...如果您有多个Kafka source运行,您可以使用相同的消费者组配置它们,以便于每个kafka Source实例消费单独的一组partition数据。...使用此sink需要安装hadoop,以便Flume可以使用Hadoop jars与HDFS集群进行通信。请注意,需要支持sync()调用的Hadoop版本。
基于这样的结论,Hadoop 开发商 Cloudera 推荐如果数据需要被多个应用程序消费的话,推荐使用 Kafka,如果数据只是面向 Hadoop 的,可以使用 Flume。...如果你的数据来源已经确定,不需要额外的编码,那你可以使用 Flume 提供的 sources 和 sinks,反之,如果你需要准备自己的生产者和消费者,那你需要使用 Kafka。...使用 Kafka 的管道特性不会有这样的问题。 Flume 和 Kafka 可以一起工作的。...如果你需要把流式数据从 Kafka 转移到 Hadoop,可以使用 Flume 代理 (agent),将 kafka 当作一个来源 (source),这样可以从 Kafka 读取数据到 Hadoop。...你不需要去开发自己的消费者,你可以使用 Flume 与 Hadoop、HBase 相结合的特性,使用 Cloudera Manager 平台监控消费者,并且通过增加过滤器的方式处理数据。
3.Flume与Kafka的选取 采集层主要可以使用 Flume、Kafka 两种技术。Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展 API。...如果已经存在的 Flume Sources 和 Sinks 满足你的需求,并且你更喜欢不需要任何开发的系统,请使用 Flume。Flume 可以使用拦截器实时处理数据。...Flume 和 Kafka 可以很好地结合起来使用。...如果你的设计需要从 Kafka 到Hadoop 的流数据,使用 Flume 代理并配置 Kafka 的 Source 读取数据也是 可行的:你没有必要实现自己的消费者。...你可以直接利用 Flume 与 HDFS 及HBase 的结合的所有好处。你可以使用 Cloudera Manager 对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。
扇入就是Source可以接受多个输入,扇出就是Sink可以将数据输出多个目的地。...Flume实际上是一个分布式的管道架构,可以看做在数据源和目的地之间有一个agent的网络,并支持数据路由 数据路由 Flume Agent包括Source、Channel、Sink组成。...例如:Sink可以将数据写到下一个Agent的Source中 为保证Flume的可靠性,Flume在Source和Channel之间采用Interceptors拦截器用来更改或者检查Flume的events...Topics 数据源可以使用Kafka按主题发布信息给订阅者 Topics是消息的分类名。Kafka集群或Broker为每一个主题都会维护一个分区日志。...Consumers Kafka提供一种单独的消费者抽象,此抽象具有两种模式的特征消费组,Queuing和Publish-SubScribe。消费者使用相同的消费组名字来标识。
领取专属 10元无门槛券
手把手带您无忧上云