前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka快速入门系列(13) | Flume对接Kafka

Kafka快速入门系列(13) | Flume对接Kafka

作者头像
不温卜火
发布2020-10-28 16:49:52
4540
发布2020-10-28 16:49:52
举报
文章被收录于专栏:不温卜火不温卜火

  本篇博主带来的是Flume对接Kafka。

1. Kafka与Flume比较

在企业中必须要清楚流式数据采集框架flume和kafka的定位是什么:

  • 1. flume:cloudera公司研发 适合多个生产者; 适合下游数据消费者不多的情况; 适合数据安全性要求不高的操作; 适合与Hadoop生态圈对接的操作。
  • 2.kafka:linkedin公司研发: 适合数据下游消费众多的情况; 适合数据安全性要求较高的操作,支持replication。

因此我们常用的一种模型是: 线上数据 --> flume --> kafka --> flume(根据情景增删该流程) --> HDFS

2. Flume与kafka集成

  • 1. 编写代码
package com.buwenbuhuo.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

/**
 * @author 卜温不火
 * @create 2020-05-07 18:57
 * com.buwenbuhuo.flume.interceptor - the name of the target package where the new class or interface will be created.
 * kafka0506 - the name of the current project.
 */
public class Customlnterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        if (event.getBody()[0] >= '0' && event.getBody()[0] <= '9'){
            event.getHeaders().put("topic","number");
        }else if (event.getBody()[0] >= 'a' && event.getBody()[0] <= 'z'){
            event.getHeaders().put("topic","letter");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events){
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{
        public Interceptor build(){
            return new Customlnterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
  • 2. 打包上传
1
1
2
2
  • 3. 配置flume(nc-kafka.conf)
[bigdata@hadoop002 job]$ vim nc-kafka.conf

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c

# Describe the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop002
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.buwenbuhuo.flume.interceptor.Customlnterceptor$Builder

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop002:9092,hadoop003:9092,hadoop004:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 4. 启动flume
[bigdata@hadoop002 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/nc-kafka.conf 
3
3
  • 5. 分别在hadoop003,hadoop004启动消费者
[bigdata@hadoop003 kafka]$ bin/kafka-console-consumer.sh  --bootstrap-server hadoop002:9092 --topic number
[bigdata@hadoop004 kafka]$ bin/kafka-console-consumer.sh  --bootstrap-server hadoop002:9092 --topic letter
4
4
  • 6. 启动端口测试
[bigdata@hadoop003 module]$ nc hadoop002 44444
5
5

可以看到最终结果图与我们设想是一致的,所以此次实验成功。

  本次的分享就到这里了

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-05-24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. Kafka与Flume比较
  • 2. Flume与kafka集成
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档