首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

python多进程-kafka异步消息处理

背景:

现有资源及框架是基于python语言编写;部分松耦合资源的串行导致效率较低;

思路:

核心思路:采用数据分片,将存在冲突的数据分到不同数据块上,通过数据块之间的并行及数据块内的串行,尽可能使中间过程并行;提高整体速度 ,就kafka而言,可以帮助解耦部分中间过程处理,下面给出python下生产者与消费者使用的demo,注意:消费者是采用的multiprocessing.Process非线程

import threading, logging, timeimport multiprocessingfrom kafka import KafkaConsumer, KafkaProducer# 生产者 基于线程class Producer(threading.Thread):def __init__(self): threading.Thread.__init__(self) self.stop_event = threading.Event() def stop(self): self.stop_event.set() def run(self): producer = KafkaProducer(bootstrap_servers='localhost:9092') while not self.stop_event.is_set(): producer.send('my-topic', b"test") producer.send('my-topic', b"Hola, mundo!") time.sleep(1) producer.close()

# 消费者 基于多进程class Consumer(multiprocessing.Process):def __init__(self): multiprocessing.Process.__init__(self) self.stop_event = multiprocessing.Event() def stop(self): self.stop_event.set() def run(self): consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', consumer_timeout_ms=1000) consumer.subscribe(['my-topic']) while not self.stop_event.is_set(): for message in consumer: print(message) if self.stop_event.is_set(): break consumer.close()

#main方法中具体进行具体消息的生产及消费时,实例化生产/消费者后,依次调用.start() .stop() .join()方法

具体参见git:https://github.com/dpkp/kafka-python下的example.py文件

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190320A08N7V00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券