
[727] Working with Kafka in Python

By 周小董 · Published 2020-01-13 · Column: python前行者

kafka

pypi: https://pypi.org/project/kafka-python/
github: https://github.com/dpkp/kafka-python

pip install kafka
pip install kafka-python

To get load balancing you need to understand Kafka's partitioning mechanism. A topic can be divided into partitions, and when the producer does not specify one, Kafka distributes messages across the partitions. A consumer that subscribes without a consumer group receives messages from every partition. Consumers that share a group divide the partitions among themselves: with two partitions and a two-consumer group, each consumer reads one partition; with a three-consumer group, one consumer receives no data. If two consumers must both read the same partition, put them in different groups.
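
A minimal sketch of this group behavior (assuming a local broker at localhost:9092 and a topic foobar with two partitions; the topic and group names are illustrative):

from kafka import KafkaConsumer

# Two consumers in the SAME group split the topic's partitions between them:
# with two partitions, each consumer is assigned one.
c1 = KafkaConsumer('foobar', group_id='group-a', bootstrap_servers='localhost:9092')
c2 = KafkaConsumer('foobar', group_id='group-a', bootstrap_servers='localhost:9092')

# A consumer in a DIFFERENT group is assigned all partitions again, so it
# independently re-reads every message, including partitions group-a is consuming.
c3 = KafkaConsumer('foobar', group_id='group-b', bootstrap_servers='localhost:9092')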

Kafka also exposes offsets, which let a consumer go back and read content it previously missed. This relies on Kafka's (nominally) full retention of the log: how long history is kept is configurable, commonly 7 days, so large amounts of history can be retained. Seeking to an offset that has already been deleted is a problem, but that case is rare. Each log segment file is named after the offset of its first record, so the requested offset minus the file name gives the record's relative position inside that file. To consume from a specific offset you must assign the consumer its partition explicitly; otherwise the code cannot locate a partition and consumes nothing.
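
A minimal sketch of offset-based replay (assuming a broker at localhost:9092 and a topic foobar with a partition 0; names are illustrative):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
tp = TopicPartition('foobar', 0)
consumer.assign([tp])  # the partition must be assigned explicitly for seek() to work
consumer.seek(tp, 42)  # rewind to offset 42
msg = next(consumer)   # the first message returned is the record at offset 42, if still retained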

github:KafkaProducer
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
...     producer.send('foobar', b'some_message_bytes')

>>> # Block until a single message is sent (or timeout)
>>> future = producer.send('foobar', b'another_message')
>>> result = future.get(timeout=60)

>>> # Block until all pending messages are at least put on the network
>>> # NOTE: This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()

>>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')

>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})

>>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')

>>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
...     producer.send('foobar', b'msg %d' % i)

>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])

>>> # Get producer performance metrics
>>> metrics = producer.metrics()

Supplement

import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)  # logger used by the error callback below

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)

# configure multiple retries
producer = KafkaProducer(retries=5)

github:KafkaConsumer
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
...     print (msg)

>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
>>> for msg in consumer:
...     print (msg)

>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)

>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
...     print (msg.headers)

>>> # Get consumer metrics
>>> metrics = consumer.metrics()

Supplement

from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092'])  # arguments: the topic to consume and the Kafka broker address
# Iterating the consumer blocks indefinitely. Produced messages are retained in the log rather than deleted on read, so every message keeps its offset within its partition.
for message in consumer:  # the consumer yields messages as the broker receives them, and blocks waiting when none are available
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

# With a group_id, each message is read by only one consumer instance within the group
consumer = KafkaConsumer('test', group_id='my-group', bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

# Start from the earliest message still available
consumer = KafkaConsumer('test', auto_offset_reset='earliest', bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)

# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)

# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)

# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')

auto_offset_reset: where to start when there is no valid committed offset. 'earliest' moves to the oldest available message, 'latest' to the newest; the default is 'latest'. The source defines the mapping {'smallest': 'earliest', 'largest': 'latest'} for the legacy names.
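
A minimal sketch of the difference, assuming a broker at 127.0.0.1:9092, a topic test that already holds messages, and two group ids that have never committed an offset (all names are illustrative):

from kafka import KafkaConsumer

# 'earliest': a group with no committed offset replays the partition
# from its oldest retained message.
replay = KafkaConsumer('test', group_id='fresh-group-1',
                       auto_offset_reset='earliest',
                       bootstrap_servers=['127.0.0.1:9092'])

# 'latest' (the default): the same situation starts at the end of the log,
# so only messages produced after the consumer joins are seen.
tail = KafkaConsumer('test', group_id='fresh-group-2',
                     bootstrap_servers=['127.0.0.1:9092'])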

  • Consumer (manually setting the offset)
# ========== read messages from a specified offset ===============
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092'])

print(consumer.partitions_for_topic("test"))  # partition info for the 'test' topic
print(consumer.topics())  # topics available on the broker
print(consumer.subscription())  # topics this consumer subscribes to
print(consumer.assignment())  # TopicPartitions currently assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest offsets available to this consumer
consumer.seek(TopicPartition(topic='test', partition=0), 5)  # reset the position: consume starting from offset 5
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

  • Consumer (subscribing to multiple topics)
# ======= subscribe to multiple topics ==========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test', 'test0'))  # subscribe to the topics to consume
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0)))  # current position on this partition, i.e. the offset of the next record to fetch
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

  • Consumer (polling for messages manually)
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test', 'test0'))
while True:
    msg = consumer.poll(timeout_ms=5)  # fetch whatever messages are available, waiting at most 5 ms
    print(msg)
    time.sleep(2)

  • Consumer (pausing and resuming message consumption)
# ============== pausing and resuming consumption ===========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test',))
consumer.topics()
consumer.pause(TopicPartition(topic='test', partition=0))  # after pause(), the consumer reads nothing from this partition until resume() is called
num = 0
while True:
    print(num)
    print(consumer.paused())   # partitions currently paused
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic='test', partition=0))
        print("resume......")

After pause() is called the consumer cannot read from the paused partition; reading continues only once resume() is called.

my code
# -*- coding:utf-8 -*-
import json
import time

from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
from kafka.errors import KafkaError


'''
pip install kafka==1.3.5
pip install kafka-python==1.3.5
'''

kafka_host = "47.14.12.26"
kafka_port = 9092
kafka_topic = "test"


class Kafka():
    def __init__(self, key='key', group_id='group_id'):
        self.key = key
        bootstrap_servers = [
            '{kafka_host}:{kafka_port}'.format(kafka_host=kafka_host, kafka_port=kafka_port),
        ]
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
        )
        
        '''
        fetch_min_bytes (int): minimum amount of data the server should return for a fetch request; otherwise it waits.
        fetch_max_wait_ms (int): maximum time in milliseconds the server blocks before answering a fetch request
                                 when there is not enough data to satisfy fetch_min_bytes.
        fetch_max_bytes (int): maximum amount of data the server should return for a fetch request. Not an absolute
                               maximum: if the first message in the first non-empty partition is larger than this,
                               it is still returned so the consumer can make progress. Note: fetches run in parallel
                               against multiple brokers, so memory use depends on how many brokers host partitions
                               of the topic. Supported for Kafka >= 0.10.1.0. Default: 52428800 (50 MB).
        enable_auto_commit (bool): if True, the consumer's offsets are committed periodically in the background. Default: True.
        max_poll_records (int): maximum number of records returned by a single call to poll(). Default: 500.
        max_poll_interval_ms (int): maximum delay between calls to poll() when using consumer-group management;
                                    an upper bound on how long the consumer may idle before fetching more records.
                                    If poll() is not called within this timeout, the consumer is considered failed
                                    and the group rebalances to reassign its partitions to another member. Default: 300000.
        '''
        self.consumer = KafkaConsumer(
            # kafka_topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            # auto_offset_reset='earliest',
            enable_auto_commit=False
        )
        self.topic_partition = TopicPartition(topic=kafka_topic, partition=0)
        self.topic_partition2 = TopicPartition(topic=kafka_topic, partition=1)
        ## assign this consumer its TopicPartitions explicitly; assign() cannot be
        ## combined with passing kafka_topic to KafkaConsumer() above
        self.consumer.assign([
            self.topic_partition,
            # self.topic_partition2
            ]
        )
        # partition info for the topic
        print(self.consumer.partitions_for_topic(kafka_topic))
        print(self.consumer.assignment())
        print(self.consumer.beginning_offsets(self.consumer.assignment()))
        committed_offset = self.consumer.committed(self.topic_partition)
        if committed_offset is None:
            ## no offset committed yet: start this consumer from offset 0
            self.consumer.seek(partition=self.topic_partition, offset=0)
        end_offset = self.consumer.end_offsets([self.topic_partition])[self.topic_partition]
        print('committed offset:', committed_offset, 'latest offset:', end_offset)
        
    # producer
    def producer_data(self):
        try:
            for _id in range(600, 610):
                params = {"msg": str(_id)}
                params_message = json.dumps(params, ensure_ascii=False)
                v = params_message.encode('utf-8')
                k = self.key.encode('utf-8')
                print("send msg:(k,v)", k, v)
                self.producer.send(kafka_topic, key=k, value=v, partition=0)
                self.producer.flush()
                # time.sleep(0.5)
            self.producer.close()
        except KafkaError as e:
            print(e)

    # consumer
    def consumer_data(self):
        try:
            print('consumer_data start')
            for msg in self.consumer:
                print(msg)
                print('msg----->k,v,offset:', msg.key, msg.value, msg.offset)
                # manually commit the offset; offsets format:
                # {TopicPartition: OffsetAndMetadata(offset, metadata)}
                self.consumer.commit(offsets={self.topic_partition: OffsetAndMetadata(msg.offset + 1, None)})
                committed_offset = self.consumer.committed(self.topic_partition)
                print('committed offset:', committed_offset)
                time.sleep(5)
        except KeyboardInterrupt as e:
            print(e)



if __name__ == '__main__':
    try:
        kafka = Kafka()
        kafka.producer_data()
        kafka.consumer_data()
    except Exception as e:
        import traceback
        ex_msg = '{exception}'.format(exception=traceback.format_exc())
        print(ex_msg)

References:
https://www.cnblogs.com/reblue520/p/8270412.html
https://blog.csdn.net/luanpeng825485697/article/details/81036028
https://www.cnblogs.com/small-office/p/9399907.html
https://blog.csdn.net/xiaofei2017/article/details/80924800


pykafka

pykafka:https://github.com/Parsely/pykafka

pip install pykafka

The obvious first step was to look for a standard Python library for connecting to Kafka. The candidates are kafka-python and pykafka: the former is the more widely used, mature library, while the latter is the successor to Samsa. Having previously connected to ZooKeeper with Samsa and then used the Kafka cluster from there, which suited my needs well, I noticed that pykafka's examples include ZooKeeper support while kafka-python has none, so I chose pykafka as the client library.

  • A conceptual point

In a Kafka-plus-ZooKeeper cluster, Samsa connects both producers and consumers through ZooKeeper. But the people I talked to have producers connect directly to the Kafka broker list, and only consumers go through ZooKeeper. That resolved my confusion over the pykafka docs, where only the consumer connects to ZooKeeper, so the question was settled and I proceeded straight from the documentation.

  • Producer
>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="192.168.1.1:9092, 192.168.1.2:9092") # accepts multiple brokers; this is the key point
>>> client.topics  # list all topics
>>> topic = client.topics['my.test'] # pick a topic
>>> producer = topic.get_producer()
>>> producer.produce(['test message ' + str(i ** 2) for i in range(4)]) # str() added; the official example does not run on py2.7 without it

  • Consumer
>>> balanced_consumer = topic.get_balanced_consumer(
    consumer_group='testgroup',
    auto_commit_enable=True,  # when this is set to False, consumer_group does not need to be given
    zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot' # connect to multiple ZooKeeper nodes
)

Source: http://opslinux.com/2015/07/14/python%E8%BF%9E%E6%8E%A5kafka/

