我正在尝试构建一个以Kafka为界面的Flask应用程序。我使用了Python连接器、卡夫卡-蟒蛇和Docker图像作为Kafka,spotify/kafkaproxy。
下面是停靠-撰写文件.
version: '3.3'
services:
  kafka:
    image: spotify/kafkaproxy
    container_name: kafka_dev
    ports:
      - '9092:9092'
      - '2181:2181'
    environment:
      - ADVERTISED_HOST=0.0.0.0
      - ADVERTISED_PORT=9092
      - CONSUMER_THREADS=1
      - TOPICS=PROFILE_CREATED,IMG_RATED
      - ZK_CONNECT=kafka7zookeeper:2181/root/path
  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - '9000:5000'
    volumes:
      - ./flask-app:/app
    depends_on:
      - kafka下面是我用来连接到kafka的Python片段。在这里,我使用Kafka容器的别名kafka进行连接,因为Docker将负责将别名映射到它的IP地址。
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)我发现了NoBrokersAvailable错误。从这一点上,我可以理解的是,烧瓶应用程序找不到卡夫卡服务器。
Traceback (most recent call last):
  File "./app.py", line 11, in <module>
    consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable其他观察:
ping kafka,并从Kafka容器中获取数据包。BOOTSTRAP_SERVERS = ['localhost:9092']连接到Kafka容器时,它工作得很好。发布于 2018-11-27 08:52:36
更新
正如cricket_007所提到的,考虑到您使用的是下面提供的对接器-组合,您应该使用kafka:29092从另一个容器连接到Kafka。所以您的代码将如下所示:
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)端更新
我建议您使用来自汇流公司的卡夫卡图像,它们有各种各样的示例设置,使用的是坞-撰写,它们随时可以使用,并且总是在更新它们。
试试这个:
---
version: '2'
services:
zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
    - zookeeper
    ports:
    - 9092:9092
    environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
    - '9000:5000'
    volumes:
    - ./flask-app:/app我使用了这个docker-compose.yml并在上面添加了您的服务--请注意:
这里使用的配置公开了端口9092,用于与代理的外部连接,即来自坞网络之外的连接。这可能是从主机运行对接,或可能更远,如果你有一个更复杂的设置。如果后者为真,则需要将KAFKA_ADVERTISED_LISTENERS中的“localhost”值更改为可从这些远程客户端解析为停靠主机的值。
确保您签出了其他示例,尤其是在迁移到生产环境:https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples时,可能对您很有用。
同样值得检查的是:
您似乎需要指定api_version以避免此错误。有关更多细节,请查看这里。
这个库的1.3.5版本(是pypy的最新版本)只列出了某些API版本0.8.0至0.10.1。因此,除非您显式地将api_version指定为(0、10、1),否则客户端库试图发现版本将导致NoBrokersAvailable错误。
producer = KafkaProducer(
    bootstrap_servers=URL,
    client_id=CLIENT_ID,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1)
)这应该是可行的,有趣的是,设置api_version会意外地修复这个问题,如下所示:
当您设置api_version时,客户端将不会试图探查代理以获取版本信息。因此,正在失败的是探测操作。版本探测连接和一般连接之间的一个巨大区别是前者只尝试在每个连接上连接一个接口(每个代理),后者--一般操作--将持续遍历所有接口,直到连接成功。#1411通过切换版本探测逻辑来尝试在所有找到的接口上连接来解决这个问题。
实际问题被描述为这里
https://stackoverflow.com/questions/53494637
复制相似问题