专栏首页工作笔记精华python kafka kerberos 验证 消费 生产

python kafka kerberos 验证 消费 生产

安装

pykafka github

$ pip install pykafka

$ conda install -c conda-forge pykafka

注意kafka版本只支持 kafka 1.1, 1.0,0.11, 0.10, 0.9,0.8 (201902)

该作者在https://github.com/dpkp/kafka-python/pull/1152 这个推送增加了kerberos支持

验证kerberos

java或者文件中

对应python参数

描述

security.protocol

security_protocol

安全协议

kerberos.domain.name

sasl_kerberos_domain_name

域名

sasl.kerberos.service.name

sasl_kerberos_service_name

服务名

sasl.enabled.mechanisms&sasl.mechanism.inter.broker.protocol

sasl_mechanism

认证机制

principal

sasl_plain_username

用户租户名称

kerberos知识

配置一般在consumer.properties中
拆解一个Principal:
xianglei/dmp-master1.hadoop@HADOOP.COM
一个完整的Principal由3个部分构成。

用户名/FQDN(Full Quafilied Domain Name)的主机名@REALM(受保护的域,全大写)

当然这个用户名需要是Linux下存在的用户

FQDN全限定域名,就是一定要带上hostname.domain这种形式,当然,如果你的主机并没有给出domain,那么不写域名也可以。反正就是要全部的主机名加域名(如果存在域名的话)。但实际上,在Kerberos里面,这个并不称之为主机名,而是叫做Instance,实例名,他可以不是任何服务器的主机名称,但是便于理解和认识,我们还是先把他当初主机名来看待吧。

REALM,受到Kerberos保护的域名称,就是一类或一组受到Kerberos保护服务的服务器集合,你可以想象成Windows里面的域。由于一个KDC可以同时保护多个域,比如你可以在一个KDC上既保护HADOOP服务器组,也保护MYSQL服务器组,所以我们通常会使用域名来进行区别。

如果你的hostname里面使用了domain name,那么你必须在Principal的第二部分写完整,否则KDC将无法验证主机的合法性,加密的tgt是要带着主机名信息的。

还有,特别需要注意的是,这里面第二部分的domain(域名),第三部分的realm(域),在中文里的字是一样,但是英文单词完全不同,他们所表达的含义也完全不同。由于通常Kerberos的Realm部分也会写成域名的形式,所以就会让人迷惑,而实际上,你把realm部分理解成windows里面的workgroup或者home这种域也是可以的。名称可以随便起,不一定用你的真实域名。只是个区分不同服务集合的代号。

使用

资料

我是用来连接华为kafka的,测试可以通过kerberos验证。具体代码就不贴了,引用一下其他作者的,感谢

#coding=utf8
 
from kafka import KafkaProducer
 
producer = KafkaProducer(bootstrap_servers=["xxxx:9200"],
                         security_protocol="SASL_PLAINTEXT",
                         sasl_mechanism="GSSAPI",
                         sasl_kerberos_service_name="kafka")
print "connect success."
future = producer.send("xxxx", "test")
result = future.get(timeout=60)
print "send success."

其他示例代码

原贴

kafka简介(摘自百度百科)
 
一、简介:
详见:https://blog.csdn.net/Beyond_F4/article/details/80310507
 
二、安装
详见博客:https://blog.csdn.net/beyond_f4/article/details/80095689              
 
三、按照官网的样例,先跑一个应用
1、生产者:
from kafka import KafkaProducer
 
producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092'])  #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
 
for i in range(3):
    msg = "msg%d" % i
    producer.send('test', msg)
producer.close()
 
2、消费者(简单demo):
from kafka import KafkaConsumer
 
consumer = KafkaConsumer('test',bootstrap_servers=['172.21.10.136:9092'])
                         
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
 
 
启动后生产者、消费者可以正常消费。
 
3、消费者(消费群组)
from kafka import KafkaConsumer
 
consumer = KafkaConsumer('test',group_id='my-group',bootstrap_servers=['172.21.10.136:9092'])
                         
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
                                          
启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力
 
4、消费者(读取目前最早可读的消息)
from kafka import KafkaConsumer
 
consumer = KafkaConsumer('test',auto_offset_reset='earliest',bootstrap_servers=['172.21.10.136:9092'])
                         
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
                                         
auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest
源码定义:{'smallest': 'earliest', 'largest': 'latest'}
 
5、消费者(手动设置偏移量)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
 
consumer = KafkaConsumer('test',bootstrap_servers=['172.21.10.136:9092'])
 
print consumer.partitions_for_topic("test")  #获取test主题的分区信息
print consumer.topics()  #获取主题列表
print consumer.subscription()  #获取当前消费者订阅的主题
print consumer.assignment()  #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=u'test', partition=0), 5)  #重置偏移量,从第5个偏移量消费
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
                                          
6、消费者(订阅多个主题)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
 
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
                                          
7、消费者(手动拉取消息)
from kafka import KafkaConsumer
import time
 
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
    msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
    print msg
    time.sleep(1)
    
8、消费者(消息挂起与恢复)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
 
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
    print num
    print consumer.paused()   #获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print msg
    time.sleep(2)
    num = num + 1
    if num == 10:
        print "resume..."
        consumer.resume(TopicPartition(topic=u'test', partition=0))
        print "resume......"
        
pause执行后,consumer不能读取,直到调用resume后恢复。

示例代码2

原贴

"""生产者"""
def produce(self):
    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
    for i in range(4):
        msg = "msg%d" %i
        producer.send(self.topic,key=str(i),value=msg)
    producer.close()

"""一个消费者消费一个topic"""
def consume(self):
    #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)
    consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)
    print consumer.partitions_for_topic(self.topic)  #获取test主题的分区信息
print consumer.topics()  #获取主题列表
print consumer.subscription()  #获取当前消费者订阅的主题
print consumer.assignment()  #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  #重置偏移量,从第1个偏移量消费
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" 
        % (message.topic,message.partition,message.offset, message.key,message.value))

"""一个消费者订阅多个topic """
def consume2(self):
    consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic='TEST', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
        
"""消费者(手动拉取消息)"""
def consume3(self):
    consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))
while True:
        message = consumer.poll(timeout_ms=5)   #从kafka获取消息
        if message:
        print message
        time.sleep(1)

示例代码3

原贴

#coding=utf-8
from pykafka import KafkaClient
import codecs
import logging
logging.basicConfig(level = logging.INFO)

client = KafkaClient(hosts = "172.16.82.163:9091")

#生产kafka数据,通过字符串形式
def produce_kafka_data(kafka_topic):
  with kafka_topic.get_sync_producer() as producer:
    for i in range(4):
      producer.produce('test message' + str(i ** 2))

#消费kafka数据
def consume_simple_kafka(kafka_topic, timeout):
  consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms = timeout)
  for message in consumer:
    if message is not None:
      print message.offset, message.value

#消费同一份kafka topic时,建议使用 get_balanced_consumer(),暂时不能使用
#问题:kazoo.handlers.threading.KazooTimeoutError: Connection time-out
def consume_kafka(kafka_topic, zkhost):
  balanced_consumer = kafka_topic.get_balanced_consumer(
  consumer_group = "testgroup",
  auto_commit_enable = False,
  zookeeper_connect = zkhost,
  #zookeeper = zkhost,
  zookeeper_connection_timeout_ms = 6000,
  consumer_timeout_ms = 10000,
  )
  for message in balanced_consumer:
    if message is not None:
      print message.offset, message.value

#通过文件,往kafka刷数据
def produce_kafka_file(filename, kafka_topic):
  with kafka_topic.get_sync_producer() as producer:
    with codecs.open(filename, "r", "utf8") as rf:
      for line in rf:
        line = line.strip()
        if not line:
          continue
      producer.produce(line)

#===========================================================

topic = client.topics["mytest"]

#在consumer_timeout_ms内没有任何信息返回,则中断接受消息
cosumer = topic.get_simple_consumer(consumer_timeout_ms = 10000)
cnt = 0
for message in cosumer:
  if message is not None:
    print message.offset, message.value
  cnt += 1
print cnt

(adsbygoogle = window.adsbygoogle || []).push({});

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 在hbase 激活kerberos 下opentsdb的使用

    最近公司大数据集群统一升级了 kerberos,那原先 的opentsdb就不能使用了,需要使用keytab方式登陆验证。

    stys35
  • 0579-5.15.1-Java 应用程序中修改Kerberos ticket_lifetime参数无效异常分析

    在Kerberos环境中,我们的应用程序通过Java代码来提交任务需要先进行Kerberos凭证的初始化然后进行应用程序的提交,本文档主要讲述Java应用程序中...

    Fayson
  • 0578-5.15.1-Kerberos环境下Java应用程序认证超时异常分析

    在Kerberos环境中,我们的应用程序通过Java代码来提交任务需要先进行Kerberos凭证的初始化然后进行应用程序的提交,本文档主要讲述Java应用程序长...

    Fayson
  • 微软超融合私有云测试10-SCVMM2016部署之创建运行方式账户与添加委派

    VMM安装成功后,首先创建执行任务的运行方式账户用以执行SCVMM的操作,SCVMM执行操作时会自动调用运行方式账户的凭据去执行,例如部署SCVMM代理,查询H...

    SuperDream
  • 大数据集群安全组件解析(含代码)

    感谢阅读「美图数据技术团队」的第 5 篇文章,关注我们持续获取美图最新数据技术动态。

    美图数据技术团队
  • 0583-5.16.1-1.4.2-后台脚本无感知为CDSW用户绑定Kerberos账号(密码认证)

    业务用户在安全环境下使用CDSW服务,为了防止用户的Kerberos的账号和密码泄露问题,需要管理系统统一的为业务用户分发Kerberos账号。本篇文章Fays...

    Fayson
  • 0582-5.16.1-1.4.2-后台脚本无感知为CDSW用户绑定Kerberos账号(keytab认证)

    在前面的文章Fayson介绍了在CDSW用户无感知的情况下通过API接口以密码的方式为不同的业务用户绑定Kerberos账号,CDSW的Hadoop Authe...

    Fayson
  • 0543-5.15.0-Kerberos环境下Kafka管理工具Kafka Eagle安装使用

    Fayson在前面的文章介绍了《如何在CDH集群安装Kafka Manager》和《0542-6.1.0-非安全环境下Kafka管理工具Kafka Eagle安...

    Fayson
  • HBase授权 转

    HBase在不开启授权的情况下,任何账号对HBase集群可以进行任何操作,比如disable table/drop table/major compact等等。

    双面人

扫码关注云+社区

领取腾讯云代金券