前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka-3python生产者和消费者

kafka-3python生产者和消费者

作者头像
py3study
发布2020-01-03 14:41:05
5350
发布2020-01-03 14:41:05
举报
文章被收录于专栏:python3

程序分为productor.py是发送消息端,consumer为消费消息端,

启动的时候先启动product再启动consumer,毕竟只有发了消息,消费端才有消息可以消费,

productor.py

代码语言:javascript
复制
#!/usr/bin/env python2.7
#_*_coding: utf-8 _*_
from kafka import KafkaProducer


kafka_host = '192.168.1.200'  # kafka服务器地址
kafka_port = 9092  # kafka服务器的端口


producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
    kafka_host = kafka_host,
    kafka_port = kafka_port
)])


#简单for循环10次,发送10条消息
for i in range(1,10):
    message_string = 'some message'.format(i)

    #调用send方法,发送名字为'topic1'的topicid ,发送的消息为message_string
    response = producer.send('topic1', message_string.encode('utf-8'))
    print response

consumer.py

代码语言:javascript
复制
#!/usr/bin/env python
#_*_coding: utf-8 _*_
import json
from kafka import *

kafka_host = '192.168.1.200'  # kafka服务器地址
kafka_port = 9092  # kafka服务器端口


#消费topic1的topic,并指定group_id(自定义),多个机器或进程想顺序消费,可以指定同一个group_id,
# 如果想一条消费多次消费,可以换一个group_id,会从头开始消费
consumer = KafkaConsumer(
    'topic1',
    group_id = 'my-group',
    bootstrap_servers = ['{kafka_host}:{kafka_port}'.format(kafka_host=kafka_host, kafka_port=kafka_port)]
)
for message in consumer:
    #json读取kafka的消息
    content = json.loads(message.value)
    print content
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/09/23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档