如何使用Flume采集Kafka数据写入HBase

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github: https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


在前面的文章Fayson也介绍了一些关于Flume的文章《非Kerberos环境下Kafka数据到Flume进Hive表》、《如何使用Flume准实时建立Solr的全文索引》、《如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS》和《如何使用Flume采集Kafka数据写入Kudu》,本篇文章Fayson主要介绍在非Kerberos的CDH集群中使用Flume采集Kafka数据写入HBase。本文的数据流图如下:

  • 内容概述

1.环境准备及开发自定义HBaseSink

2.配置Flume Agent

3.流程测试

4.总结

  • 测试环境

1.CM和CDH版本为5.12.1

2.采用root用户操作

  • 前置条件

1. Flume已安装

2.HBase服务已安装且正常运行

2.环境准备


1.准备向Kafka发送数据的脚本

这里脚本Fayson就不在详细说明了,前面的文章及Github上均有说明:

https://github.com/fayson/cdhproject/tree/master/kafkademo/0283-kafka-shell

(可左右滑动)

2.通过hbase shell命令创建HBase测试表

create 'fayson_ods_deal_daily','info'

3.开发HBaseSink示例


在CDH集群中Flume-ng默认添加了HBaseSink依赖包,但HBaseSink依赖包只支持两种序列化模式:

SimpleHbaseEventSerializer:将整个Event的Body部分当做完整的一列写入HBase

RegexHbaseEventSerializer:根据正则表达式将Event Body拆分到不同的列

写正则表达式Fayson不擅长,对于复杂结构数据时正则表达式的复杂度可想而知且不便于维护,所以这里Fayson选择使用自定义的HBaseSink方式来完成Json数据的解析及rowkey的指定。

1.在前面Fayson创建的flume-sink工程上继续开发HBaseSink

2.开发HBaseSink需要添加HBase相关的依赖包

<!-- HBase Sink 依赖包 -->
<dependency>
    <groupId>org.apache.flume.flume-ng-sinks</groupId>
    <artifactId>flume-ng-hbase-sink</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>
<!-- HBase Client 依赖包 -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.12.1</version>
</dependency>

(可左右滑动)

3.借助于原生的HBaseSink重新创建了一个FaysonHBaseSink类,该类为指定的sink.type类型,由于代码过长,该类只贴代码片段

/**
 * package: com.cloudera.hbase
 * describe: 自定义HBaseSink,实现了自定义Rowkey及解析JSON字符串
 * creat_user: Fayson
 * email: htechinfo@163.com
 * creat_date: 2018/6/3
 * creat_time: 下午11:43
 * 公众号:Hadoop实操
 */
public class FaysonHBaseSink extends AbstractSink implements Configurable {
    private String tableName;
    private byte[] columnFamily;
    //增加自定义Rowkey字段,可以用多个列组合,以","分割
    ...
    private static final Logger logger = LoggerFactory.getLogger(com.cloudera.hbase.FaysonHBaseSink.class);
    private FaysonHBaseEventSerializer serializer;
    @Override
    public Status process() throws EventDeliveryException {
        Status status = Status.READY;
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        List<Row> actions = new LinkedList<Row>();
        List<Increment> incs = new LinkedList<Increment>();
        try {
            txn.begin();
            if (serializer instanceof BatchAware) {
                ((BatchAware)serializer).onBatchStart();
            }
            long i = 0;
            for (; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    if (i == 0) {
                        status = Status.BACKOFF;
                        sinkCounter.incrementBatchEmptyCount();
                    } else {
                        sinkCounter.incrementBatchUnderflowCount();
                    }
                    break;
                } else {
                    if(rowKeys != null && rowKeys.length() > 0) {
                        serializer.initialize(event, columnFamily, rowKeys);
                    } else {
                        serializer.initialize(event, columnFamily);
                    }
                    actions.addAll(serializer.getActions());
                    incs.addAll(serializer.getIncrements());
                }
            }
            if (i == batchSize) {
                sinkCounter.incrementBatchCompleteCount();
            }
            sinkCounter.addToEventDrainAttemptCount(i);
            putEventsAndCommit(actions, incs, txn);
        } catch (Throwable e) {
            try{
                txn.rollback();
            } catch (Exception e2) {
                logger.error("Exception in rollback. Rollback might not have been " +
                        "successful." , e2);
            }
            logger.error("Failed to commit transaction." +
                    "Transaction rolled back.", e);
            if(e instanceof Error || e instanceof RuntimeException){
                logger.error("Failed to commit transaction." +
                        "Transaction rolled back.", e);
                Throwables.propagate(e);
            } else {
                logger.error("Failed to commit transaction." +
                        "Transaction rolled back.", e);
                throw new EventDeliveryException("Failed to commit transaction." +
                        "Transaction rolled back.", e);
            }
        } finally {
            txn.close();
        }
        return status;
    }
}

(可左右滑动)

4.增加FaysonHBaseEventSerializer接口类继承原生的HBaseEventSerializer接口类,增加initialize(Event var1, byte[] var2, String var3)方法用于处理指定rowkey

package com.cloudera.hbase;
import org.apache.flume.Event;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
/**
 * package: com.cloudera.hbase
 * describe: 继承HBaseSink的HbaseEventSerializer接口类,增加initialize(Event var1, byte[] var2, String var3)
 * 用于处理指定rowkey
 * creat_user: Fayson
 * email: htechinfo@163.com
 * creat_date: 2018/6/3
 * creat_time: 下午11:54
 * 公众号:Hadoop实操
 */
public interface FaysonHBaseEventSerializer extends HbaseEventSerializer {
    void initialize(Event var1, byte[] var2, String var3);
}

5.新增FaysonHBaseSinkConstants类,用于定义自定义HBaseSink的常量

package com.cloudera.hbase;
/**
 * package: com.cloudera.hbase
 * describe: 自定义HBaseSink常量类
 * creat_user: Fayson
 * email: htechinfo@163.com
 * creat_date: 2018/6/3
 * creat_time: 下午11:49
 * 公众号:Hadoop实操
 */
public class FaysonHBaseSinkConstants {
    public static final String CONFIG_ROWKEYS = "rowkeys";
}

(可左右滑动)

6.新增JsonHBaseEventSerializer类处理JSON格式数据的序列化类,继承FaysonHBaseEventSerializer类,由于代码太长该类只贴代码片段

public class JsonHBaseEventSerializer implements FaysonHBaseEventSerializer {
    private static final Logger logger = LoggerFactory.getLogger(com.cloudera.hbase.JsonHBaseEventSerializer.class);
    private LinkedHashSet<String> rowkeySet;
    @Override
    public void initialize(Event event, byte[] columnFamily, String rowkeys) {
        this.headers = event.getHeaders();
        this.payload = event.getBody();
        this.cf = columnFamily;
        rowkeySet = new LinkedHashSet<>();
        logger.info("rowkeys:" + rowkeys);
        for(String rowkey : rowkeys.split(",")) {
            rowkeySet.add(rowkey);
        }
    }
    @Override
    public List<Row> getActions() throws FlumeException {
        List<Row> actions = Lists.newArrayList();
        //将JSON消息转换为Map对象
        Map<String, String> resultMap = JsonStr2Map.jsonStr2Map(new String(payload, charset));
        try {
            byte[] rowKey;
            if(!rowkeySet.isEmpty()) {  //如果rowkeySet集合不为空则使用自定义的rowkey
                StringBuffer rowkeyBuff = new StringBuffer();
                for(String rowkey : rowkeySet) {
                    rowkeyBuff.append(resultMap.get(rowkey) + "-");
                }
                rowKey = rowkeyBuff.substring(0, rowkeyBuff.length()-1).getBytes();
                //移除Map中不需要存入Column的列
                for(String rowkey : rowkeySet) {
                    resultMap.remove(rowkey);
                }
            } else {
                if (rowKeyIndex < 0) {
                    rowKey = getRowKey();
                } else {
                    rowKey = resultMap.get(rowKeyIndex + 1).getBytes(Charsets.UTF_8);
                }
            }
            Put put = new Put(rowKey);
            for(Map.Entry<String, String> entry : resultMap.entrySet()) {
                put.add(cf, entry.getKey().getBytes(), entry.getValue().getBytes(Charsets.UTF_8));
            }
            if (depositHeaders) {
                for (Map.Entry<String, String> entry : headers.entrySet()) {
                    put.add(cf, entry.getKey().getBytes(charset), entry.getValue().getBytes(charset));
                }
            }
            actions.add(put);
        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
        return actions;
    }
}

(可左右滑动)

7.将开发好的代码使用mvn命令打包

mvn clean package

将打包好的flume-sink-1.0-SNAPSHOT.jar部署到集群所有节点的/opt/cloudera/parcels/CDH/lib/flume-ng/lib目录下

[root@cdh01 shell]# sh bk_cp.sh node.list /root/flume-sink-1.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH/lib/flume-ng/lib/

(可左右滑动)

4.配置Flume Agent


1.登录CM,进flume服务界面,点击“配置”

2.在Agent类别的“配置文件”中输入如下内容:

kafka.sources  = source1
kafka.channels = channel1
kafka.sinks = sink1
kafka.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
kafka.sources.source1.kafka.bootstrap.servers = cdh01.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092
kafka.sources.source1.kafka.topics = kafka_sparkstreaming_kudu_topic
kafka.sources.source1.kafka.consumer.group.id = flume-consumer
kafka.sources.source1.channels = channel1
kafka.channels.channel1.type = memory
kafka.channels.channel1.capacity = 10000
kafka.channels.channel1.transactionCapacity = 1000
kafka.sinks.sink1.channel = channel1
kafka.sinks.sink1.type = com.cloudera.hbase.FaysonHBaseSink
kafka.sinks.sink1.table = fayson_ods_deal_daily
kafka.sinks.sink1.columnFamily = info
kafka.sinks.sink1.rowkeys = id,mobile_phone_num
kafka.sinks.sink1.serializer = com.cloudera.hbase.JsonHBaseEventSerializer

(可左右滑动)

3.保存flume配置,并重启Flume服务

5.流程测试


1.进入0283-kafka-shell目录执行命令向Kafka的kafka_sparkstreaming_kudu_topic发送消息

[root@cdh01 0283-kafka-shell]# sh run.sh ods_user_600.txt

2.通过Hue查看HBase的fayson_ods_deal_daily表

可以看到数据已写入到HBase的fayson_ods_deal_daily表,查看表总数与发送Kafka数量一致

6.总结


1.HBaseSink支持的序列化方式比少,所以Fayson自己写了JsonHBaseEventSerializer类用于解析JSON数据。

2.需要将自定义开发的Jar包部署到${ FLUME_HOME} /lib目录下

3.使用原生的Sink无法指定HBase的rowkey,这里Fayson在自己的自定义Sink中增加了对rowkey的指定,同样可以指定多个列为rowkey,JSON数据的key作为HBase的Column。

GitHub地址:

https://github.com/fayson/cdhproject/tree/master/flumesink/src/main/java/com/cloudera/hbase

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2018-06-04

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏编舟记

R3 Corda 和 springboot 集成

因为Corda内置的Corda Webserver已经被标记成弃用了,一般不再提供支持;再者,springboot的生态明显占优。

26620
来自专栏xingoo, 一个梦想做发明家的程序员

如何在Java应用中提交Spark任务?

最近看到有几个Github友关注了Streaming的监控工程——Teddy,所以思来想去还是优化下代码,不能让别人看笑话啊。于是就想改一下之前觉得最丑陋的一...

79760
来自专栏IT笔记

微信公众号H5支付遇到的那些坑

简史 官方文档说的很清楚,商户已有H5商城网站,用户通过消息或扫描二维码在微信内打开网页时,可以调用微信支付完成下单购买的流程。 当然,最近微信支付平台也加入了...

3.1K120
来自专栏LuckQI

SpringBoot~SpringBatch 使用

Spring Batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于大数据量和高性能的批处理任务,Spr...

14930
来自专栏偏前端工程师的驿站

Eclipse魔法堂:任务管理器

一、前言                                  Eclipse的任务管理器为我们提供一个方便的入口查看工程代办事宜,并定位到对应的代...

23180
来自专栏Spark学习技巧

锁机制-java面试

何为同步?JVM规范规定JVM基于进入和退出Monitor对象来实现方法同步和代码块同步,但两者的实现细节不一样。代码块同步是使用monitorenter和mo...

41260
来自专栏公众号_薛勤的博客

Netty入门(二)之PC聊天室

参看Netty入门(一):Netty入门(一)之webSocket聊天室 Netty4.X下载地址:http://netty.io/downloads.ht...

18540
来自专栏Spring相关

第3章—高级装配—配置profile bean

我们正常开发的过程中经常遇到的问题是,开发环境是一套环境,qa测试是一套环境,线上部署又是一套环境。这样从开发到测试再到部署,会对程序中的配置修改多次,尤其是从...

10920
来自专栏日常分享

Java 端口扫描器 TCP的实现方法

想必很多朋友都实现过一个简易的聊天室这个功能,其中涉及到Socket套接字这个类,我们通过一个特定的IP以及特定的端口创建一个服务端的套接字(ServerSoc...

24210
来自专栏Play & Scala 技术分享

PlayScala 2.5.x - 关于Content-Type的注意事项

33440

扫码关注云+社区

领取腾讯云代金券