
Import Kafka data into OSS using E-MapReduce service

Overview

Kafka is a widely used message queue in the open-source community. Although Confluent officially provides a connector plug-in that imports data from Kafka directly into HDFS, there is no official counterpart for Alibaba Cloud's object storage service OSS. This article gives a simple example of writing data from Kafka to OSS. Because the Alibaba Cloud E-MapReduce service integrates a large number of open-source components along with integration tools for Alibaba Cloud services, the example is run directly on an E-MapReduce cluster. It uses the open-source Flume tool as an intermediary to connect Kafka and OSS; Flume may also become an integrated component of the E-MapReduce platform in the future.

Scenario example

The following walks through a simple example. If you already have a running Kafka cluster, you can jump directly to step 4.

1. In the Kafka home directory, start the Kafka service process. In the configuration file, set the Zookeeper address to the cluster's service address emr-header-1:2181.

bin/kafka-server-start.sh config/server.properties

2. Create a Kafka topic named test.

bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 \
    --replication-factor 1 --partitions 1 --topic test

3. Write data to the test topic; here the data is the performance monitoring output of the local machine.

vmstat 1 | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

4. Configure and start the Flume service in the Flume home directory.

Create a new configuration file conf/kafka-example.conf. Specifically, set the source to the corresponding Kafka topic, use the HDFS sinker as the sink, and point its path at OSS. Because the E-MapReduce service implements an efficient OSS FileSystem (compatible with the Hadoop FileSystem interface), the OSS path can be specified directly and the HDFS sinker's data is automatically written to OSS.

# Name the components on this agent
a1.sources = source1
a1.sinks = oss1
a1.channels = c1

# Describe/configure the source
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.zookeeperConnect = localhost:2181
a1.sources.source1.topic = test
a1.sources.source1.groupId = flume
a1.sources.source1.channels = c1
a1.sources.source1.interceptors = i1
a1.sources.source1.interceptors.i1.type = timestamp
a1.sources.source1.kafka.consumer.timeout.ms = 100

# Describe the sink
a1.sinks.oss1.type = hdfs
a1.sinks.oss1.hdfs.path = oss://emr-examples/kafka/%{topic}/%y-%m-%d
a1.sinks.oss1.hdfs.rollInterval = 10
a1.sinks.oss1.hdfs.rollSize = 0
a1.sinks.oss1.hdfs.rollCount = 0
a1.sinks.oss1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.source1.channels = c1
a1.sinks.oss1.channel = c1
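A note on the channel choice: the memory channel above is fast, but any events still buffered in memory are lost if the agent crashes or restarts. If durability matters more than throughput, Flume's file channel is a drop-in replacement; a minimal sketch follows (the checkpoint and data directories below are placeholder paths; adjust them to your environment):

# Durable alternative to the memory channel: events survive an agent restart
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data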
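Before starting the agent, it is also worth confirming that data is actually arriving on the topic. One way is the console consumer that ships with Kafka (shown here with the Zookeeper-based consumer of that Kafka generation; on newer versions the flag would be --bootstrap-server localhost:9092 instead). You should see the vmstat lines from step 3 scroll by; press Ctrl-C to stop.

bin/kafka-console-consumer.sh --zookeeper emr-header-1:2181 --topic test --from-beginning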
Next, modify the Hadoop configuration file /etc/emr/hadoop-conf/core-site.xml and add the OSS-related settings:

<property>
    <name>fs.oss.endpoint</name>
    <value>http://oss-cn-hangzhou.aliyuncs.com/</value>
</property>
<property>
    <name>fs.oss.accessKeyId</name>
    <value>set-access-key-id</value>
</property>
<property>
    <name>fs.oss.accessKeySecret</name>
    <value>set-access-key-secret</value>
</property>

Start the Flume service:

bin/flume-ng agent --conf conf --conf-file conf/kafka-example.conf --name a1 \
    -Dflume.root.logger=INFO,console

The log shows the Flume HDFS sinker writing data to OSS and rolling to a new file every ten seconds:

2016-12-05 18:41:04,794 (hdfs-oss1-call-runner-1) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming oss://emr-perform/kafka/test/16-12-05/FlumeData.1480934454657.tmp to oss://emr-perform/kafka/test/16-12-05/FlumeData.1480934454657
2016-12-05 18:41:04,852 (hdfs-oss1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.

View the results on OSS:

$ hadoop fs -ls oss://emr-examples/kafka/test/16-12-05/
Found 6 items
-rw-rw-rw-   1   162691 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934394566
-rw-rw-rw-   1      925 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934407580
-rw-rw-rw-   1     1170 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934418597
-rw-rw-rw-   1     1092 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934430613
-rw-rw-rw-   1     1254 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638
-rw-rw-rw-   1      588 2016-12-05 18:41 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934454657

$ hadoop fs -cat oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638
 0  0      0 1911216  50036 1343828    0    0     0     0 1341 2396  1  1 98  0  0
 0  0      0 1896964  50052 1343824    0    0     0   112 1982 2511 15  1 84  0  0
 1  0      0 1896552  50052 1343828    0    0     0    76 2314 3329  3  4 94  0  0
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 5  0      0 1903016  50052 1343828    0    0     0     0 2277 3249  2  4 94  0  0
 0  0      0 1902892  50052 1343828    0    0     0     0 1417 2366  5  0 95  0  0
 0  0      0 1902892  50052 1343828    0    0     0     0 1072 2243  0  0 99  0  0
 0  0      0 1902892  50068 1343824    0    0     0   144 1275 2283  1  0 99  0  0
 1  0      0 1903024  50068 1343828    0    0     0    24 1099 2071  1  1 99  0  0
 0  0      0 1903272  50068 1343832    0    0     0     0 1294 2238  1  1 99  0  0
 1  0      0 1903412  50068 1343832    0    0     0     0 1024 2094  1  0 99  0  0
 2  0      0 1903148  50076 1343836    0    0     0    68 1879 2766  1  1 98  0  0
 1  0      0 1903288  50092 1343840    0    0     0    92 1147 2240  1  0 99  0  0
 0  0      0 1902792  50092 1343844    0    0     0    28 1456 2388  1  1 98  0  0

The file holds the raw vmstat output; vmstat reprints its header periodically, which is why a header block appears in the middle of this rolled file. (The log excerpt evidently comes from a run against a different bucket, emr-perform, while the listing uses emr-examples.)

References
1. http://kafka.apache.org/quickstart
2. https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html
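Finally, to tear the demo down, stop the vmstat producer and the Flume agent (Ctrl-C for both). If the broker allows topic deletion (delete.topic.enable=true, an assumption here, since it is off by default in Kafka of this generation), the test topic can be removed as well:

bin/kafka-topics.sh --delete --zookeeper emr-header-1:2181 --topic test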
