前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka-python 执行两次初始化导致进程卡主

kafka-python 执行两次初始化导致进程卡主

原创
作者头像
victorfengming
修改2023-12-02 14:04:55
1670
修改2023-12-02 14:04:55
举报
文章被收录于专栏:Victor的专栏Victor的专栏

Python logging库重复初始化导致进程卡住

### 前置知识

1. python的logging库

Python 的 logging 库是一个灵活且强大的日志记录工具,用于在应用程序中捕获、记录和处理日志信息。它提供了一种配置日志记录的方式,可以满足不同需求的应用程序。

以下是 logging 库的一些关键概念和组件:

Logger(记录器): 记录器是日志记录的入口点,负责发出各种日志消息。

Handler(处理器): 处理器将日志消息发送到目标,如控制台、文件或网络。

Formatter(格式化器): 格式化器定义日志输出的格式,用于美化和定制日志消息。

Level(级别): 级别用于控制日志消息的重要性,包括 DEBUG、INFO、WARNING、ERROR 和 CRITICAL。

Filter(过滤器): 过滤器允许更精细地控制哪些日志消息被记录。

配置文件: 日志配置文件提供一种灵活的配置方式,允许通过文件而非代码进行日志配置。

2. python的celery框架

Celery 是一个开源的分布式任务队列系统,用于处理大量的异步任务。它允许你将任务从应用程序中分离出来,异步地执行它们,提高应用程序的性能和可伸缩性。Celery主要用于处理耗时的任务,如发送电子邮件、生成报告、处理图像等。

以下是 Celery 的一些主要特性和概念:

分布式任务队列: Celery 是一个分布式系统,用于处理异步任务,将任务分发到多个工作节点。

异步任务: 允许将任务提交到队列,实现异步执行,提高应用性能和响应速度。

任务调度: 支持定时任务调度,类似于 cron,可以在未来的特定时间执行任务。

消息代理: 与多种消息代理(如 RabbitMQ、Redis、Amazon SQS)集成,用于在应用程序和工作节点之间传递任务消息。

结果存储: 可将任务执行的结果保存在不同的后端存储中,例如数据库、缓存等。

任务重试: 具备自动重试机制,可配置任务在失败时进行重试。

监控和管理: 提供工具和界面用于监控和管理任务队列,包括 Web 界面和命令行工具。

多语言支持: 主要用于 Python,但提供了多语言客户端库,支持其他编程语言的集成。

3. python连接kafka的库python-kakfa

`

kafka-python ` 是一个用于在 Python 中与 Apache Kafka 集成的客户端库。它提供了 `KafkaProducer` 类用于将消息发送到 Kafka 主题,以及 `KafkaConsumer` 类用于从 Kafka 主题中消费消息。通过这个库,你可以方便地在 Python 中与 Kafka 集群进行通信,实现消息的发布和订阅功能。`kafka-python` 还支持各种配置选项,允许你调整客户端的行为,以满足特定需求。

### 现象描述

python的celery启动后, celery worker 进程卡住, 无法处理任务

并且没有任何日志输出

### 原因概述

我们有一个代码仓库,

既有定时任务的代码, 又有Api应用的代码, 有同事加了一个定时任务, 不小心引入的Api的一个util代码, 导致执行了两次init_logger()初始化日志器方法,(一次定时任务本身的, 一次依赖链中的Api的)

init_logger()里面其中包含了一个初始化日志处理器(发送邮件)名为EmailHandler

EmailHandler里面初始化了kafkaProducer

但是执行第二次init_logger()的时候, 有这么一个逻辑, 执行了这个函数_clearExistingHandlers()

_clearExistingHandlers()这个函数的作用是清空已有的日志处理器列表

这个时间kafkaProducer刚刚在前一次初始化好相关资源, 还有相关的锁没有被释放

这个时候去清EmailHandler,就会导致那个锁没有释放, 无法创建第二个实例, 导致进程卡主没有日志

### 源码分析

/venv/lib/python3.7/site-packages/kafka/producer/kafka.py <br>

line 445

```python

def close(self, timeout=None):

"""Close the producer.

This method sends a ProducerClose request to the Kafka broker and

waits for acknowledgement. It is a good practice to call this method

before the Python process exits to ensure any outstanding messages

are delivered.

Args:

timeout (float): Maximum time to wait for the producer to complete

the close operation, in seconds. If `None`, the method will

block until the close is complete.

Raises:

KafkaException: If the producer close failed.

"""

# ...

# Acquire the lock to ensure thread safety during close

with self._lock:

# Check if the producer is already closed

if self._closed:

return

# Set _closed flag to True to mark the producer as closed

self._closed = True

# ...

# Wait for the background thread to finish

if timeout is not None:

self._sender_thread.join(timeout)

# Release the lock

with self._lock:

# Check if the producer is already closed

if self._closed:

return

# ...

# ...

```

概括

```python

with self._lock::通过 with 语句,获取 _lock 锁,确保在多线程环境下的线程安全性。

if self._closed::检查生产者是否已经关闭,如果已经关闭,直接返回,避免重复关闭。

self._closed = True:将 _closed 标志设置为 True,表示生产者已关闭。

self._sender_thread.join(timeout):等待后台线程完成。_sender_thread 是一个在生产者初始化时启动的后台线程,负责异步发送消息到 Kafka broker。

with self._lock::再次获取锁,确保在关闭期间不会有其他线程对生产者进行操作。

if self._closed::再次检查生产者是否已经关闭,避免重复关闭。

```

此部分代码主要是为了确保在多线程环境下,对生产者的关闭操作是线程安全的,并等待后台线程完成。这有助于确保在关闭过程中不会出现竞态条件,从而确保生产者的关闭操作是可靠的。

### 排查步骤

由于我们的应用部署在华为云中, 所以日志使用的是华为云LTS, 而LTS没有采集到任何日志, 所以

手动进入k8s的pod中, 执行`kubectl logs -f` 查看日志, 还是什么日志也没有

然后执行了`kubectl exec -it podname -n -- bash`进入pod, <br>

手动启动celery任务`celery -A tasks.app worker -l`

启动后打印了几行初始化日志后, 进程卡主, CTRL+C中断程序后, 打印出了错误的堆栈信息

### 重现步骤

```python

from kafka import producer

from config.config import ConfigInfo as Config

import time

bootstrap_servers = [f'{Config.KAFKA_QUOTE_MAIL_HOST1}:{Config.KAFKA_QUOTE_MAIL_PORT}',

f'{Config.KAFKA_QUOTE_MAIL_HOST2}:{Config.KAFKA_QUOTE_MAIL_PORT}',

f'{Config.KAFKA_QUOTE_MAIL_HOST3}:{Config.KAFKA_QUOTE_MAIL_PORT}']

kp = producer.KafkaProducer(bootstrap_servers=bootstrap_servers)

kp.close()

print('ok!')

```

这样就会报错, 如果close前面等待一段时间, 就不会报错

```python

from kafka import producer

from config.config import ConfigInfo as Config

import time

bootstrap_servers = [f'{Config.KAFKA_QUOTE_MAIL_HOST1}:{Config.KAFKA_QUOTE_MAIL_PORT}',

f'{Config.KAFKA_QUOTE_MAIL_HOST2}:{Config.KAFKA_QUOTE_MAIL_PORT}',

f'{Config.KAFKA_QUOTE_MAIL_HOST3}:{Config.KAFKA_QUOTE_MAIL_PORT}']

kp = producer.KafkaProducer(bootstrap_servers=bootstrap_servers)

time.sleep(1)

kp.close()

print('ok!')

```

### 解决方案

避免重复执行kafkaPruducer的销毁和初始化 <br>

应用发版后, 不仅需要检查应用运行状态, 还要检查是否有日志输出

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档