专栏首页python3Kafka系列3-python版本pro

Kafka系列3-python版本pro

直接上代码了:

# -*- coding: utf-8 -*-

'''
    使用kafka-Python 1.3.3模块
'''

import sys
import time
import json

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError


KAFAKA_HOST = "127.0.0.1"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "foobar"


class Kafka_producer():
    '''
    生产模块:根据不同的key,区分消息
    '''

    def __init__(self, kafkahost,kafkaport, kafkatopic, key):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.key = key
        self.producer = KafkaProducer(bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                                        kafka_host=self.kafkaHost,
                                        kafka_port=self.kafkaPort)
                        )

    def sendjsondata(self, params):
        try:
            parmas_message = json.dumps(params)
            producer = self.producer
            producer.send(self.kafkatopic, key=self.key, value=parmas_message.encode('utf-8'))
            producer.flush()
        except KafkaError as e:
            print e



class Kafka_consumer():
    '''
    消费模块: 通过不同groupid消费topic里面的消息
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.key = key
        self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
                                    bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                                        kafka_host=self.kafkaHost,
                                        kafka_port=self.kafkaPort )
                        )

    def consume_data(self):
        try:
            for message in self.consumer:
                yield message
        except KeyboardInterrupt, e:
            print e


def main(xtype, group, key):
    '''
    测试consumer和producer
    '''
    if xtype == "p":
        # 生产模块
        producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
        print "===========> producer:", producer
        for _id in range(100):
           params = '{"msg" : "%s"}' % str(_id)
           producer.sendjsondata(params)
           time.sleep(1)

    if xtype == 'c':
        # 消费模块
        consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
        print "===========> consumer:", consumer
        message = consumer.consume_data()
        for msg in message:
            print 'msg---------------->', msg
            print 'key---------------->', msg.key
            print 'offset---------------->', msg.offset



if __name__ == '__main__':
    xtype = sys.argv[1]
    group = sys.argv[2]
    key = sys.argv[3]
    main(xtype, group, key)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • python网页截图

    py3study
  • 使用Python写Windows Ser

    如果你想用Python开发Windows程序,并让其开机启动等,就必须写成windows的服务程序Windows Service,用Python来做这个事情必...

    py3study
  • 用python实现漂亮的烟花demo

    py3study
  • Airflow自定义插件, 使用datax抽数

    Airflow之所以受欢迎的一个重要因素就是它的插件机制。Python成熟类库可以很方便的引入各种插件。在我们实际工作中,必然会遇到官方的一些插件不足够满足需求...

    Ryan-Miao
  • Python Web Flask源码解读(二)——路由原理

    在 Flask中是使用 @app.route这个装饰器来实现 url和方法之间的映射的。

    阳仔
  • TensorFlow强化学习入门(4)——深度Q网络(DQN)及其扩展

    本文中我们将一起创建一个深度Q网络(DQN)。它基于我们系列文章中(0)的单层Q网络,如果你是强化学习的初学者,我推荐你到文末跳转到(0)开始阅读。尽管简单的Q...

    ArrayZoneYour
  • iOS开发之UITableView联动实现城市选择器

    在 iOS开发之城市选择器一文中用两列的UIPickerView实现了城市选择器,今天用两个UITableView来实现一下,首先这种联动在很多地方用得上,而且...

    YungFan
  • python 写window服务

    import win32serviceutil import win32service import win32event import os impo...

    用户5760343
  • 一文了解 Python 的 “Magic” 方法

    原标题 :Python magic methods or special methods

    崔庆才
  • Python制作小软件——3. 利用Py

    本篇博客衔接前面两篇博客: Python制作小软件——1. 安装并使用PyQt5进行界面设计、Python制作小软件——2. 实现界面中的退出功能。

    py3study

扫码关注云+社区

领取腾讯云代金券