我无法将Streamz流转换为使用Kafka source.PFB代码生成的Dask流
from streamz import Stream
from streamz.dataframe import Random
from streamz.dataframe import DataFrame
import json
from dask.distributed import Client
client = Client()
source = Stream.from_kafka(['logs'],
{'bootstrap.servers': 'kafkaXX:9092',
'group.id': 'streamz'})
source.scatter().map(json.loads).buffer(8).gather().sink(print)
source.start()我收到此错误消息
ValueError: Two different event loops active发布于 2019-07-16 22:07:20
kafka源代码,如果没有其他说明,将在线程中启动自己的事件循环。调用Client()也可以做到这一点。要将循环从一个传递到另一个,您可以这样做
Stream.from_kafka(..., loop=client.loop)请注意,对.scatter()的调用还需要显式访问事件循环,但由于这是特定于dask的,因此它知道要使用活动的任何客户端的循环。
https://stackoverflow.com/questions/57037352
复制相似问题