python玩玩kafka

kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

kafka里面的一些概念:

producer:生产者。

consumer:消费者。

topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)。

broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

kafka有四个核心API:producer API,consumer API,streams API,connector API

kafka有什么用?

可它以有效的获取系统和应用程序之间的数据,对数据流进行转换或者反应。

关于kafka的下载安装就不过多介绍了,下面主要介绍的是使用python操作kafka。

首先安装kafka的模块:

pip install kafka

安装完我们就可以尝试着去跑个例子:

首先看看producer是怎么跑起来的:

fromkafkaimportKafkaProducer

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

foriinrange(3):

msg ="msg%d"% i

producer.send('test',msg)

producer.close()

调用KafkaProducer指定server地址即可

类似的来看看consumer例子:

fromkafkaimportKafkaConsumer

consumer = KafkaConsumer('test',

bootstrap_servers=['127.0.0.1:9092'])

formessageinconsumer:

print("%s:%d:%d: key=%s value=%s"% (message.topic,message.partition,

message.offset,message.key,

message.value))

对于consumer group(消费者群组),我们需要给一个群组id(用来区分单个消费者或是群组):

fromkafkaimportKafkaConsumer

consumer = KafkaConsumer('test',

group_id='my-group',

bootstrap_servers=['127.0.0.1:9092'])

formessageinconsumer:

print("%s:%d:%d: key=%s value=%s"% (message.topic,message.partition,

message.offset,message.key,

message.value))

使用consumer订阅多个主题,需要使用subscribe方法,传入需要订阅的标题:

fromkafkaimportKafkaConsumer

fromkafka.structsimportTopicPartition

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])

consumer.subscribe(topics=('topic1','topic2','top3'))#订阅要消费的主题

printconsumer.topics()

printconsumer.position(TopicPartition(topic=u'test',partition=))#获取当前主题的最新偏移量

formessageinconsumer:

print("%s:%d:%d: key=%s value=%s"% (message.topic,message.partition,

message.offset,message.key,

message.value))

如果需要手动拉取信息,那我们需要加一个循环,在这个循环里监听,一直获取服务器信息:

fromkafkaimportKafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])

consumer.subscribe(topics=('topic1','topic2','top3'))

while True:

msg = consumer.poll(timeout_ms=5)#从kafka获取消息

printmsg

如果想挂起consumer可以调用pause()方法,恢复调用resume()方法:

fromkafkaimportKafkaConsumer

fromkafka.structsimportTopicPartition

importtime

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])

consumer.subscribe(topics=('topic1'))

consumer.topics()

consumer.pause(TopicPartition(topic=u'test',partition=))

num =

while True:

printnum

printconsumer.paused()#获取当前挂起的消费者

msg = consumer.poll(timeout_ms=5)

printmsg

time.sleep(2)

num = num +1

ifnum ==10:

consumer.resume(TopicPartition(topic=u'test',partition=))

print"resume......"

关于简单的操作就介绍到这里了,想了解更多:

https://pypi.org/project/kafka-python/

Pls follow It!!

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181106G0BI9N00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券