温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
Fayson的github:https://github.com/fayson/cdhproject
提示:代码块部分可以左右滑动查看噢
1.文档编写目的
Fayson在前面的文章《如何使用StreamSets实现MySQL中变化数据实时写入Kudu》,本篇文章主要介绍如何使用StreamSets实时采集Kafka的数据并将采集的数据写入Kudu。
1.测试环境准备
2.准备生产Kafka数据脚本
3.配置StreamSets
4.流程测试及数据验证
1.RedHat7.4
2.CM和CDH版本为cdh5.13.3
3.kafka3.0.0(0.11.0)
4.Kudu 1.5.0
1.集群已安装Kafka并正常运行
2.集群未启用Kerberos
2.测试环境准备
1.通过如下命令创建测试topic
kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic kafka2kudu_topic
(可左右滑动)
2.通过Hue使用Impala创建一个Kudu表,创建脚本如下:
CREATE TABLE ods_deal_daily_kudu (
id STRING COMPRESSION snappy,
name STRING COMPRESSION snappy,
sex STRING COMPRESSION snappy,
city STRING COMPRESSION snappy,
occupation STRING COMPRESSION snappy,
mobile_phone_num STRING COMPRESSION snappy,
fix_phone_num STRING COMPRESSION snappy,
bank_name STRING COMPRESSION snappy,
address STRING COMPRESSION snappy,
marriage STRING COMPRESSION snappy,
child_num INT COMPRESSION snappy,
PRIMARY KEY (id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='master.gzyh.com'
);
(可左右滑动)
这里在创建Kudu表的时候增加了kudu.master的配置,如果在Impala中未启用集成kudu的配置则需要增加该参数,在Impala中配置向如下:
3..准备测试数据文件
共600条测试数据,数据的id是唯一的。
3.生产Kafka消息
在这里Fayson读取的是本地的数据文件,将每行文件解析并封装为json数据,实时的发送给Kafka。
1.创建Maven工程,工程的pom.xml文件内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cdh-project</artifactId>
<groupId>com.cloudera</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-demo</artifactId>
<packaging>jar</packaging>
<name>kafka-demo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
</dependencies>
</project>
(可左右滑动)
2.编写ReadFileToKafka.java文件内容如下:
package com.cloudera.nokerberos;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* package: com.cloudera.nokerberos
* describe: 通过读取本地text文件将文件内容解析并组装为JSON发送到Kafka
* creat_user: Fayson
* email: htechinfo@163.com
* creat_date: 2018/4/27
* creat_time: 下午4:42
* 公众号:Hadoop实操
*/
public class ReadFileToKafka {
public static String confPath = System.getProperty("user.dir") + File.separator + "conf";
public static void main(String[] args) {
if(args.length < 1) {
System.out.print("缺少输入参数,请指定要处理的text文件");
System.exit(1);
}
String filePath = args[0];
BufferedReader reader = null;
try {
Properties appProperties = new Properties();
appProperties.load(new FileInputStream(new File(confPath + File.separator + "app.properties")));
String brokerlist = String.valueOf(appProperties.get("bootstrap.servers"));
String topic_name = String.valueOf(appProperties.get("topic.name"));
Properties props = getKafkaProps();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerlist);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
reader = new BufferedReader(new FileReader(filePath));
String tempString = null;
int line = 1;
// 一次读入一行,直到读入null为文件结束
while ((tempString = reader.readLine()) != null) {
String detailJson = parseJSON(tempString);
ProducerRecord record = new ProducerRecord<String, String>(topic_name, detailJson);
producer.send(record);
line++;
}
reader.close();
producer.flush();
producer.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e1) {
}
}
}
}
/**
* 将txt文件中的每行数据解析并组装为json字符串
* @param tempString
* @return
*/
private static String parseJSON(String tempString) {
if(tempString != null && tempString.length() > 0) {
Map<String, String> resultMap = null;
String[] detail = tempString.split("\001");
resultMap = new HashMap<>();
resultMap.put("id", detail[0]);
resultMap.put("name", detail[1]);
resultMap.put("sex", detail[2]);
resultMap.put("city", detail[3]);
resultMap.put("occupation", detail[4]);
resultMap.put("mobile_phone_num", detail[5]);
resultMap.put("fix_phone_num", detail[6]);
resultMap.put("bank_name", detail[7]);
resultMap.put("address", detail[8]);
resultMap.put("marriage", detail[9]);
resultMap.put("child_num", detail[10]);
return JSONObject.fromObject(resultMap).toString();
}
return null;
}
/**
* 初始化Kafka配置
* @return
*/
private static Properties getKafkaProps() {
try{
Properties props = new Properties();
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000); //批量发送消息
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return props;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
(可左右滑动)
3.将编写好的代码使用mvn命令打包
在工程目录使用mvn cleanpackage命令进行编译打包
4.编写脚本run.sh脚本运行jar包
运行脚本目录结构如下
run.sh脚本内容如下
[root@master kafka-run]# vim run.sh
#!/bin/bash
#########################################
# 创建Topic
# kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic ods_deal_daily_topic
########################################
JAVA_HOME=/usr/java/jdk1.8.0_131
#要读取的文件
read_file=$1
for file in `ls lib/*jar`
do
CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadFileToKafka $read_file
(可左右滑动)
conf目录下的配置文件app.properties内容如下
[root@master kafka-run]# vim conf/app.properties
bootstrap.servers=cdh01.gzyh.com:9092,cdh02.gzyh.com:9092,cdh03.gzyh.com:9092
topic.name=ods_deal_daily_topic
(可左右滑动)
lib目录的依赖包
依赖包可以在命令行使用mvn命令导出:
mvn dependency:copy-dependencies -DoutputDirectory=/tmp/lib
(可左右滑动)
数据文件内容:
4.在StreamSets上创建Pipline
1.登录StreamSets,创建一个kafka2kudu的Pipline
2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka基础信息
3.配置Kafka相关信息,如Broker、ZK及Topic
4.配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON
5.添加Kudu模块及配置基本信息
6.配置Kudu的Master、Table、Operation等
Kudu Masters:可以配置多个,多个地址以“,”分割
Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀
Field to ColumnMapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。
DefaultOpertation:设置操作类型如:insert、upsert、delete
Kudu模块高级配置使用默认配置
5.流程测试验证
1.启动kafka2kudu的Pipline,启动成功如下图显示
2.在命令行运行run.sh脚本向Kafka发送消息
[root@master kafka-run]# sh run.sh ods_user_600.txt
(可左右滑动)
上面执行了两次脚本。
3.在命令行运行run.sh脚本向Kafka发送消息
点击Kudu模块,查看监控信息
4.查看Kudu的ods_deal_daily_kudu表内容
入库的数据总条数
可以看到ods_deal_daily_kudu表的总条数与准备的测试数据量一致。
GitHub地址:
https://github.com/fayson/cdhproject/tree/master/kafkademo/readlocalfile-kafka-shell
https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/nokerberos/ReadFileToKafka.java
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操