如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

注意:Fayson的github调整为:https://github.com/fayson/cdhproject,本文的代码在github中也能找到。

1.文档编写目的


在Kafka集群实际应用中,Kafka的消费者有很多种(如:应用程序、Flume、Spark Streaming、Storm等),本篇文章主要讲述如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS。关于Flume更多sink方式实现敬请关注Fayson后续的文章。本文的数据流图如下:

  • 内容概述

1.Kafka集群启用Kerberos

2.环境准备及配置Flume Agent

3.java访问并测试

  • 测试环境

1.CM和CDH版本为5.11.2

2.采用root用户操作

  • 前置条件

1.集群已启用Kerberos

2.集群已安装Kafka

3.集群已安装Flume

2.Kafka集群启用Kerberos


登录Cloudera Manager进入Kafka服务,修改如下配置Kerberos.auth.enable和security.inter.broker.protocol配置为如下截图:

保存配置并重启Kafka服务。

3.环境准备


由于Kafka集群已启用Kerberos认证,这里需要准备访问Kafka集群的环境,如Keytab、jaas.conf配置等

1.生成访问Kafka集群的keytab文件,在Kerberos所在服务上执行如下命令

[root@ip-172-31-22-86 kafkatest]# pwd
/home/ec2-user/kafkatest
[root@ip-172-31-22-86 kafkatest]# kadmin.local
Authenticating as principal hdfs/admin@CLOUDERA.COM with password.
kadmin.local:  xst -norandkey -k fayson.keytab fayson@CLOUDERA.COM

可以看到在当前目录下生成了fayson@CLOUDERA.COM账号的keytab文件。

2.创建jaas.conf文件,文件内容如下

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/keytab/fayson.keytab"
   principal="fayson@CLOUDERA.COM";
};

3.将keytab文件和jaas.conf文件拷贝至所有Flume Agent运行的节点

这里我们将上面的配置文件拷贝放在Flume Agent节点的/flume-keytab目录下

4.修改目录文件属主,确保flume用户有权限访问

[ec2-user@ip-172-31-21-45 flume-keytab]$ sudo chown -R flume. /flume-keytab/
[ec2-user@ip-172-31-21-45 flume-keytab]$ sudo chmod -R 755 /flume-keytab/

4.配置Flume Agent


1.配置Flume Agent读取Kafka数据写入HDFS

kafka.channels = c1
kafka.sources = s1
kafka.sinks = k1

kafka.sources.s1.type =org.apache.flume.source.kafka.KafkaSource
kafka.sources.s1.kafka.bootstrap.servers =ip-172-31-26-80.ap-southeast-1.compute.internal:9092,ip-172-31-21-45.ap-southeast-1.compute.internal:9092,  ip-172-31-26-102.ap-southeast-1.compute.internal:9092
kafka.sources.s1.kafka.topics = test4
kafka.sources.s1.kafka.consumer.group.id =flume-consumer
kafka.sources.s1.kafka.consumer.security.protocol= SASL_PLAINTEXT
kafka.sources.s1.kafka.consumer.sasl.mechanism= GSSAPI
kafka.sources.s1.kafka.consumer.sasl.kerberos.service.name= kafka
kafka.sources.s1.channels = c1

kafka.channels.c1.type = memory

kafka.sinks.k1.type = hdfs
kafka.sinks.k1.channel = c1
kafka.sinks.k1.hdfs.kerberosKeytab= /flume-keytab/fayson.keytab
kafka.sinks.k1.hdfs.kerberosPrincipal= fayson@CLOUDERA.COM
kafka.sinks.k1.hdfs.path =/tmp/kafka-test
kafka.sinks.k1.hdfs.filePrefix = events-
kafka.sinks.k1.hdfs.writeFormat = Text

关于HDFS Sink的更多配置可以参考:http://flume.apache.org/FlumeUserGuide.html#hdfs-sink

2.增加Flume Agent启动参数

-Djava.security.auth.login.config=/flume-keytab/jaas.conf

配置完成后保存更改并重启FlumeAgent服务。

5.Java生产消息


1.编写jaas.conf文件

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/ec2-user/run-kafka/conf/fayson.keytab"
  principal="fayson@CLOUDERA.COM";
};

2.使用Java编写消息生产代码

package com.cloudera;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.File;
import java.util.Properties;

/**
 * package: com.cloudera
 * describe: TODO
 * creat_user: Fayson
 * email: htechinfo@163.com
 * creat_date: 2017/12/12
 * creat_time: 下午3:35
 * 公众号:Hadoop实操
 */
public class ProducerTest {

    public static String TOPIC_NAME = "test4";

    public static String confPath = System.getProperty("user.dir") + File.separator + "conf";

    public static void main(String[] args) {
        try {
            String krb5conf = confPath + File.separator + "krb5.conf";
            String jaasconf = confPath + File.separator + "jaas.conf";

            System.setProperty("java.security.krb5.conf", krb5conf);
            System.setProperty("java.security.auth.login.config", jaasconf);
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
//            System.setProperty("sun.security.krb5.debug", "true"); //Kerberos Debug模式

            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.kerberos.service.name", "kafka");

            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            for (int i = 0; i < 10; i++) {
                String message = i + "\t" + "fayson" + i  + "\t" + 22+i;
                ProducerRecord record = new ProducerRecord<String, String>(TOPIC_NAME, message);
                producer.send(record);
                System.out.println(message);
            }

            producer.flush();
            producer.close();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.将工程编译打包kafka-demo-1.0-SNAPSHOT.jar

mvn clean package

4.使用mvn命令将工程依赖库导出

mvn dependency:copy-dependencies -DoutputDirectory=/Users/fayson/Desktop/lib

将导出的jar包放在run-kafka/lib目录下。

5.编写run.sh脚本,运行测试jar包

#!/bin/bash

JAVA_HOME=/usr/java/jdk1.8.0_131-cloudera

for file in `ls lib/*jar`
do
    CLASSPATH=$CLASSPATH:$file
done

export CLASSPATH

${JAVA_HOME}/bin/java com.cloudera.ProducerTest

6.conf目录文件

fayson.keytab:fayson的keytab文件

jaas.conf:java访问Kerberos环境下的配置

krb5.conf:集群的krb5配置文件

6.Kafka->Flume->HDFS流程测试


1.将第5章开发好的示例放在集群的服务器上

2.执行run.sh

[ec2-user@ip-172-31-22-86 run-kafka]$ sh run.sh 

3.查看HDFS的/extwarehouse/student目录下数据

这里可以看到数据已写入HDFS指定的目录。

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2017-12-15

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hadoop实操

如何在Kerberos环境下的CDH集群部署Spark2.1的Thrift及spark-sql客户端

50740
来自专栏Hadoop实操

如何在Kerberos环境下的CDH集群部署Spark1.6 Thrift及spark-sql客户端

25430
来自专栏深度学习入门与实践

【原】Learning Spark (Python版) 学习笔记(三)----工作原理、调优与Spark SQL

  周末的任务是更新Learning Spark系列第三篇,以为自己写不完了,但为了改正拖延症,还是得完成给自己定的任务啊 = =。这三章主要讲Spark的运行...

318100
来自专栏Hadoop实操

如何使用Oozie API接口向Kerberos环境的CDH集群提交Shell作业

前面Fayson介绍了使用Oozie API向Kerberos和非Kerberos集群提交Spark和Java作业,本篇文章主要介绍如何使用Oozie Clie...

41160
来自专栏Hadoop实操

如何使用Oozie API接口向非Kerberos环境的CDH集群提交Shell工作流

前面Fayson介绍了《如何使用Oozie API接口向非Kerberos环境的CDH集群提交Spark作业》和《如何使用Oozie API接口向非Kerber...

35170
来自专栏Hadoop实操

如何使用Cloudera Manager在线为集群减容

在Hadoop集群资源紧张的情况下可以在线扩容来提升集群的计算能力,具体参考Fayson前面的文章《如何在非Kerberos环境下对CDH进行扩容》,那么在集群...

1.4K70
来自专栏Hadoop实操

如何通过Cloudera Manager配置Spark1和Spark2的运行环境

大部分用户在使用CDH集群做Spark开发的时候,由于开发环境的JDK版本比CDH集群默认使用的JDK1.7.0_67-cloudera版本新,可能会出现Spa...

87970
来自专栏Hadoop实操

如何使用Oozie API接口向Kerberos集群提交Java程序

在CDH集群外的节点向集群提交MapReduce作业的方式有多种,前面Fayson介绍了《如何跨平台在本地开发环境提交MapReduce作业到CDH集群》和《如...

1.3K70
来自专栏Hadoop实操

如何使用Oozie API接口向Kerberos环境的CDH集群提交Spark作业

在CDH集群外的节点向集群提交Spark作业的方式有多种,前面Fayson介绍了Livy相关的文章主要描述如何在集群外节点通过RESTful API接口向CDH...

48270
来自专栏Hadoop实操

Python3环境通过JDBC访问非Kerberos环境的Hive

在前面Fayson介绍了在Python2的环境下《如何使用Python Impyla客户端连接Hive和Impala》,本篇文章Fayson主要介绍在Pytho...

27110

扫码关注云+社区

领取腾讯云代金券