python多进程卡死排查记录
问题背景:python程序一开始为单进程结构,属于CPU密集型计算任务,为提高性能调整为多进程并行计算。调整后运行时程序启动一会就全部进程卡死,不再往下运行。问题排查:程序中没有使用到进程锁或者分布式锁,所以先是排除了死锁的原因,可能为逻辑BUG或者代码异常。多进程下只好通过打日志的方式定位下程序运行到什么位置了,最后定位出不再往下运行的代码行为:msgs: Dict = kwargs['msgs
问题背景:
python程序一开始为单进程结构,属于CPU密集型计算任务,为提高性能调整为多进程并行计算。
调整后运行时程序启动一会就全部进程卡死,不再往下运行。
问题排查:
程序中没有使用到进程锁或者分布式锁,所以先是排除了死锁的原因,可能为逻辑BUG或者代码异常。
多进程下只好通过打日志的方式定位下程序运行到什么位置了,最后定位出不再往下运行的代码行为:
msgs: Dict = kwargs['msgs']
这部分的完整代码块为:
class ConclusionWriteHandler(SinkWriterInterface):
"""数据分析结论入MQ handler"""
def write(self, **kwargs):
msgs: Dict = kwargs['msgs']
serializer = kwargs['serializer']
kafka_client = Kafka()
for conclusion_category, conclusion_list in msgs.items():
if not conclusion_list:
continue
sub_msgs = [serializer(x) for x in conclusion_list]
# 分析结论的topic名称和结论类型是保持一致的
kafka_client.produce(topic=conclusion_category, msgs=sub_msgs)
log.info(f'load {len(msgs)} conclusions to topic {conclusion_category}')
一开始看到定位出的位置是一脸懵逼的,一个普通的字典取值操作怎么就卡住了,以为是数据格式存在问题,添加 try catch 尝试捕获异常,但并没有触发。
当前是在pycharm内使用debug模式运行的程序,确认这块代码没有问题后,尝试在命令行运行,看下定位出的位置是不是一致的,果然,命令行下运行这行代码(msgs: Dict = kwargs['msgs'])运行通过,于是继续添加日志定位程序卡住的位置,随后定位到了kafka数据发送模块:
def produce(self, topic: str, msgs: List) -> NoReturn:
"""
生产者
Args:
topic: topic
msgs: msg列表
"""
for msg in msgs:
self.producer.poll(0)
self.producer.produce(topic=topic, value=msg.encode('utf-8'), callback=self.delivery_report)
self.producer.flush()
发现数据都有发送,但是在运行到 self.producer.flush() 时程序不再运行了,推测为 confluent-kafka 这个位置的使用出现了问题,其中kafka客户端代码为:
class KafkaCli(metaclass=SingletonClass):
def __init__(self):
self.__producer = None
self.__consumer = None
def client_init(self, bootstrap_servers: str,
group_id: str,
session_timeout_ms: int = 6000,
enable_auto_commit: bool = True,
auto_offset_reset: str = 'earliest') -> NoReturn:
"""producer && consumer init"""
producer_config = {
'bootstrap.servers': bootstrap_servers
}
consumer_config = dict(**producer_config, **{
'group.id': group_id,
'session.timeout.ms': session_timeout_ms,
'enable.auto.commit': enable_auto_commit,
# 'on_commit': on_commit,
'auto.offset.reset': auto_offset_reset
})
self.__consumer = kafka.Consumer(consumer_config)
self.__producer = kafka.Producer(producer_config)
这里kafka client为单例模式,并且的程序初始化的时间初始化 producer 和 consumer,其中单例元类为:
class SingletonClass(type):
"""单例实现-元类"""
_instance_lock = threading.Lock()
def __call__(cls, *args, **kwargs):
if not hasattr(cls, '_instance'):
with cls._instance_lock:
if not hasattr(cls, '__instance'):
cls._instance = super().__call__(*args, **kwargs)
return cls._instance
理论上,创建多进程的时间,主进程的所有资源会拷贝一份到子进程中,虽然这里在初始化时为单例对象,但创建的各个子进程仍然有自己的一个kafka client对象(这个也通过打印获取的kafka client地址确定了)
所以回过头来看下程序卡住的位置:
producer.flush() 的作用是将生成者队列中的所有消息发送出去,这里卡住可能就是发送机制上出现问题了。
面向搜索引擎解决问题:
链接地址: https://github.com/confluentinc/confluent-kafka-python/issues/351
问题原因也看到了: kafka客户端使用了内部线程,创建子进程时并不会把主进程中的子线程也拷贝的子进程空间内。
这其实和在POSIX(以前称为 Unix 的标准)上创建子进程的方式有关:
1. 使用fork()系统调用创建进程的副本。
2. 子进程使用execve()系统调用(或其变体之一,例如execl())将自身替换为不同的程序。
我们都知道创建子进程会拷贝主进程中的所有内存内容,但实际上fork()并没有拷贝所有内容,其中就包括父进程中的线程,即:
任何在父进程中运行的线程都不存在于子进程中
所以程序卡住的原因也就找到了:
kafka client是在程序启动的时间做的初始化,底层使用的是线程,所以接下来创建出的子进程中不含有kafka client底层需要的线程,最后producer在发送数据是没有底层线程进行调度,直接阻塞了。
问题修复:
将kafka client初始化的工作在各个子进程中完成。
class KafkaCli(metaclass=KafkaSingletonClass):
def __init__(self):
self.__producer = None
self.__consumer = None
self.client_init(
bootstrap_servers=CONFIG.KAFKA.bootstrap_servers,
group_id=CONFIG.KAFKA.group_id,
session_timeout_ms=CONFIG.KAFKA.session_timeout_ms,
enable_auto_commit=CONFIG.KAFKA.enable_auto_commit,
auto_offset_reset=CONFIG.KAFKA.auto_offset_reset
)
def client_init(self, bootstrap_servers: str,
group_id: str,
session_timeout_ms: int = 6000,
enable_auto_commit: bool = True,
auto_offset_reset: str = 'earliest') -> NoReturn:
"""producer && consumer init"""
producer_config = {
'bootstrap.servers': bootstrap_servers
}
consumer_config = dict(**producer_config, **{
'group.id': group_id,
'session.timeout.ms': session_timeout_ms,
'enable.auto.commit': enable_auto_commit,
# 'on_commit': on_commit,
'auto.offset.reset': auto_offset_reset
})
self.__consumer = kafka.Consumer(consumer_config)
self.__producer = kafka.Producer(producer_config)
同时修改使用的单例元类,实现每个进程内的单例:
class KafkaSingletonClass(type):
"""kafka 专用的单例元类"""
_instance_lock = threading.Lock()
_instance = {}
def __call__(cls, *args, **kwargs):
pid = mp.current_process().pid
if pid not in cls._instance:
with cls._instance_lock:
if pid not in cls._instance:
cls._instance[pid] = super().__call__(*args, **kwargs)
return cls._instance[pid]
问题修复只改动了kafka client初始化部分的代码,不需要对其它位置进行调整。
子进程创建中的线程拷贝问题相关链接:
https://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them
更多推荐
所有评论(0)