首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何删除并在卡夫卡的脚本中正确地创建主题?

如何删除并在卡夫卡的脚本中正确地创建主题?
EN

Stack Overflow用户
提问于 2022-07-25 22:18:15
回答 2查看 298关注 0票数 0

我正在编写一个脚本,在AWS管理的Kafka集群上刷新我的主题。每当我运行脚本时,我需要删除现有的数据,并再次删除和创建相同的主题。我期望脚本在我反复运行时打印出成功的删除和成功的创建。但是删除/创建失败了,每一次运行都会让我感到困惑。

以下是我的脚本:

代码语言:javascript
运行
复制
# manage_topics.py
import sys
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import KafkaError, KafkaException
if __name__ == '__main__':
    kafka_cfg = '.....' # omitted  
    admin_client = AdminClient(kafka_cfg)

    deletion_ret = admin_client.delete_topics(['my-test-topic1'])
    for topic, delete_fut in deletion_ret.items():
        try:
            status = delete_fut.result()
            print(f'{topic} deletion is successful. status={status}')
        except KafkaException as e:
            print(f'could not delete topic: {topic}, error: {str(e)}')
            if e.args[0].code() != KafkaError.UNKNOWN_TOPIC_OR_PART:
                print('exiting...')
                sys.exit(1)
            else:
                print('ignoring UNKNOWN_TOPIC_OR_PART error')

    # I have two brokers for the Kafka instance I was given
    creation_ret = admin_client.create_topics([NewTopic('my-test-topic1', 5, 2)])
    for topic, create_fut in creation_ret.items():
        try:
            status = create_fut.result()
            print(f'{topic} creation is successful. status={status}')
        except KafkaException as e:
            print(f'could not create topic: {topic}, error: {str(e)}')

这是它生成的日志。不管我每次跑多久都没有关系。在我看来,当一次成功的删除之后是一次创建,那么删除就需要一段时间,这样接下来的创建就会失败。当我再次运行它时,前面的删除将完成,然后当前的删除将失败,创建将成功。

如果有人能帮我理解和改进这个脚本,我会非常感激的。

代码语言:javascript
运行
复制
$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$
$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$
EN

Stack Overflow用户

回答已采纳

发布于 2022-07-26 01:44:12

主题删除是服务器端的异步操作。您的未来结果只捕获请求的响应(主题被标记为删除),而不是实际的集群删除所有副本。

相关Kafka - delete a topic completely

票数 1
EN
查看全部 2 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73115898

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档