Integrating Flume with Spark Streaming: Pull and Push Approaches

  • Pull approach

Writing the Flume agent

# Name the components on this agent
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

# Describe/configure the source
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = localhost
simple-agent.sources.netcat-source.port = 44444

# Describe the sink
simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = localhost
simple-agent.sinks.spark-sink.port = 41414

# Use a channel which buffers events in memory
simple-agent.channels.memory-channel.type = memory


# Bind the source and sink to the channel
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel

Starting Flume

flume-ng agent \
--name simple-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume_pull_streaming.conf \
-Dflume.root.logger=INFO,console &
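
On the Spark side, the pull approach polls the SparkSink that the agent above exposes on localhost:41414. The following is a minimal sketch, not the author's code: the object name FlumePullWordCount and the word-count logic are assumptions made for illustration, and it relies on the spark-streaming-flume_2.11:2.2.0 artifact used in the spark-submit command later in this article.

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical object name, chosen for this sketch only.
object FlumePullWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args

    // The master is left unset so that spark-submit --master can supply it.
    // For a run inside the IDE, add .setMaster("local[2]"): the receiver
    // occupies one thread, so at least two are needed.
    val conf = new SparkConf().setAppName("FlumePullWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Pull events from the SparkSink configured in the Flume agent.
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    flumeStream
      .map(e => new String(e.event.getBody.array(), StandardCharsets.UTF_8).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this approach the Flume agent is started first, because the Spark job connects to the sink rather than the other way around. The SparkSink class also has to be on Flume's classpath, which normally means placing the spark-streaming-flume-sink jar and its dependencies in Flume's lib directory.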

  • Push approach

Writing the Flume agent

# Name the components on this agent
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

# Describe/configure the source
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = localhost
simple-agent.sources.netcat-source.port = 44444

# Describe the sink
simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = localhost
simple-agent.sinks.avro-sink.port = 41414

# Use a channel which buffers events in memory
simple-agent.channels.memory-channel.type = memory


# Bind the source and sink to the channel
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel

Starting Flume

flume-ng agent \
--name simple-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console &
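
In the push approach, the Avro sink above sends events to a receiver that Spark Streaming opens on localhost:41414. The sketch below shows what such a job might look like. The class name com.gwf.spark.FlumePushWordCount comes from the spark-submit command in this article, but the body is an assumption, not the author's actual implementation.

```scala
package com.gwf.spark

import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Sketch only: the body is assumed, the class name is from the article.
object FlumePushWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }
    // Taking hostname and port from the arguments avoids recompiling
    // when moving between the local machine and the server.
    val Array(hostname, port) = args

    // The master is left unset so that spark-submit --master can supply it.
    // For a run inside the IDE, add .setMaster("local[2]").
    val conf = new SparkConf().setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Start an Avro receiver on hostname:port; the Flume avro-sink pushes to it.
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

    flumeStream
      .map(e => new String(e.event.getBody.array(), StandardCharsets.UTF_8).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Here the Spark job must be running before the Flume agent starts, since the Avro sink needs a listening receiver to connect to.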

==Note: when switching between the local machine and the server, update the hostname of the Flume sink.==

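Local testing summary

  1. Start the Spark Streaming job
  2. Start the Flume agent
  3. Send data to the netcat source (localhost:44444) with telnet and watch the output in the IDEA console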

Submitting to the server

spark-submit \
--class com.gwf.spark.FlumePushWordCount \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
/Users/gaowenfeng/Documents/IDE/newsell/spark-train/target/spark-train-1.0-SNAPSHOT.jar localhost 41414

spark-submit \
--class com.gwf.spark.FlumePushWordCount \
--master local[2] \
/Users/gaowenfeng/Documents/IDE/newsell/spark-train/target/spark-train-1.0-SNAPSHOT.jar localhost 41414
