How to Use Flume to Collect Kafka Data and Write It to Kudu

Tip: to view the high-resolution screenshots, open this article on a mobile device and tap an image to zoom in.

Fayson's GitHub: https://github.com/fayson/cdhproject

Tip: code blocks can be scrolled horizontally.

1. Purpose of This Document


Fayson has previously published several Flume articles, including 《非Kerberos环境下Kafka数据到Flume进Hive表》, 《如何使用Flume准实时建立Solr的全文索引》, and 《如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS》. This article describes how to use Flume to collect Kafka data and write it to Kudu on a non-Kerberos CDH cluster. The data flow in this article is Kafka -> Flume (KafkaSource, memory channel, KuduSink) -> Kudu.

  • Contents

1. Environment preparation and developing a custom KuduSink

2. Configuring the Flume Agent

3. End-to-end test

4. Summary

  • Test environment

1. CM and CDH version 5.12.1

2. All operations are performed as the root user

  • Prerequisites

1. Flume is installed

2. The Kudu and Impala services are installed

3. Kudu and Impala are integrated

4. Kerberos is not enabled on the cluster

2. Environment Preparation


1. Download the kudu-flume-sink dependency jar from the following address:

https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/kudu/kudu-flume-sink/1.4.0-cdh5.12.1/kudu-flume-sink-1.4.0-cdh5.12.1.jar

(scroll horizontally to view)

2. Deploy the downloaded jar to the /opt/cloudera/parcels/CDH/lib/flume-ng/lib/ directory on every node in the cluster

[root@cdh01 shell]# sh bk_cp.sh node.list /root/kudu-flume-sink-1.4.0-cdh5.12.1.jar /opt/cloudera/parcels/CDH/lib/flume-ng/lib/

(scroll horizontally to view)

3. Prepare the script that sends data to Kafka

Fayson will not go into the details of the script here; it is described in earlier articles and on GitHub:

https://github.com/fayson/cdhproject/tree/master/kafkademo/0283-kafka-shell

(scroll horizontally to view)
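For reference, the sketch below shows the shape of the JSON records the test data contains and how a single record could be sent to the topic with the Kafka Java producer API. This is only an illustrative sketch: the field values are invented, the topic name and broker list are the ones used in the Flume Agent configuration later in this article, and the actual test data is the ods_user_600.txt file shipped with the 0283-kafka-shell project.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendOneRecordDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list as used by the Flume KafkaSource configuration later in this article
        props.put("bootstrap.servers", "cdh01.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // One flat JSON record whose fields match the ods_deal_daily_kudu table columns (invented values)
        String json = "{\"id\":\"0001\",\"name\":\"fayson\",\"sex\":\"male\",\"city\":\"beijing\","
                + "\"occupation\":\"engineer\",\"mobile_phone_num\":\"13800000000\",\"fix_phone_num\":\"010-10000000\","
                + "\"bank_name\":\"test_bank\",\"address\":\"test_address\",\"marriage\":\"married\",\"child_num\":\"1\"}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("kafka_sparkstreaming_kudu_topic", json));
            producer.flush();
        }
    }
}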

4. Create the Kudu test table via Hue

CREATE TABLE ods_deal_daily_kudu (
  id STRING COMPRESSION snappy,
  name STRING COMPRESSION snappy,
  sex STRING COMPRESSION snappy,
  city STRING COMPRESSION snappy,
  occupation STRING COMPRESSION snappy,
  mobile_phone_num STRING COMPRESSION snappy,
  fix_phone_num STRING COMPRESSION snappy,
  bank_name STRING COMPRESSION snappy,
  address STRING COMPRESSION snappy,
  marriage STRING COMPRESSION snappy,
  child_num INT COMPRESSION snappy,
  PRIMARY KEY (id)
)
  PARTITION BY HASH PARTITIONS 6
STORED AS KUDU;

(scroll horizontally to view)
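Before wiring up Flume, it can be handy to confirm that the table is visible to the Kudu client under its Impala-qualified name (tables created through Impala are stored in Kudu with an impala:: prefix, as noted again in the summary). A minimal, hypothetical check with the Kudu Java client, assuming the master addresses used later in the Flume sink configuration, might look like this:

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

public class CheckKuduTable {
    public static void main(String[] args) throws KuduException {
        // Kudu master addresses, same as masterAddresses in the Flume sink configuration
        KuduClient client = new KuduClient.KuduClientBuilder(
                "cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com").build();
        // Tables created through Impala carry the impala:: prefix in Kudu
        String tableName = "impala::default.ods_deal_daily_kudu";
        System.out.println("exists: " + client.tableExists(tableName));
        for (ColumnSchema col : client.openTable(tableName).getSchema().getColumns()) {
            System.out.println(col.getName() + " : " + col.getType());
        }
        client.shutdown();
    }
}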

3. Developing the KuduSink


Kudu ships a default KuduSink for Flume, but it is not particularly flexible. In this example the data sent to Kafka is JSON, and the bundled KuduOperationsProducer only supports limited parsing, essentially regular expressions. Since writing regular expressions is a headache for Fayson, a custom JsonKuduOperationsProducer is written here instead.

1. Create a Maven project named flume-sink

2. Add the flume-sink project dependencies

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <avro.version>1.8.1</avro.version>
    <flume.version>1.6.0</flume.version>
    <hadoop.version>2.6.0-cdh5.11.2</hadoop.version>
    <kudu.version>1.4.0-cdh5.12.1</kudu.version>
    <slf4j.version>1.7.12</slf4j.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>${kudu.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>${flume.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-configuration</artifactId>
        <version>${flume.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-flume-sink</artifactId>
        <version>1.4.0-cdh5.12.1</version>
    </dependency>
</dependencies>

(scroll horizontally to view)

3. Create the JSON parsing utility class JsonStr2Map.java, which parses a JSON string into a Map

package com.cloudera.utils;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * package: com.cloudera.utils
 * describe: convert a JSON string to a Map
 * create_user: Fayson
 * email: htechinfo@163.com
 * create_date: 2018/6/2
 * create_time: 11:19 PM
 * WeChat public account: Hadoop实操
 */
public class JsonStr2Map {
    /**
     * Parse a flat JSON string into a Map of field name to string value
     * @param jsonStr the JSON string, e.g. {"id":"0001","name":"fayson"}
     * @return a Map keyed by field name
     */
    public static Map<String, String> jsonStr2Map(String jsonStr) {
        Map<String, String> resultMap = new HashMap<>();
        // Match "key":"value" pairs; key and value are captured as separate groups
        Pattern pattern = Pattern.compile("(\"\\w+\"):(\"[^\"]+\")");
        Matcher m = pattern.matcher(jsonStr);
        while (m.find()) {
            // Use the capture groups instead of splitting on ':' so that
            // values containing colons (times, URLs, etc.) are handled correctly
            String key = m.group(1).replaceAll("\"", "").trim();
            String value = m.group(2).replaceAll("\"", "").trim();
            resultMap.put(key, value);
        }
        return resultMap;
    }
}

(scroll horizontally to view)
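A quick way to sanity-check the helper is to feed it a sample record. Note that the regular expression only matches quoted "key":"value" pairs, so numeric fields in the test data should also be quoted. The test class below is hypothetical and not part of the project:

import java.util.Map;
import com.cloudera.utils.JsonStr2Map;

public class JsonStr2MapTest {
    public static void main(String[] args) {
        String json = "{\"id\":\"0001\",\"name\":\"fayson\",\"child_num\":\"2\"}";
        Map<String, String> map = JsonStr2Map.jsonStr2Map(json);
        // Expected output: {id=0001, name=fayson, child_num=2} (order may vary, HashMap is unordered)
        System.out.println(map);
    }
}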

4. Create JsonKuduOperationsProducer.java, which parses the JSON strings and writes them to Kudu

package com.cloudera.kudu;
import com.google.common.collect.Lists;
import com.cloudera.utils.JsonStr2Map;
import com.google.common.base.Preconditions;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.flume.sink.KuduOperationsProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
/**
 * package: com.cloudera.kudu
 * describe: custom KuduOperationsProducer for parsing JSON-formatted data
 * create_user: Fayson
 * email: htechinfo@163.com
 * create_date: 2018/6/2
 * create_time: 11:07 PM
 * WeChat public account: Hadoop实操
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class JsonKuduOperationsProducer implements KuduOperationsProducer {
    private static final Logger logger = LoggerFactory.getLogger(JsonKuduOperationsProducer.class);
    private static final String INSERT = "insert";
    private static final String UPSERT = "upsert";
    private static final List<String> validOperations = Lists.newArrayList(UPSERT, INSERT);
    public static final String ENCODING_PROP = "encoding";
    public static final String DEFAULT_ENCODING = "utf-8";
    public static final String OPERATION_PROP = "operation";
    public static final String DEFAULT_OPERATION = UPSERT;
    public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn";
    public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false;
    public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue";
    public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false;
    public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows";
    public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true;
    private KuduTable table;
    private Charset charset;
    private String operation;
    private boolean skipMissingColumn;
    private boolean skipBadColumnValue;
    private boolean warnUnmatchedRows;
    public JsonKuduOperationsProducer() {
    }
    @Override
    public void configure(Context context) {
        String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING);
        try {
            charset = Charset.forName(charsetName);
        } catch (IllegalArgumentException e) {
            throw new FlumeException(
                    String.format("Invalid or unsupported charset %s", charsetName), e);
        }
        operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION).toLowerCase();
        Preconditions.checkArgument(
                validOperations.contains(operation),
                "Unrecognized operation '%s'",
                operation);
        skipMissingColumn = context.getBoolean(SKIP_MISSING_COLUMN_PROP,
                DEFAULT_SKIP_MISSING_COLUMN);
        skipBadColumnValue = context.getBoolean(SKIP_BAD_COLUMN_VALUE_PROP,
                DEFAULT_SKIP_BAD_COLUMN_VALUE);
        warnUnmatchedRows = context.getBoolean(WARN_UNMATCHED_ROWS_PROP,
                DEFAULT_WARN_UNMATCHED_ROWS);
    }
    @Override
    public void initialize(KuduTable table) {
        this.table = table;
    }
    @Override
    public List<Operation> getOperations(Event event) throws FlumeException {
        String raw = new String(event.getBody(), charset);
        Map<String, String> rawMap = JsonStr2Map.jsonStr2Map(raw);
        Schema schema = table.getSchema();
        List<Operation> ops = Lists.newArrayList();
        if(raw != null && !raw.isEmpty()) {
            Operation op;
            switch (operation) {
                case UPSERT:
                    op = table.newUpsert();
                    break;
                case INSERT:
                    op = table.newInsert();
                    break;
                default:
                    throw new FlumeException(
                            String.format("Unrecognized operation type '%s' in getOperations(): " +
                                    "this should never happen!", operation));
            }
            PartialRow row = op.getRow();
            for (ColumnSchema col : schema.getColumns()) {
                logger.error("Column:" + col.getName() + "----" + rawMap.get(col.getName()));
                try {
                    coerceAndSet(rawMap.get(col.getName()), col.getName(), col.getType(), row);
                } catch (NumberFormatException e) {
                    String msg = String.format(
                            "Raw value '%s' couldn't be parsed to type %s for column '%s'",
                            raw, col.getType(), col.getName());
                    logOrThrow(skipBadColumnValue, msg, e);
                } catch (IllegalArgumentException e) {
                    String msg = String.format(
                            "Column '%s' has no matching group in '%s'",
                            col.getName(), raw);
                    logOrThrow(skipMissingColumn, msg, e);
                } catch (Exception e) {
                    throw new FlumeException("Failed to create Kudu operation", e);
                }
            }
            ops.add(op);
        }
        return ops;
    }
    /**
     * Coerces the string `rawVal` to the type `type` and sets the resulting
     * value for column `colName` in `row`.
     *
     * @param rawVal the raw string column value
     * @param colName the name of the column
     * @param type the Kudu type to convert `rawVal` to
     * @param row the row to set the value in
     * @throws NumberFormatException if `rawVal` cannot be cast as `type`.
     */
    private void coerceAndSet(String rawVal, String colName, Type type, PartialRow row)
            throws NumberFormatException {
        switch (type) {
            case INT8:
                row.addByte(colName, Byte.parseByte(rawVal));
                break;
            case INT16:
                row.addShort(colName, Short.parseShort(rawVal));
                break;
            case INT32:
                row.addInt(colName, Integer.parseInt(rawVal));
                break;
            case INT64:
                row.addLong(colName, Long.parseLong(rawVal));
                break;
            case BINARY:
                row.addBinary(colName, rawVal.getBytes(charset));
                break;
            case STRING:
                row.addString(colName, rawVal==null?"":rawVal);
                break;
            case BOOL:
                row.addBoolean(colName, Boolean.parseBoolean(rawVal));
                break;
            case FLOAT:
                row.addFloat(colName, Float.parseFloat(rawVal));
                break;
            case DOUBLE:
                row.addDouble(colName, Double.parseDouble(rawVal));
                break;
            case UNIXTIME_MICROS:
                row.addLong(colName, Long.parseLong(rawVal));
                break;
            default:
                logger.warn("got unknown type {} for column '{}'-- ignoring this column", type, colName);
        }
    }
    private void logOrThrow(boolean log, String msg, Exception e)
            throws FlumeException {
        if (log) {
            logger.warn(msg, e);
        } else {
            throw new FlumeException(msg, e);
        }
    }
    @Override
    public void close() {
    }
}

(scroll horizontally to view)

5. Package the code with Maven

mvn clean package

Deploy the built flume-sink-1.0-SNAPSHOT.jar to the /opt/cloudera/parcels/CDH/lib/flume-ng/lib directory on every node in the cluster

[root@cdh01 shell]# sh bk_cp.sh node.list /root/flume-sink-1.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH/lib/flume-ng/lib/

(scroll horizontally to view)

4. Configuring the Flume Agent


1. Log in to Cloudera Manager, go to the Flume service page, and click "Configuration"

2. In the Agent category, enter the following in "Configuration File":

kafka.sources  = source1
kafka.channels = channel1
kafka.sinks = sink1
kafka.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
kafka.sources.source1.kafka.bootstrap.servers = cdh01.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092
kafka.sources.source1.kafka.topics = kafka_sparkstreaming_kudu_topic
kafka.sources.source1.kafka.consumer.group.id = flume-consumer
kafka.sources.source1.channels = channel1
kafka.channels.channel1.type = memory
kafka.channels.channel1.capacity = 10000
kafka.channels.channel1.transactionCapacity = 1000
kafka.sinks.sink1.type = org.apache.kudu.flume.sink.KuduSink
kafka.sinks.sink1.masterAddresses = cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com
kafka.sinks.sink1.tableName = impala::default.ods_deal_daily_kudu
kafka.sinks.sink1.channel = channel1
kafka.sinks.sink1.batchSize = 50
kafka.sinks.sink1.producer = com.cloudera.kudu.JsonKuduOperationsProducer

(scroll horizontally to view)

3. Save the Flume configuration and restart the Flume service

5. End-to-End Test


1. In the 0283-kafka-shell directory, run the command to send messages to the Kafka topic kafka_sparkstreaming_kudu_topic

[root@cdh01 0283-kafka-shell]# sh run.sh ods_user_600.txt

2. Check the ods_deal_daily_kudu table in Hue

The data has been written to the Kudu table, and the total row count matches the number of messages sent to Kafka.
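Besides querying the table in Hue/Impala, the row count can also be verified directly with the Kudu Java client. A minimal sketch (a hypothetical helper class, using the same master addresses and table name as the Flume sink configuration above) is shown below:

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResultIterator;

public class CountKuduRows {
    public static void main(String[] args) throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder(
                "cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com").build();
        KuduTable table = client.openTable("impala::default.ods_deal_daily_kudu");
        // Full scan of the table, counting rows batch by batch
        KuduScanner scanner = client.newScannerBuilder(table).build();
        long count = 0;
        while (scanner.hasMoreRows()) {
            RowResultIterator rows = scanner.nextRows();
            count += rows.getNumRows();
        }
        System.out.println("Row count in ods_deal_daily_kudu: " + count);
        client.shutdown();
    }
}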

6. Summary


1. Flume does not ship with the KuduSink dependency; kudu-flume-sink-1.4.0-cdh5.12.1.jar must be added to the /opt/cloudera/parcels/CDH/lib/flume-ng/lib directory on every node in the cluster.

2. The data parsing options supported by the stock KuduSink are limited, so Fayson wrote the JsonKuduOperationsProducer class to parse JSON data.

3. The custom jar must be deployed to the ${FLUME_HOME}/lib directory.

4. When specifying the KuduSink tableName, note that if the Kudu table was created through Impala the table name must be prefixed with impala::; if it was created through the Kudu API, no prefix is needed.

GitHub links:

https://github.com/fayson/cdhproject/blob/master/flumesink/src/main/java/com/cloudera/kudu/JsonKuduOperationsProducer.java

https://github.com/fayson/cdhproject/blob/master/flumesink/src/main/java/com/cloudera/utils/JsonStr2Map.java

https://github.com/fayson/cdhproject/blob/master/flumesink/pom.xml


Follow the WeChat public account Hadoop实操 to be the first to get more hands-on Hadoop content; forwarding and sharing are welcome.

This is an original article. Reposting is welcome; please credit the WeChat public account Hadoop实操 as the source.

Originally published on the WeChat public account Hadoop实操 (gh_c4c535955d0f) on 2018-06-03.
