首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >无法连接到卡夫卡在一个对接的环境中从水瓶

无法连接到卡夫卡在一个对接的环境中从水瓶
EN

Stack Overflow用户
提问于 2018-11-27 07:24:57
回答 2查看 1.9K关注 0票数 5

我正在尝试构建一个以Kafka为界面的Flask应用程序。我使用了Python连接器、卡夫卡-蟒蛇和Docker图像作为Kafka,spotify/kafkaproxy

下面是停靠-撰写文件.

代码语言:javascript
运行
复制
version: '3.3'
services:
  kafka:
    image: spotify/kafkaproxy
    container_name: kafka_dev
    ports:
      - '9092:9092'
      - '2181:2181'
    environment:
      - ADVERTISED_HOST=0.0.0.0
      - ADVERTISED_PORT=9092
      - CONSUMER_THREADS=1
      - TOPICS=PROFILE_CREATED,IMG_RATED
      - ZK_CONNECT=kafka7zookeeper:2181/root/path
  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - '9000:5000'
    volumes:
      - ./flask-app:/app
    depends_on:
      - kafka

下面是我用来连接到kafka的Python片段。在这里,我使用Kafka容器的别名kafka进行连接,因为Docker将负责将别名映射到它的IP地址。

代码语言:javascript
运行
复制
from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

我发现了NoBrokersAvailable错误。从这一点上,我可以理解的是,烧瓶应用程序找不到卡夫卡服务器。

代码语言:javascript
运行
复制
Traceback (most recent call last):
  File "./app.py", line 11, in <module>
    consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

其他观察:

  1. 我能够从容器中运行ping kafka,并从Kafka容器中获取数据包。
  2. 当我在本地运行Flask应用程序,试图通过设置BOOTSTRAP_SERVERS = ['localhost:9092']连接到Kafka容器时,它工作得很好。
EN

Stack Overflow用户

发布于 2018-11-27 08:52:36

更新

正如cricket_007所提到的,考虑到您使用的是下面提供的对接器-组合,您应该使用kafka:29092从另一个容器连接到Kafka。所以您的代码将如下所示:

代码语言:javascript
运行
复制
from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

端更新

我建议您使用来自汇流公司的卡夫卡图像,它们有各种各样的示例设置,使用的是坞-撰写,它们随时可以使用,并且总是在更新它们。

试试这个:

代码语言:javascript
运行
复制
---
version: '2'
services:
zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
    - zookeeper
    ports:
    - 9092:9092
    environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
    - '9000:5000'
    volumes:
    - ./flask-app:/app

我使用了这个docker-compose.yml并在上面添加了您的服务--请注意:

这里使用的配置公开了端口9092,用于与代理的外部连接,即来自坞网络之外的连接。这可能是从主机运行对接,或可能更远,如果你有一个更复杂的设置。如果后者为真,则需要将KAFKA_ADVERTISED_LISTENERS中的“localhost”值更改为可从这些远程客户端解析为停靠主机的值。

确保您签出了其他示例,尤其是在迁移到生产环境:https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples时,可能对您很有用。

同样值得检查的是:

您似乎需要指定api_version以避免此错误。有关更多细节,请查看这里

这个库的1.3.5版本(是pypy的最新版本)只列出了某些API版本0.8.0至0.10.1。因此,除非您显式地将api_version指定为(0、10、1),否则客户端库试图发现版本将导致NoBrokersAvailable错误。

代码语言:javascript
运行
复制
producer = KafkaProducer(
    bootstrap_servers=URL,
    client_id=CLIENT_ID,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1)
)

这应该是可行的,有趣的是,设置api_version会意外地修复这个问题,如下所示:

当您设置api_version时,客户端将不会试图探查代理以获取版本信息。因此,正在失败的是探测操作。版本探测连接和一般连接之间的一个巨大区别是前者只尝试在每个连接上连接一个接口(每个代理),后者--一般操作--将持续遍历所有接口,直到连接成功。#1411通过切换版本探测逻辑来尝试在所有找到的接口上连接来解决这个问题。

实际问题被描述为这里

票数 1
EN
查看全部 2 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53494637

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档