前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python玩玩kafka

python玩玩kafka

作者头像
我被狗咬了
发布2019-09-23 10:41:02
8620
发布2019-09-23 10:41:02
举报
文章被收录于专栏:Python乱炖Python乱炖

kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

kafka里面的一些概念:

  • producer:生产者。
  • consumer:消费者。
  • topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)。
  • broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

kafka有四个核心API:producer API,consumer API,streams API,connector API

kafka有什么用?

可它以有效的获取系统和应用程序之间的数据,对数据流进行转换或者反应。

关于kafka的下载安装就不过多介绍了,下面主要介绍的是使用python操作kafka。

首先安装kafka的模块:

pip install kafka

安装完我们就可以尝试着去跑个例子:

首先看看producer是怎么跑起来的:

代码语言:javascript
复制
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

for i in range(3):
    msg = "msg%d" % i
    producer.send('test', msg)
producer.close()

调用KafkaProducer指定server地址即可

类似的来看看consumer例子:

代码语言:javascript
复制
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         bootstrap_servers=['127.0.0.1:9092'])
                         
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

对于consumer group(消费者群组),我们需要给一个群组id(用来区分单个消费者或是群组):

代码语言:javascript
复制
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         group_id='my-group',
                         bootstrap_servers=['127.0.0.1:9092'])
                         
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

使用consumer订阅多个主题,需要使用subscribe方法,传入需要订阅的标题:

代码语言:javascript
复制
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1','topic2','top3'))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

如果需要手动拉取信息,那我们需要加一个循环,在这个循环里监听,一直获取服务器信息:

代码语言:javascript
复制
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1','topic2','top3'))
while True:
    msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
    print msg

如果想挂起consumer可以调用pause()方法,恢复调用resume()方法:

代码语言:javascript
复制
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
    print num
    print consumer.paused()   #获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print msg
    time.sleep(2)
    num = num + 1
    if num == 10:
        consumer.resume(TopicPartition(topic=u'test', partition=0))
        print "resume......"

关于简单的操作就介绍到这里了,想了解更多:

https://pypi.org/project/kafka-python/

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-11-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python乱炖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档