前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何使用StreamSets实时采集Kafka并入库Kudu

如何使用StreamSets实时采集Kafka并入库Kudu

作者头像
Fayson
发布2018-07-12 15:07:33
2.7K0
发布2018-07-12 15:07:33
举报
文章被收录于专栏:Hadoop实操

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github:https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


Fayson在前面的文章《如何使用StreamSets实现MySQL中变化数据实时写入Kudu》,本篇文章主要介绍如何使用StreamSets实时采集Kafka的数据并将采集的数据写入Kudu。

  • 内容概述

1.测试环境准备

2.准备生产Kafka数据脚本

3.配置StreamSets

4.流程测试及数据验证

  • 测试环境

1.RedHat7.4

2.CM和CDH版本为cdh5.13.3

3.kafka3.0.0(0.11.0)

4.Kudu 1.5.0

  • 前置条件

1.集群已安装Kafka并正常运行

2.集群未启用Kerberos

2.测试环境准备


1.通过如下命令创建测试topic

代码语言:javascript
复制
kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic kafka2kudu_topic

(可左右滑动)

2.通过Hue使用Impala创建一个Kudu表,创建脚本如下:

代码语言:javascript
复制
CREATE TABLE ods_deal_daily_kudu (
  id STRING COMPRESSION snappy,
  name STRING COMPRESSION snappy,
  sex STRING COMPRESSION snappy,
  city STRING COMPRESSION snappy,
  occupation STRING COMPRESSION snappy,
  mobile_phone_num STRING COMPRESSION snappy,
  fix_phone_num STRING COMPRESSION snappy,
  bank_name STRING COMPRESSION snappy,
  address STRING COMPRESSION snappy,
  marriage STRING COMPRESSION snappy,
  child_num INT COMPRESSION snappy,
  PRIMARY KEY (id)
)
  PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
  TBLPROPERTIES ('kudu.master_addresses'='master.gzyh.com'
);

(可左右滑动)

这里在创建Kudu表的时候增加了kudu.master的配置,如果在Impala中未启用集成kudu的配置则需要增加该参数,在Impala中配置向如下:

3..准备测试数据文件

共600条测试数据,数据的id是唯一的。

3.生产Kafka消息


在这里Fayson读取的是本地的数据文件,将每行文件解析并封装为json数据,实时的发送给Kafka。

1.创建Maven工程,工程的pom.xml文件内容如下:

代码语言:javascript
复制
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cdh-project</artifactId>
        <groupId>com.cloudera</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>kafka-demo</artifactId>
    <packaging>jar</packaging>
    <name>kafka-demo</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.0</version>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
    </dependencies>
</project>

(可左右滑动)

2.编写ReadFileToKafka.java文件内容如下:

代码语言:javascript
复制
package com.cloudera.nokerberos;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * package: com.cloudera.nokerberos
 * describe: 通过读取本地text文件将文件内容解析并组装为JSON发送到Kafka
 * creat_user: Fayson
 * email: htechinfo@163.com
 * creat_date: 2018/4/27
 * creat_time: 下午4:42
 * 公众号:Hadoop实操
 */
public class ReadFileToKafka {
    public static String confPath = System.getProperty("user.dir") + File.separator + "conf";
    public static void main(String[] args) {
        if(args.length < 1) {
            System.out.print("缺少输入参数,请指定要处理的text文件");
            System.exit(1);
        }
        String filePath = args[0];
        BufferedReader reader = null;
        try {
            Properties appProperties = new Properties();
            appProperties.load(new FileInputStream(new File(confPath + File.separator + "app.properties")));
            String brokerlist = String.valueOf(appProperties.get("bootstrap.servers"));
            String topic_name = String.valueOf(appProperties.get("topic.name"));
            Properties props = getKafkaProps();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerlist);
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            reader = new BufferedReader(new FileReader(filePath));
            String tempString = null;
            int line = 1;
            // 一次读入一行,直到读入null为文件结束
            while ((tempString = reader.readLine()) != null) {
                String detailJson = parseJSON(tempString);
                ProducerRecord record = new ProducerRecord<String, String>(topic_name, detailJson);
                producer.send(record);
                line++;
            }
            reader.close();
            producer.flush();
            producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e1) {
                }
            }
        }
    }
    /**
     * 将txt文件中的每行数据解析并组装为json字符串
     * @param tempString
     * @return
     */
    private static String parseJSON(String tempString) {
        if(tempString != null && tempString.length() > 0) {
            Map<String, String> resultMap = null;
            String[] detail = tempString.split("\001");
            resultMap = new HashMap<>();
            resultMap.put("id", detail[0]);
            resultMap.put("name", detail[1]);
            resultMap.put("sex", detail[2]);
            resultMap.put("city", detail[3]);
            resultMap.put("occupation", detail[4]);
            resultMap.put("mobile_phone_num", detail[5]);
            resultMap.put("fix_phone_num", detail[6]);
            resultMap.put("bank_name", detail[7]);
            resultMap.put("address", detail[8]);
            resultMap.put("marriage", detail[9]);
            resultMap.put("child_num", detail[10]);
            return JSONObject.fromObject(resultMap).toString();
        }
        return null;
    }
    /**
     * 初始化Kafka配置
     * @return
     */
    private static Properties getKafkaProps() {
        try{
            Properties props = new Properties();
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000); //批量发送消息
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return props;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

(可左右滑动)

3.将编写好的代码使用mvn命令打包

在工程目录使用mvn cleanpackage命令进行编译打包

4.编写脚本run.sh脚本运行jar包

运行脚本目录结构如下

run.sh脚本内容如下

代码语言:javascript
复制
[root@master kafka-run]# vim run.sh 
#!/bin/bash
#########################################
# 创建Topic
# kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic ods_deal_daily_topic
########################################
JAVA_HOME=/usr/java/jdk1.8.0_131
#要读取的文件
read_file=$1
for file in `ls lib/*jar`
do
    CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadFileToKafka $read_file

(可左右滑动)

conf目录下的配置文件app.properties内容如下

代码语言:javascript
复制
[root@master kafka-run]# vim conf/app.properties 
bootstrap.servers=cdh01.gzyh.com:9092,cdh02.gzyh.com:9092,cdh03.gzyh.com:9092
topic.name=ods_deal_daily_topic

(可左右滑动)

lib目录的依赖包

依赖包可以在命令行使用mvn命令导出:

代码语言:javascript
复制
mvn dependency:copy-dependencies -DoutputDirectory=/tmp/lib

(可左右滑动)

数据文件内容:

4.在StreamSets上创建Pipline


1.登录StreamSets,创建一个kafka2kudu的Pipline

2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka基础信息

3.配置Kafka相关信息,如Broker、ZK及Topic

4.配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON

5.添加Kudu模块及配置基本信息

6.配置Kudu的Master、Table、Operation等

Kudu Masters:可以配置多个,多个地址以“,”分割

Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀

Field to ColumnMapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。

DefaultOpertation:设置操作类型如:insert、upsert、delete

Kudu模块高级配置使用默认配置

5.流程测试验证


1.启动kafka2kudu的Pipline,启动成功如下图显示

2.在命令行运行run.sh脚本向Kafka发送消息

代码语言:javascript
复制
[root@master kafka-run]# sh run.sh ods_user_600.txt 

(可左右滑动)

上面执行了两次脚本。

3.在命令行运行run.sh脚本向Kafka发送消息

点击Kudu模块,查看监控信息

4.查看Kudu的ods_deal_daily_kudu表内容

入库的数据总条数

可以看到ods_deal_daily_kudu表的总条数与准备的测试数据量一致。

GitHub地址:

https://github.com/fayson/cdhproject/tree/master/kafkademo/readlocalfile-kafka-shell

https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/nokerberos/ReadFileToKafka.java

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-05-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hadoop实操 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档