如何使用StreamSets实时采集Kafka并入库Kudu

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github:https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


Fayson在前面的文章《如何使用StreamSets实现MySQL中变化数据实时写入Kudu》,本篇文章主要介绍如何使用StreamSets实时采集Kafka的数据并将采集的数据写入Kudu。

  • 内容概述

1.测试环境准备

2.准备生产Kafka数据脚本

3.配置StreamSets

4.流程测试及数据验证

  • 测试环境

1.RedHat7.4

2.CM和CDH版本为cdh5.13.3

3.kafka3.0.0(0.11.0)

4.Kudu 1.5.0

  • 前置条件

1.集群已安装Kafka并正常运行

2.集群未启用Kerberos

2.测试环境准备


1.通过如下命令创建测试topic

kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic kafka2kudu_topic

(可左右滑动)

2.通过Hue使用Impala创建一个Kudu表,创建脚本如下:

CREATE TABLE ods_deal_daily_kudu (
  id STRING COMPRESSION snappy,
  name STRING COMPRESSION snappy,
  sex STRING COMPRESSION snappy,
  city STRING COMPRESSION snappy,
  occupation STRING COMPRESSION snappy,
  mobile_phone_num STRING COMPRESSION snappy,
  fix_phone_num STRING COMPRESSION snappy,
  bank_name STRING COMPRESSION snappy,
  address STRING COMPRESSION snappy,
  marriage STRING COMPRESSION snappy,
  child_num INT COMPRESSION snappy,
  PRIMARY KEY (id)
)
  PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
  TBLPROPERTIES ('kudu.master_addresses'='master.gzyh.com'
);

(可左右滑动)

这里在创建Kudu表的时候增加了kudu.master的配置,如果在Impala中未启用集成kudu的配置则需要增加该参数,在Impala中配置向如下:

3..准备测试数据文件

共600条测试数据,数据的id是唯一的。

3.生产Kafka消息


在这里Fayson读取的是本地的数据文件,将每行文件解析并封装为json数据,实时的发送给Kafka。

1.创建Maven工程,工程的pom.xml文件内容如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cdh-project</artifactId>
        <groupId>com.cloudera</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>kafka-demo</artifactId>
    <packaging>jar</packaging>
    <name>kafka-demo</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.0</version>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
    </dependencies>
</project>

(可左右滑动)

2.编写ReadFileToKafka.java文件内容如下:

package com.cloudera.nokerberos;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * package: com.cloudera.nokerberos
 * describe: 通过读取本地text文件将文件内容解析并组装为JSON发送到Kafka
 * creat_user: Fayson
 * email: htechinfo@163.com
 * creat_date: 2018/4/27
 * creat_time: 下午4:42
 * 公众号:Hadoop实操
 */
public class ReadFileToKafka {
    public static String confPath = System.getProperty("user.dir") + File.separator + "conf";
    public static void main(String[] args) {
        if(args.length < 1) {
            System.out.print("缺少输入参数,请指定要处理的text文件");
            System.exit(1);
        }
        String filePath = args[0];
        BufferedReader reader = null;
        try {
            Properties appProperties = new Properties();
            appProperties.load(new FileInputStream(new File(confPath + File.separator + "app.properties")));
            String brokerlist = String.valueOf(appProperties.get("bootstrap.servers"));
            String topic_name = String.valueOf(appProperties.get("topic.name"));
            Properties props = getKafkaProps();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerlist);
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            reader = new BufferedReader(new FileReader(filePath));
            String tempString = null;
            int line = 1;
            // 一次读入一行,直到读入null为文件结束
            while ((tempString = reader.readLine()) != null) {
                String detailJson = parseJSON(tempString);
                ProducerRecord record = new ProducerRecord<String, String>(topic_name, detailJson);
                producer.send(record);
                line++;
            }
            reader.close();
            producer.flush();
            producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e1) {
                }
            }
        }
    }
    /**
     * 将txt文件中的每行数据解析并组装为json字符串
     * @param tempString
     * @return
     */
    private static String parseJSON(String tempString) {
        if(tempString != null && tempString.length() > 0) {
            Map<String, String> resultMap = null;
            String[] detail = tempString.split("\001");
            resultMap = new HashMap<>();
            resultMap.put("id", detail[0]);
            resultMap.put("name", detail[1]);
            resultMap.put("sex", detail[2]);
            resultMap.put("city", detail[3]);
            resultMap.put("occupation", detail[4]);
            resultMap.put("mobile_phone_num", detail[5]);
            resultMap.put("fix_phone_num", detail[6]);
            resultMap.put("bank_name", detail[7]);
            resultMap.put("address", detail[8]);
            resultMap.put("marriage", detail[9]);
            resultMap.put("child_num", detail[10]);
            return JSONObject.fromObject(resultMap).toString();
        }
        return null;
    }
    /**
     * 初始化Kafka配置
     * @return
     */
    private static Properties getKafkaProps() {
        try{
            Properties props = new Properties();
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000); //批量发送消息
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return props;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

(可左右滑动)

3.将编写好的代码使用mvn命令打包

在工程目录使用mvn cleanpackage命令进行编译打包

4.编写脚本run.sh脚本运行jar包

运行脚本目录结构如下

run.sh脚本内容如下

[root@master kafka-run]# vim run.sh 
#!/bin/bash
#########################################
# 创建Topic
# kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic ods_deal_daily_topic
########################################
JAVA_HOME=/usr/java/jdk1.8.0_131
#要读取的文件
read_file=$1
for file in `ls lib/*jar`
do
    CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadFileToKafka $read_file

(可左右滑动)

conf目录下的配置文件app.properties内容如下

[root@master kafka-run]# vim conf/app.properties 
bootstrap.servers=cdh01.gzyh.com:9092,cdh02.gzyh.com:9092,cdh03.gzyh.com:9092
topic.name=ods_deal_daily_topic

(可左右滑动)

lib目录的依赖包

依赖包可以在命令行使用mvn命令导出:

mvn dependency:copy-dependencies -DoutputDirectory=/tmp/lib

(可左右滑动)

数据文件内容:

4.在StreamSets上创建Pipline


1.登录StreamSets,创建一个kafka2kudu的Pipline

2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka基础信息

3.配置Kafka相关信息,如Broker、ZK及Topic

4.配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON

5.添加Kudu模块及配置基本信息

6.配置Kudu的Master、Table、Operation等

Kudu Masters:可以配置多个,多个地址以“,”分割

Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀

Field to ColumnMapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。

DefaultOpertation:设置操作类型如:insert、upsert、delete

Kudu模块高级配置使用默认配置

5.流程测试验证


1.启动kafka2kudu的Pipline,启动成功如下图显示

2.在命令行运行run.sh脚本向Kafka发送消息

[root@master kafka-run]# sh run.sh ods_user_600.txt 

(可左右滑动)

上面执行了两次脚本。

3.在命令行运行run.sh脚本向Kafka发送消息

点击Kudu模块,查看监控信息

4.查看Kudu的ods_deal_daily_kudu表内容

入库的数据总条数

可以看到ods_deal_daily_kudu表的总条数与准备的测试数据量一致。

GitHub地址:

https://github.com/fayson/cdhproject/tree/master/kafkademo/readlocalfile-kafka-shell

https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/nokerberos/ReadFileToKafka.java

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2018-05-01

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏pydata

hadoop 2.4.1 上安装spark 1.1.0

进入到http://localhost:port访问Ipython Notebook

1202
来自专栏图像识别与深度学习

Bluetooth4_3运行流程(连接发射器SN00000009)

3076
来自专栏about云

spark1.x升级spark2如何升级及需要考虑的问题

问题导读 1.spark2升级哪些内容变化? 2.升级中spark哪些没有发生变化? 3.cloudera中,spark1和spark2能否并存? 4.升级后...

1.2K4
来自专栏JackeyGao的博客

Django小技巧20: 使用多个settings模块

通常来说, 为了保持项目的配置简单,我们会避免使用多个配置文件。但理想很丰满, 现实是随着项目越来越大, settings.py可能也会变得相当复杂. 在那种情...

6141
来自专栏日暮星辰

phpmyadmin与php.ini中的socket配置问题

昨天在安装完新的perconesql数据后,发现phpmyadmin不能正常连接了。一时查来查去不知出了什么问题。

1314
来自专栏大内老A

Windows安全认证是如何进行的?[Kerberos篇]

最近一段时间都在折腾安全(Security)方面的东西,比如Windows认证、非对称加密、数字证书、数字签名、TLS/SSL、WS-Security等。如果时...

2357
来自专栏分布式系统进阶

ReplicaManager源码解析2-LeaderAndIsr 请求响应

其中最主要的操作调用ReplicaManager.becomeLeaderOrFollower来初始化Partition

701
来自专栏挖坑填坑

使用.net core ABP和Angular模板构建博客管理系统(实现博客列表页面)

1991
来自专栏Spring相关

feignClient中修改ribbon的配置

在使用@FeignClient注解的时候 是默认使用了ribbon进行客户端的负载均衡的,默认的是随机的策略,那么如果我们想要更改策略的话,需要修改消费者yml...

4081
来自专栏FreeBuf

远程RPC溢出EXP编写实战之MS06-040

0x01 前言 MS06-040算是个比较老的洞了,在当年影响十分之广,基本上Microsoft大部分操作系统都受到了影响,威力不亚于17年爆出的”永恒之蓝”漏...

30510

扫码关注云+社区

领取腾讯云代金券