如何在Kerberos环境下使用Flume采集Kafka数据写入HBase

在前面的文章Fayson也介绍了一些关于Flume的文章《非Kerberos环境下Kafka数据到Flume进Hive表》、《如何使用Flume准实时建立Solr的全文索引》、《如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS》、《如何使用Flume采集Kafka数据写入Kudu》和《如何使用Flume采集Kafka数据写入HBase》。本篇文章Fayson主要介绍在Kerberos的CDH集群中使用Flume采集Kafka数据写入HBase。

  • 内容概述

1.环境准备

2.配置Flume Agent

3.流程测试

4.总结

  • 测试环境

1.CM和CDH版本为5.12.1

2.采用root用户操作

  • 前置条件

1.Flume已安装

2.HBase和Kafka已安装且已启用Kerberos

3.集群已启用Kerberos

2.环境准备


1.编写向Kafka生成数据的ReadUserInfoFileToKafka.java代码,具体内容可以在Fayson的GitHub上查看

脚本目录说明:

conf:该目录下是kafka、krb5.conf及keytab配置文件,如下目录的配置文件不需要修改名称,只需要修改相应的内容即可

jaas.conf文件内容:

[root@cdh01 0285-kafka-shell]# vim conf/jaas.conf 
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/root/0285-kafka-shell/conf/fayson.keytab"
  principal="fayson@FAYSON.COM";
};

(可左右滑动)

fayson.keytab文件生成,登录KDC和Kadmin所在服务器执行如下命令生成keytab文件

[root@cdh01 ~]# kadmin.local 
Authenticating as principal hbase/admin@FAYSON.COM with password.
kadmin.local:  xst -norandkey -k fayson.keytab fayson@FAYSON.COM

(可左右滑动)

使用klist查看导出的keytab文件是否正确

[root@cdh01 ~]# klist -el fayson.keytab

(可左右滑动)

lib:向Kafka生产消息的依赖包, kafka-demo-1.0-SNAPSHOT.jar为自己开发的向Kerberos环境发送消息的示例程序

ods_user_600.txt:测试数据文件,共600条测试数据

run.sh:运行脚本

#!/bin/bash
#########################################
# 创建Topic
# kafka-topics --create --zookeeper cdh01.fayson.com:2181,cdh02.fayson.com:2181,cdh03.fayson.com:2181 --replication-factor 3 --partitions 3 --topic kafka_sparkstreaming_hbase_topic
#
########################################
JAVA_HOME=/usr/java/jdk1.8.0_144
#要读取的文件
read_file=$1
for file in `ls lib/*jar`
do
    CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.kerberos.ReadUserInfoFileToKafka $read_file

(可左右滑动)

以上脚本根据自己的环境修改相应配置即可,具体脚本可以查看Fayson的GitHub:

https://github.com/fayson/cdhproject/tree/master/kafkademo/0285-kafka-shell

2.通过hbase shell命令创建HBase测试表,并为fayson用户授权该表的读写

create 'fayson_ods_deal_daily','info'
grant 'fayson','RWXCA','fayson_ods_deal_daily'

(可左右滑动)

注:由于HBase启用了Kerberos,所以我们这里在建表的同时需要为该表赋予给fayson用户,启动hbase shell命令需要使用hbase用户进行kinit操作。

3.配置Flume Agent


1.准备Flume使用的jaas.conf文件内容如下:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/opt/cloudera/parcels/flume-kerberos/fayson.keytab"
  principal="fayson@FAYSON.COM";
};
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/opt/cloudera/parcels/flume-kerberos/fayson.keytab"
  principal="fayson@FAYSON.COM";
};

(可左右滑动)

2.在CDH集群的所有节点的/opt/cloudera/parcels目录下创建flume-kerbers目录

[root@cdh01 shell]# sh ssh_do_all.sh node.list "mkdir -p /opt/cloudera/parcels/flume-kerberos"

(可左右滑动)

将准备好的fayson.keytab和jaas.conf文件拷贝至集群所有节点的/opt/cloudera/parcels/flume-kerberos目录下

注:这里的jaas.conf文件是为Flume准备的配置文件,不要用到生产Kafka消息的文件。

为该目录下的文件授予755权限

[root@cdh03 flume-kerberos]# chown -R flume. *
[root@cdh03 flume-kerberos]# chmod -R 755 *

(可左右滑动)

注:这里flume-kerberos及目录下的文件可以不用集群所有节点均存在,至少要保证Flume服务所在节点存在,目录下的文件权限需要调整否则会出现一些莫名其妙的异常。

3.登录CM,进flume服务界面,点击“配置”

4.在Agent类别的“配置文件”中输入如下内容:

kafka.sources  = source1
kafka.channels = channel1
kafka.sinks = sink1
kafka.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
kafka.sources.source1.kafka.bootstrap.servers = cdh01.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092
kafka.sources.source1.kafka.topics = kafka_sparkstreaming_hbase_topic
kafka.sources.source1.kafka.consumer.group.id = flume-consumer
kafka.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
kafka.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
kafka.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

kafka.sources.source1.channels = channel1
kafka.channels.channel1.type = memory
kafka.channels.channel1.capacity = 10000
kafka.channels.channel1.transactionCapacity = 1000
kafka.sinks.sink1.channel = channel1

kafka.sinks.sink1.type = com.cloudera.hbase.FaysonHBaseSink
kafka.sinks.sink1.table = fayson_ods_deal_daily
kafka.sinks.sink1.columnFamily = info
kafka.sinks.sink1.rowkeys = id,mobile_phone_num
kafka.sinks.sink1.serializer = com.cloudera.hbase.JsonHBaseEventSerializer
kafka.sinks.sink1.kerberosPrincipal = fayson@FAYSON.COM
kafka.sinks.sink1.kerberosKeytab = /opt/cloudera/parcels/CDH/lib/flume-ng/kerberos/fayson.keytab

(可左右滑动)

注:配置与Fayson前面讲的非Kerberos环境下有些不一样,增加了Kerberos的配置,这里的HBaseSink还是使用的Fayson自定义的Sink,具体可以参考前一篇文章《如何使用Flume采集Kafka数据写入HBase》

5.修改Flue Agent服务的启动参数

在Flume Agent的Java配置选项中增加如下配置:

-Djava.security.auth.login.config=/opt/cloudera/parcels/flume-kerberos/jaas.conf

(可左右滑动)

6.保存flume配置,并重启Flume服务

4.流程测试


1.进入0285-kafka-shell目录执行命令向Kafka发送消息

[root@cdh01 0285-kafka-shell]# sh run.sh ods_user_600.txt

(可左右滑动)

2.在命令行使用hbase shell查看fayson_ods_deal_daily表

[root@cdh01 ~]# klist
[root@cdh01 ~]# hbase shell
hbase(main):001:0> list
hbase(main):002:0> scan 'fayson_ods_deal_daily'

(可左右滑动)

可以看到数据已写入到HBase的fayson_ods_deal_daily表,查看表总数与发送Kafka数量一致

5.总结


1.由于集群启用了Kerberos,向Kafka生成消息的应用脚本需要做相应的修改,在代码中增加Kerberos认证,具体示例代码这里Fayson未贴出来可以通过文章末尾的GitHub地址查阅。

2. Flume中使用的HBaseSink是Fayson前面一篇文章中将的自定义HBaseSink,可以指定HBase表的rowkey及支持Kerberos认证。

3.在配置Flume访问Kerberos环境的Kafka和HBase时需要为Flume的启动参数中增加jaas.conf指定Kerberos信息。

4.为Flume指定的jaas.conf和keytab文件要确保Flume用于有访问权限,佛则启动Flume时会报错。

5.由于HBase启用了Kerberos,所以我们在使用fayson用户向HBase表中写入数据时要先使用hbase用户启动hbase shell为fayson用于授予fayson_ods_deal_daily表的读写权限。

GitHub地址:

https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/kerberos/ReadUserInfoFileToKafka.java

https://github.com/fayson/cdhproject/tree/master/kafkademo/src/main/resources

自定义HBaseSink示例代码如下:

https://github.com/fayson/cdhproject/tree/master/flumesink/src/main/java/com/cloudera/hbase

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2018-06-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏杂烩

canal安装配置 转

a. canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,建议配置binlog模式为row.

18920
来自专栏电光石火

IDEA无法在service层用注解方式注入mapper

现在项目集成了mybatis,在业务类service层中用@service  @Autowired 把mapper注入,实际运行时执行到该service报错,报...

22960
来自专栏Hadoop实操

如何使用Cloudera Manager为Hadoop服务角色启用远程JMX访问

JMX(Java Management Extensions,即Java管理扩展)做Java开发的人都比较熟悉,它提供了一种在运行时动态资源的监控指标。JMX主...

27230
来自专栏性能与架构

体验 MySQL InnoDB Cluster

Mysql高可用环境的搭建比较麻烦,这使很多人都不去搭建高可用环境,等到有问题时再说 最近Mysql的动作很快,新版本的发布频繁,推出很多新的好用功能及插件,其...

89880
来自专栏小夜博客

使用LNMP常见问题解答

423130
来自专栏大数据-Hadoop、Spark

Hive的使用-Thrift服务

Hive Thrift服务 1.启动Thrift为前台服务: bin/hiveserver2 2.启动为后台服务: nohup bin/hiveserver...

46070
来自专栏微信公众号:Java团长

超详细图解从0搭建SSM框架【intellij idea】

文章链接:http://blog.csdn.net/w8897282/article/details/71215591

1.3K10
来自专栏Web项目聚集地

从零开发一个JavaWeb项目要点「建议收藏」

本文章详细的列出了开发一个传统JavaWeb项目需要注意的要点,从环境准备开始到三层架构搭建,需要注意的地方全部罗列出来。本文作者「张丰哲」欢迎点击阅读原文,关...

10620
来自专栏Hadoop实操

05-如何为Hive集成AD认证

Fayson在前面的文章《01-如何在Window Server 2012 R2搭建Acitve Directory域服务》、《02-Active Direct...

26640
来自专栏运维小白

13.1 设置更改root密码

设置更改root密码目录概要 /usr/local/mysql/bin/mysql -uroot 更改环境变量PATH,增加mysql绝对路径 mysqladm...

36360

扫码关注云+社区

领取腾讯云代金券