如何使用Java连接Kerberos的Kafka

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

1.文档编写目的


Kafka从0.8版本以后出了新的API接口,用于异步方式发送消息,性能优于旧的API,本篇文章主要使用新的API接口进行测试。继上一篇文章如何通过Cloudera Manager为Kafka启用Kerberos及使用,本篇文章主要讲述如何使用Java连接Kerberos的Kafka集群生产和消费消息。

  • 内容概述

1.环境准备

2.创建Java工程

3.编写生产消息代码

4.编写消费消息代码

5.测试

  • 测试环境

1.RedHat7.2

2.CM和CDH版本为5.11.2

3.Kafka2.2.0-0.10.2

  • 前置条件

1.Intellij已安装且正常运行

2.Maven环境正常

2.环境准备


1.创建topic,test3有3个replication,3个partition

ec2-user@ip-172-31-22-86~$ kafka-topics --create --zookeeper ip-172-31-22-86.ap-southeast-1.compute.internal:2181 --replication-factor 3 --partitions 3 --topic test3

2.krb5.conf配置(直接使用CDH集群的Kerberos配置)

Configuration snippets may beplaced in this directory as well

includedir /etc/krb5.conf.d/

logging

default = FILE:/var/log/krb5libs.log

kdc =FILE:/var/log/krb5kdc.log

admin_server = FILE:/var/log/kadmind.log

libdefaults

dns_lookup_realm = false

ticket_lifetime = 24h

renew_lifetime = 7d

forwardable = true

rdns = false

default_realm = CLOUDERA.COM

#default_ccache_name = KEYRING:persistent:%{uid}

realms

CLOUDERA.COM = {

kdc =ip-172-31-22-86.ap-southeast-1.compute.internal

admin_server = ip-172-31-22-86.ap-southeast-1.compute.internal

}

domain_realm

.ip-172-31-22-86.ap-southeast-1.compute.internal= CLOUDERA.COM

ip-172-31-22-86.ap-southeast-1.compute.internal= CLOUDERA.COM

3.Kerberos的keytab文件

使用kadmin为Kerberos账号生成keytab,fayson.keytab文件生成在当前目录下。

ec2-user@ip-172-31-22-86~$ sudo kadmin.local

Authenticating as principal hdfs/admin@CLOUDERA.COM with password.

kadmin.local: xst -norandkey -k fayson.keytab fayson@CLOUDERA.COM

...

kadmin.local: exit

ec2-user@ip-172-31-22-86~$

4.jaas-cache.conf配置文件

KafkaClient{

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

keyTab="/Volumes/Transcend/keytab/fayson.keytab"

principal="fayson@CLOUDERA.COM";

};

5.在当前开发环境下配置集群的主机信息到hosts文件

在/etc/hosts文件中添加

提示:Fayson使用的AWS环境,所以使用公网IP和hostname对应。如果你的开发环境可以直连Hadoop集群,可以直接配置Hadoop内网IP和hostname对应即可。

3.创建Java工程


1.使用Intellij创建Java Maven工程

2.在pom.xml配置文件中增加Kafka API的Maven依赖

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.0</version>
</dependency>

4.编写生产消息代码


package com.cloudera;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**

* Created by fayson on 2017/10/24.

*/

public class MyProducer {

public static String TOPIC_NAME = "test3";

public static void main(String[] args){

System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");

System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");

System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

// System.setProperty("sun.security.krb5.debug","true");

Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");

props.put(ProducerConfig.ACKS_CONFIG, "all");

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

props.put("security.protocol", "SASL_PLAINTEXT");

props.put("sasl.kerberos.service.name", "kafka");

Producer<String,String> producer = new KafkaProducer<String,String>(props);

for (int i = 0; i < 10; i++) {

String key = "key-"+ i;

String message = "Message-"+ i;

ProducerRecord record= new ProducerRecord<String, String>(TOPIC_NAME, key, message);

producer.send(record);

System.out.println(key + "----"+ message);

}

producer.close();

}

}

5.编写消费消息代码


package com.cloudera;

import org.apache.kafka.clients.consumer.*;

import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;

import java.util.Properties;

/**

* Created by fayson on 2017/10/24.

*/

public class MyConsumer {

private static String TOPIC_NAME = "test3";

public static void main(String[] args){

System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");

System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");

System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

props.put("security.protocol", "SASL_PLAINTEXT");

props.put("sasl.kerberos.service.name", "kafka");

KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);

TopicPartition partition0= new TopicPartition(TOPIC_NAME, 0);

TopicPartition partition1= new TopicPartition(TOPIC_NAME, 1);

TopicPartition partition2= new TopicPartition(TOPIC_NAME, 2);

consumer.assign(Arrays.asList(partition0,partition1, partition2));

ConsumerRecords<String,String> records = null;

while (true){

try {

Thread.sleep(10000l);

System.out.println();

records = consumer.poll(Long.MAX_VALUE);

for (ConsumerRecord<String, String> record : records) {

System.out.println("Receivedmessage: (" + record.key() + "," + record.value() + ") at offset " + record.offset());

}

        } **catch** (**InterruptedException** e){
            e.printStackTrace();

}

    }
}

}

6.代码测试


1.执行消费程序,消费topic为test3的所有partition消息

启动成功,等待消费test3的消息

2.执行生产消息程序,向test3的topic生产消息

向test3的topic发送的消息

3.查看消费程序读取到的消息

7.总结


在开发环境下通过Java代码直接连接到已启用Kerberos的Kafka集群时,则需要将krb5.conf和jaas.conf配置加载到程序运行环境中。至于使用Kerberos密码的方式Fayson也不会。

测试使用的topic有3个partiton,如果没有将所有的broker列表配置到bootstrap.servers中,会导致部分消息丢失。

参考文档:

http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

http://kafka.apache.org/documentation/#api

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。


推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2017-10-29

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏依乐祝

.NET Core实战项目之CMS 第十一章 开发篇-数据库生成及实体代码生成器开发

上篇给大家从零开始搭建了一个我们的ASP.NET Core CMS系统的开发框架,具体为什么那样设计我也已经在第十篇文章中进行了说明。不过文章发布后很多人都说了...

22340
来自专栏Java帮帮-微信公众号-技术文章全总结

solr使用教程【面试+工作】

solr使用教程一【面试+工作】 Solr调研总结 开发类型全文检索相关开发 Solr版本4.2文件内容本文介绍solr的功能使用及相关注意事项;主要包括以下内...

5.7K60
来自专栏数据和云

【Oracle字符集】识别及转换导出文件的字符集

编辑手记:很多人在进行数据库导入导出操作的时候会遇到字符集的问题,今日拣选了 《循序渐进Oracle》一书中的相关章节,希望对初学Oracle的朋友有所帮助。 ...

49440
来自专栏加米谷大数据

MapReduce API 基本概念

在正式分析新旧 API 之前,先要介绍几个基本概念。这些概念贯穿于所有 API 之中,因此,有必要单独讲解。 1、 序列化 序列化是指将结构化对象转为字节流以便...

29770
来自专栏我的博客

TP入门第三天

1、系统常量 TP2.1版本:(蓝色是3.0中去掉) __ROOT__  : 网站根目录地址  __APP__  : 当前项目(入口文件)地址  __GROUP...

28450
来自专栏Hadoop实操

如何使用Scala代码访问Kerberos环境的HDFS

前面Fayson介绍了《如何使用Java API访问HDFS为目录设置配额》,随着开发语言的多样性,也有基于Scala语言进行开发,本篇文章主要介绍如何使用Sc...

399100
来自专栏FreeBuf

Fuzz自动化Bypass软WAF姿势

0×00 前言 在我刚接触安全这块时候遇到注入有WAF的网站时候无从下手,寻找各种有关绕过waf的文章,在网页浏览器上使用SQL语句为了绕过WAF变了个法加了些...

1.1K100
来自专栏乐沙弥的世界

使用Uniread实现SQLplus翻页功能

    对于经常使用SQLplus的网友来说,SQLplus没有提供类似DOS工具的翻页功能,故不能实现SQL语句的来回翻动。现在我们有了Uniread, 减...

6210
来自专栏大数据和云计算技术

sqoop数据导入总结

这是黄文辉同学处女作,大家支持! 其他相关文章:元数据概念 Sqoop主要用来在Hadoop(HDFS)和关系数据库中传递数据,使用Sqoop,我们可以方便地...

48480
来自专栏Java学习网

10 个影响程序性能的Hibernate 错误,学会让你少走弯路

我在很多应用程序中修复过性能问题,其中大部分都是由同样的错误引起的。修复之后,性能变得更溜,而且其中的大部分问题都很简单。所以,如果你想改进应用程序,那么可能也...

33550

扫码关注云+社区

领取腾讯云代金券