
Kafka: configuring Kerberos authentication and enabling ACLs in practice

Please cite the original when reposting: http://www.cnblogs.com/dongxiao-yang/p/7131626.html

Kafka has shipped cluster security features since version 0.9. Since we needed to build a new Kafka cluster, we decided to use SASL/GSSAPI (Kerberos) as the foundation of its authentication and authorization; the new cluster runs 0.10.2.0.

Colleagues on the team had already set up a dedicated Kerberos server (KDC), so we could skip building our own.

(1) First, on the Kerberos server generate a principal and a keytab for every broker host. Distribute the kafka.keytab produced by the commands below to the same path on the matching broker, e.g. /etc/kafka.keytab. A keytab holds the principal's long-term key, so it should be owned by the account the broker runs as and readable only by it (mode 0400): anyone who can read it can authenticate as that broker, which under the configuration below is a super user. Note that xst -norandkey (export without rotating the key) is only accepted by kadmin.local on the KDC host, not by the remote kadmin.

addprinc -randkey kafka/kafkahost1@EXAMPLE.COM
addprinc -randkey kafka/kafkahost2@EXAMPLE.COM
addprinc -randkey kafka/kafkahost3@EXAMPLE.COM
.........


xst -norandkey -k /opt/kafkahost1/kafka.keytab kafka/kafkahost1@EXAMPLE.COM
xst -norandkey -k /opt/kafkahost2/kafka.keytab kafka/kafkahost2@EXAMPLE.COM
xst -norandkey -k /opt/kafkahost3/kafka.keytab kafka/kafkahost3@EXAMPLE.COM
.....

(2) Configure the broker's server.properties

listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
# must match the service part of the principals created above:
# for kafka/kafkahost3@EXAMPLE.COM the service name is kafka
sasl.kerberos.service.name=kafka
# ACL: the brokers talk to each other as the kafka service principal; only a
# super user may read cluster metadata and replicate once ACLs are enforced
super.users=User:kafka
# ACL: nothing is enforced until an authorizer is configured
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

A few points worth knowing here. Comments in a .properties file start with # (or !); anything after a // on a key's line is part of the value, so inline remarks must go on their own lines as above. User:kafka works because the default sasl.kerberos.principal.to.local.rules (DEFAULT) maps a principal in the default realm of krb5.conf, such as kafka/kafkahost1@EXAMPLE.COM, to its first component, kafka; brokers in another realm would need an explicit RULE entry. Finally, SASL_PLAINTEXT authenticates the connection but does not encrypt it, so message data still crosses the network in the clear; use SASL_SSL where that matters.

(3) Create kafka_server_jaas.conf. The ZooKeeper ensemble this cluster uses does not have Kerberos enabled, so there is no Client section; the KafkaClient section is for the scripts under bin/, such as kafka-console-consumer.sh. Two caveats follow from this. SimpleAclAuthorizer stores its ACLs in ZooKeeper, so with an unauthenticated ensemble anyone who can reach the ZooKeeper ports can rewrite them; restrict network access to ZooKeeper or enable SASL on it (zookeeper.set.acl=true) before relying on the ACLs. And because KafkaClient reuses the broker's service keytab, every user who can read /etc/kafka.keytab and run the scripts acts as a super user; for day-to-day use give operators their own principals and let KafkaClient pick up a kinit ticket cache (useTicketCache=true without keyTab) instead.

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/kafka.keytab"
    principal="kafka/kafkahost1@EXAMPLE.COM";
};

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/kafka.keytab"
    principal="kafka/kafkahost1@EXAMPLE.COM"
    useTicketCache=true;
};

(4) Edit bin/kafka-run-class.sh and add the Kerberos JVM options after exec $JAVA; the service can then be started with the usual scripts. (kafka-run-class.sh also honours the KAFKA_OPTS environment variable, so exporting the same flags there avoids patching a script that every upgrade overwrites.)

-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf  

(5) Using the client scripts

Once Kerberos is enabled, some Kafka admin scripts need extra arguments to work. Note also that with the authorizer enabled, any principal other than the super users is denied until it is granted ACLs with bin/kafka-acls.sh.

First create a configuration file, client.properties:

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI

The commands then become:

bin/kafka-consumer-groups.sh --bootstrap-server kafkahost1:9092 --list --command-config client.properties

bin/kafka-console-producer.sh --broker-list kafkahost1:9092 --topic dxTT --producer.config client.properties

bin/kafka-console-consumer.sh --bootstrap-server kafkahost1:9092 --topic dxTT --consumer.config client.properties
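As an added example (not code from the original post and not Kafka's own implementation), the Python script below checks that the pieces assembled in steps (2) to (5) agree with each other. It reads the broker properties the way java.util.Properties does, extracts the principal from the KafkaServer section of the JAAS file, confirms that sasl.kerberos.service.name equals the service part of that principal, and re-implements the principal-to-short-name mapping of sasl.kerberos.principal.to.local.rules (the built-in DEFAULT rule plus the RULE:[n:...] form, with Python regex syntax and without the /L and /U suffixes of newer releases) to verify that the broker's own identity appears in super.users. The embedded server.properties and JAAS text are constructed for the example; the first copy keeps a trailing // remark to show that it silently becomes part of the value, and the default realm EXAMPLE.COM is an assumption (in practice it comes from krb5.conf).

```python
# Consistency check for the SASL/GSSAPI + ACL settings from steps (2)-(5).
# Added example, not Kafka's own code; sample inputs are constructed, realm/hosts are placeholders.
import re

# server.properties as it appears in the post, with a trailing "//" remark:
# java.util.Properties only treats '#' and '!' as comment markers.
SERVER_PROPERTIES = """\
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka //must match the service part of the principal
super.users=User:kafka
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
"""
# Same file with the remark moved to a proper '#' comment line.
SERVER_PROPERTIES_FIXED = SERVER_PROPERTIES.replace(
    "kafka //must match the service part of the principal",
    "kafka\n# (must match the service part of the principal)")

# KafkaServer section of the JAAS file (constructed).
JAAS_CONF = """\
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/kafka.keytab"
    principal="kafka/kafkahost1@EXAMPLE.COM";
};
"""

def parse_properties(text):
    """Minimal java.util.Properties reader: first '=' or ':' splits key and value."""
    props = {}
    for line in (l.strip() for l in text.splitlines()):
        m = line and line[0] not in "#!" and re.match(r"([^=:\s]+)\s*[=:]?\s*(.*)", line)
        if m:
            props[m.group(1)] = m.group(2)
    return props

def jaas_principal(text, section="KafkaServer"):
    # Return the principal="..." value of the named JAAS section, or None.
    m = re.search(re.escape(section) + r"\s*\{(.*?)\};", text, re.S)
    p = m and re.search(r'principal="([^"]+)"', m.group(1))
    return p.group(1) if p else None

def split_principal(principal):
    # 'kafka/kafkahost1@EXAMPLE.COM' -> ('kafka', 'kafkahost1', 'EXAMPLE.COM')
    name, _, realm = principal.partition("@")
    service, _, host = name.partition("/")
    return service, host or None, realm or None

def short_name(principal, default_realm, rules=("DEFAULT",)):
    """Kafka's sasl.kerberos.principal.to.local.rules mapping: DEFAULT gives the first
    component when the realm is the default realm; RULE:[n:$1@$0](regex)s/pat/repl/[g]
    formats the n components, requires a full regex match, then substitutes."""
    service, host, realm = split_principal(principal)
    if realm is None:
        return service
    params = [realm, service] + ([host] if host else [])
    for rule in rules:
        if rule == "DEFAULT":
            if realm == default_realm:
                return service
            continue
        m = re.fullmatch(r"RULE:\[(\d+):([^\]]+)\](?:\(([^)]*)\))?(?:s/([^/]*)/([^/]*)/(g?))?", rule)
        if not m:
            raise ValueError("malformed rule: " + rule)
        n, fmt, regex, pat, repl, g = m.groups()
        if int(n) != len(params) - 1:
            continue
        base = re.sub(r"\$(\d)", lambda x: params[int(x.group(1))], fmt)
        if regex and not re.fullmatch(regex, base):
            continue
        if pat is not None:
            base = re.sub(pat, repl, base, count=0 if g else 1)
        return base
    raise ValueError("no rule matched " + principal)

def check(server_props_text, jaas_text, default_realm="EXAMPLE.COM"):
    """Return a list of findings; an empty list means the settings agree."""
    props = parse_properties(server_props_text)
    findings = [f"{k}: the '//' remark is part of the value {v!r}"
                for k, v in props.items() if re.search(r"\s//", v)]
    principal = jaas_principal(jaas_text)
    if principal is None:
        return findings + ["KafkaServer JAAS section has no principal"]
    if props.get("sasl.kerberos.service.name") != split_principal(principal)[0]:
        findings.append("sasl.kerberos.service.name does not match " + principal)
    if "authorizer.class.name" in props:  # ACLs on: the broker's own identity must be a super user
        rules = tuple(r.strip() for r in
                      props.get("sasl.kerberos.principal.to.local.rules", "DEFAULT").split(","))
        supers = {s.strip() for s in props.get("super.users", "").split(";") if s.strip()}
        try:
            user = "User:" + short_name(principal, default_realm, rules)
            if user not in supers:
                findings.append(f"broker principal maps to {user}, which is not in super.users")
        except ValueError as exc:
            findings.append(str(exc))
    return findings

if __name__ == "__main__":
    print(check(SERVER_PROPERTIES, JAAS_CONF), check(SERVER_PROPERTIES_FIXED, JAAS_CONF))
```

Running the script prints two findings for the copy with the // remark (the remark itself, and the service-name mismatch it causes) and an empty list for the corrected copy. A broker in a second realm, e.g. kafka/kafkahost9@OTHER.COM, fails the DEFAULT rule and needs something like RULE:[2:$1@$0](.*@OTHER\.COM)s/@.*// added to sasl.kerberos.principal.to.local.rules before it maps to User:kafka.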

Issues encountered:

After Kerberos was configured on the brokers, the controller kept reporting that it could not connect to brokers (including its own instance). The error looked roughly like this:

[2018-01-25 17:48:41,864] WARN [Controller-60-to-broker-60-send-thread], Controller 60's connection to broker kafka60:9092 (id: 60 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to kafka60:9092 (id: 60 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
        at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2018-01-25 17:48:41,970] WARN [Controller-60-to-broker-60-send-thread], Controller 60's connection to broker kafka60:9092 (id: 60 rack: null) was unsuccessful (kafka.controller.RequestSendThread)

Cause: enabling DEBUG logging in kafka-authorizer.log shows the real error. The JRE on the production hosts could not use the AES256 key (aes256-cts-hmac-sha1-96) in the keytab: on Java 8 before 8u161 that key length is blocked by the default JCE policy and needs the JCE Unlimited Strength Jurisdiction Policy files. Installing those policy files, or upgrading the JRE, fixes it.

[2018-01-25 17:55:31,155] DEBUG Connection with /host disconnected (org.apache.kafka.common.network.Selector)
java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Failure unspecified at GSS-API level (Mechanism level: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled)]
        at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:250)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:71)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:350)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
        at kafka.network.Processor.poll(SocketServer.scala:494)
        at kafka.network.Processor.run(SocketServer.scala:432)
        at java.lang.Thread.run(Thread.java:748)
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Failure unspecified at GSS-API level (Mechanism level: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled)]
        at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:199)
        at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:235)
        ... 6 more
Caused by: GSSException: Failure unspecified at GSS-API level (Mechanism level: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled)
        at sun.security.jgss.krb5.Krb5Context.acceptSecContext(Krb5Context.java:856)
        at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:342)
        at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:285)
        at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:167)
        ... 7 more
Caused by: KrbException: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled
        at sun.security.krb5.EncryptionKey.findKey(EncryptionKey.java:522)
        at sun.security.krb5.KrbApReq.authenticate(KrbApReq.java:273)
        at sun.security.krb5.KrbApReq.<init>(KrbApReq.java:149)
        at sun.security.jgss.krb5.InitSecContextToken.<init>(InitSecContextToken.java:108)
        at sun.security.jgss.krb5.Krb5Context.acceptSecContext(Krb5Context.java:829)
        ... 10 more
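The enctype in this trace is the AES256 key type of the entries in kafka.keytab. As an added example (not from the post or from Kafka), the Python script below parses an MIT-format keytab (version 0x0502), lists each entry's principal, kvno and encryption type, and flags the AES256 entries that a JRE older than 8u161 cannot use without the unlimited-strength JCE policy files. The keytab bytes at the bottom are constructed with dummy key material; pass a real path such as /etc/kafka.keytab as the first argument to inspect an actual file.

```python
# List principals and encryption types in a Kerberos keytab (MIT format 0x0502).
# Added example, not from the post or from Kafka; the sample keytab at the
# bottom is constructed here with dummy key bytes.
import struct

# Kerberos enctype numbers (RFC 3961 / IANA registry)
ENCTYPES = {1: "des-cbc-crc", 3: "des-cbc-md5", 16: "des3-cbc-sha1",
            17: "aes128-cts-hmac-sha1-96", 18: "aes256-cts-hmac-sha1-96", 23: "arcfour-hmac"}
# 256-bit AES keys are refused by the default JCE policy on JREs older than 8u161
NEEDS_UNLIMITED_JCE = {18}

def _counted(buf, off):
    """Read a counted_octet_string (uint16 length + bytes); return (bytes, new offset)."""
    (n,) = struct.unpack_from(">H", buf, off)
    return buf[off + 2:off + 2 + n], off + 2 + n

def parse_keytab(buf):
    """Return [(principal, kvno, enctype), ...] for every live entry."""
    if buf[:2] != b"\x05\x02":
        raise ValueError("unsupported keytab version %r" % buf[:2])
    off, entries = 2, []
    while off + 4 <= len(buf):
        (size,) = struct.unpack_from(">i", buf, off)
        off += 4
        if size < 0:                      # negative size marks a deleted slot of |size| bytes
            off -= size
            continue
        end = off + size
        (ncomp,) = struct.unpack_from(">H", buf, off)
        realm, off = _counted(buf, off + 2)
        comps = []
        for _ in range(ncomp):
            comp, off = _counted(buf, off)
            comps.append(comp.decode())
        _name_type, _timestamp, vno8 = struct.unpack_from(">IIB", buf, off)
        (enctype,) = struct.unpack_from(">H", buf, off + 9)
        _key, off = _counted(buf, off + 11)
        vno = vno8
        if end - off >= 4:                # 32-bit kvno present after the key block
            (vno,) = struct.unpack_from(">I", buf, off)
        off = end
        entries.append(("/".join(comps) + "@" + realm.decode(), vno, enctype))
    return entries

def report(buf):
    """Return (entries, warnings); warnings names the entries keyed with AES256."""
    entries = parse_keytab(buf)
    warnings = [f"{p} kvno {v}: {ENCTYPES.get(e, e)} needs unlimited-strength JCE on JRE < 8u161"
                for p, v, e in entries if e in NEEDS_UNLIMITED_JCE]
    return entries, warnings

def _entry(principal, kvno, enctype, key):
    """Encode one keytab entry; only used to build the constructed sample."""
    name, realm = principal.split("@")
    comps = name.split("/")
    body = struct.pack(">HH", len(comps), len(realm)) + realm.encode()
    for c in comps:
        body += struct.pack(">H", len(c)) + c.encode()
    body += struct.pack(">IIB", 1, 1516873200, kvno & 0xFF)   # KRB5_NT_PRINCIPAL, timestamp, vno8
    body += struct.pack(">HH", enctype, len(key)) + key + struct.pack(">I", kvno)
    return struct.pack(">i", len(body)) + body

# Constructed sample: the broker principal with an AES256 and an AES128 key (dummy bytes).
SAMPLE_KEYTAB = (b"\x05\x02"
                 + _entry("kafka/kafkahost1@EXAMPLE.COM", 2, 18, b"\x11" * 32)
                 + _entry("kafka/kafkahost1@EXAMPLE.COM", 2, 17, b"\x22" * 16))

if __name__ == "__main__":
    import sys
    data = open(sys.argv[1], "rb").read() if len(sys.argv) > 1 else SAMPLE_KEYTAB
    entries, warnings = report(data)
    for p, v, e in entries:
        print(f"{p}  kvno={v}  enctype={ENCTYPES.get(e, e)}")
    for w in warnings:
        print("WARN", w)
```

klist -kte /etc/kafka.keytab gives the same listing without any code; parsing it yourself is useful for automating the check across every broker host before rollout. If updating the JRE or its policy files is not an option, the other way out is to have the KDC issue only 128-bit keys for these principals (ktadd -e aes128-cts-hmac-sha1-96:normal, which rotates the key, so the keytab must be redistributed) so that no AES256 key reaches the brokers.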
