我正在尝试构建一个以Kafka为界面的Flask应用程序。我使用了Python连接器、卡夫卡-蟒蛇和Docker图像作为Kafka,spotify/kafkaproxy。
下面是停靠-撰写文件.
version: '3.3'
services:
kafka:
image: spotify/kafkaproxy
container_name: kafka_dev
ports:
- '9092:9092'
- '2181:2181'
environment:
- ADVERTISED_HOST=0.0.0.0
- ADVERTISED_PORT=9092
- CONSUMER_THREADS=1
- TOPICS=PROFILE_CREATED,IMG_RATED
- ZK_CONNECT=kafka7zookeeper:2181/root/path
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app
depends_on:
- kafka下面是我用来连接到kafka的Python片段。在这里,我使用Kafka容器的别名kafka进行连接,因为Docker将负责将别名映射到它的IP地址。
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)我发现了NoBrokersAvailable错误。从这一点上,我可以理解的是,烧瓶应用程序找不到卡夫卡服务器。
Traceback (most recent call last):
File "./app.py", line 11, in <module>
consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable其他观察:
ping kafka,并从Kafka容器中获取数据包。BOOTSTRAP_SERVERS = ['localhost:9092']连接到Kafka容器时,它工作得很好。发布于 2018-11-27 08:52:36
更新
正如cricket_007所提到的,考虑到您使用的是下面提供的对接器-组合,您应该使用kafka:29092从另一个容器连接到Kafka。所以您的代码将如下所示:
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)端更新
我建议您使用来自汇流公司的卡夫卡图像,它们有各种各样的示例设置,使用的是坞-撰写,它们随时可以使用,并且总是在更新它们。
试试这个:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app我使用了这个docker-compose.yml并在上面添加了您的服务--请注意:
这里使用的配置公开了端口9092,用于与代理的外部连接,即来自坞网络之外的连接。这可能是从主机运行对接,或可能更远,如果你有一个更复杂的设置。如果后者为真,则需要将KAFKA_ADVERTISED_LISTENERS中的“localhost”值更改为可从这些远程客户端解析为停靠主机的值。
确保您签出了其他示例,尤其是在迁移到生产环境:https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples时,可能对您很有用。
同样值得检查的是:
您似乎需要指定api_version以避免此错误。有关更多细节,请查看这里。
这个库的1.3.5版本(是pypy的最新版本)只列出了某些API版本0.8.0至0.10.1。因此,除非您显式地将api_version指定为(0、10、1),否则客户端库试图发现版本将导致NoBrokersAvailable错误。
producer = KafkaProducer(
bootstrap_servers=URL,
client_id=CLIENT_ID,
value_serializer=JsonSerializer.serialize,
api_version=(0, 10, 1)
)这应该是可行的,有趣的是,设置api_version会意外地修复这个问题,如下所示:
当您设置api_version时,客户端将不会试图探查代理以获取版本信息。因此,正在失败的是探测操作。版本探测连接和一般连接之间的一个巨大区别是前者只尝试在每个连接上连接一个接口(每个代理),后者--一般操作--将持续遍历所有接口,直到连接成功。#1411通过切换版本探测逻辑来尝试在所有找到的接口上连接来解决这个问题。
实际问题被描述为这里
发布于 2020-02-28 08:46:38
我在所有服务之间使用一个名为网络的stream_net成功地启动并运行了这个程序。
# for local development
version: "3.7"
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- stream_net
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- stream_net
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- "9000:5000"
volumes:
- ./flask-app:/app
networks:
- stream_net
depends_on:
- kafka
networks:
stream_net:localhost:9092上的容器外部连接kafka:29092网络中的连接当然,将已经在网络中运行的所有容器放置在网络中是很奇怪的。但通过这种方式,容器可以按其实际名称命名。也许有人能确切地解释这是如何工作的,或者它可以帮助别人理解问题的核心,并正确地解决它。
https://stackoverflow.com/questions/53494637
复制相似问题