我正在编写一个脚本,在AWS管理的Kafka集群上刷新我的主题。每当我运行脚本时,我需要删除现有的数据,并再次删除和创建相同的主题。我期望脚本在我反复运行时打印出成功的删除和成功的创建。但是删除/创建失败了,每一次运行都会让我感到困惑。
以下是我的脚本:
# manage_topics.py
import sys
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import KafkaError, KafkaException
if __name__ == '__main__':
kafka_cfg = '.....' # omitted
admin_client = AdminClient(kafka_cfg)
deletion_ret = admin_client.delete_topics(['my-test-topic1'])
for topic, delete_fut in deletion_ret.items():
try:
status = delete_fut.result()
print(f'{topic} deletion is successful. status={status}')
except KafkaException as e:
print(f'could not delete topic: {topic}, error: {str(e)}')
if e.args[0].code() != KafkaError.UNKNOWN_TOPIC_OR_PART:
print('exiting...')
sys.exit(1)
else:
print('ignoring UNKNOWN_TOPIC_OR_PART error')
# I have two brokers for the Kafka instance I was given
creation_ret = admin_client.create_topics([NewTopic('my-test-topic1', 5, 2)])
for topic, create_fut in creation_ret.items():
try:
status = create_fut.result()
print(f'{topic} creation is successful. status={status}')
except KafkaException as e:
print(f'could not create topic: {topic}, error: {str(e)}')
这是它生成的日志。不管我每次跑多久都没有关系。在我看来,当一次成功的删除之后是一次创建,那么删除就需要一段时间,这样接下来的创建就会失败。当我再次运行它时,前面的删除将完成,然后当前的删除将失败,创建将成功。
如果有人能帮我理解和改进这个脚本,我会非常感激的。
$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$
$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$
发布于 2022-07-26 01:44:12
主题删除是服务器端的异步操作。您的未来结果只捕获请求的响应(主题被标记为删除),而不是实际的集群删除所有副本。
https://stackoverflow.com/questions/73115898
复制相似问题