首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >无法将Streamz Kafka流转换为Dask流

无法将Streamz Kafka流转换为Dask流
EN

Stack Overflow用户
提问于 2019-07-15 17:57:40
回答 1查看 291关注 0票数 2

我无法将Streamz流转换为使用Kafka source.PFB代码生成的Dask流

代码语言:javascript
运行
复制
from streamz import Stream
from streamz.dataframe import Random
from streamz.dataframe import DataFrame
import json
from dask.distributed import Client
client = Client()
source = Stream.from_kafka(['logs'],
       {'bootstrap.servers': 'kafkaXX:9092',
        'group.id': 'streamz'}) 
source.scatter().map(json.loads).buffer(8).gather().sink(print)
source.start()

我收到此错误消息

代码语言:javascript
运行
复制
ValueError: Two different event loops active
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-07-16 22:07:20

kafka源代码,如果没有其他说明,将在线程中启动自己的事件循环。调用Client()也可以做到这一点。要将循环从一个传递到另一个,您可以这样做

代码语言:javascript
运行
复制
Stream.from_kafka(..., loop=client.loop)

请注意,对.scatter()的调用还需要显式访问事件循环,但由于这是特定于dask的,因此它知道要使用活动的任何客户端的循环。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57037352

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档