问题背景:
        python程序一开始为单进程结构,属于CPU密集型计算任务,为提高性能调整为多进程并行计算。
        调整后运行时程序启动一会就全部进程卡死,不再往下运行。

问题排查:
        程序中没有使用到进程锁或者分布式锁,所以先是排除了死锁的原因,可能为逻辑BUG或者代码异常。
        多进程下只好通过打日志的方式定位下程序运行到什么位置了,最后定位出不再往下运行的代码行为:

msgs: Dict = kwargs['msgs']

        这部分的完整代码块为:

class ConclusionWriteHandler(SinkWriterInterface):
    """数据分析结论入MQ handler"""

    def write(self, **kwargs):
        msgs: Dict = kwargs['msgs']
        serializer = kwargs['serializer']
        kafka_client = Kafka()
        for conclusion_category, conclusion_list in msgs.items():
            if not conclusion_list:
                continue
            sub_msgs = [serializer(x) for x in conclusion_list]
            # 分析结论的topic名称和结论类型是保持一致的
            kafka_client.produce(topic=conclusion_category, msgs=sub_msgs)
            log.info(f'load {len(msgs)} conclusions to topic {conclusion_category}')

        一开始看到定位出的位置是一脸懵逼的,一个普通的字典取值操作怎么就卡住了,以为是数据格式存在问题,添加 try catch 尝试捕获异常,但并没有触发。
        当前是在pycharm内使用debug模式运行的程序,确认这块代码没有问题后,尝试在命令行运行,看下定位出的位置是不是一致的,果然,命令行下运行这行代码(msgs: Dict = kwargs['msgs'])运行通过,于是继续添加日志定位程序卡住的位置,随后定位到了kafka数据发送模块:

def produce(self, topic: str, msgs: List) -> NoReturn:
    """
    生产者

    Args:
        topic: topic
        msgs: msg列表

    """
    for msg in msgs:
        self.producer.poll(0)
        self.producer.produce(topic=topic, value=msg.encode('utf-8'), callback=self.delivery_report)
    self.producer.flush()

        发现数据都有发送,但是在运行到 self.producer.flush() 时程序不再运行了,推测为 confluent-kafka 这个位置的使用出现了问题,其中kafka客户端代码为:

class KafkaCli(metaclass=SingletonClass):
    def __init__(self):
        self.__producer = None
        self.__consumer = None

    def client_init(self, bootstrap_servers: str,
                    group_id: str,
                    session_timeout_ms: int = 6000,
                    enable_auto_commit: bool = True,
                    auto_offset_reset: str = 'earliest') -> NoReturn:
        """producer && consumer init"""
        producer_config = {
            'bootstrap.servers': bootstrap_servers
        }
        consumer_config = dict(**producer_config,  **{
            'group.id': group_id,
            'session.timeout.ms': session_timeout_ms,
            'enable.auto.commit': enable_auto_commit,
            # 'on_commit': on_commit,
            'auto.offset.reset': auto_offset_reset
        })
        self.__consumer = kafka.Consumer(consumer_config)
        self.__producer = kafka.Producer(producer_config)

        这里kafka client为单例模式,并且的程序初始化的时间初始化 producer 和 consumer,其中单例元类为:

class SingletonClass(type):
    """单例实现-元类"""
    _instance_lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        if not hasattr(cls, '_instance'):
            with cls._instance_lock:
                if not hasattr(cls, '__instance'):
                    cls._instance = super().__call__(*args, **kwargs)
        return cls._instance

        理论上,创建多进程的时间,主进程的所有资源会拷贝一份到子进程中,虽然这里在初始化时为单例对象,但创建的各个子进程仍然有自己的一个kafka client对象(这个也通过打印获取的kafka client地址确定了)        

        所以回过头来看下程序卡住的位置:       

         producer.flush() 的作用是将生成者队列中的所有消息发送出去,这里卡住可能就是发送机制上出现问题了。

        面向搜索引擎解决问题:       

         链接地址: https://github.com/confluentinc/confluent-kafka-python/issues/351

        问题原因也看到了: kafka客户端使用了内部线程,创建子进程时并不会把主进程中的子线程也拷贝的子进程空间内。

 

这其实和在POSIX(以前称为 Unix 的标准)上创建子进程的方式有关:

        1. 使用fork()系统调用创建进程的副本。

        2. 子进程使用execve()系统调用(或其变体之一,例如execl())将自身替换为不同的程序。

我们都知道创建子进程会拷贝主进程中的所有内存内容,但实际上fork()并没有拷贝所有内容,其中就包括父进程中的线程,即:

        任何在父进程中运行的线程都不存在于子进程中

所以程序卡住的原因也就找到了:

        kafka client是在程序启动的时间做的初始化,底层使用的是线程,所以接下来创建出的子进程中不含有kafka client底层需要的线程,最后producer在发送数据是没有底层线程进行调度,直接阻塞了。

 

问题修复:

        将kafka client初始化的工作在各个子进程中完成。

class KafkaCli(metaclass=KafkaSingletonClass):
    def __init__(self):
        self.__producer = None
        self.__consumer = None
        self.client_init(
            bootstrap_servers=CONFIG.KAFKA.bootstrap_servers,
            group_id=CONFIG.KAFKA.group_id,
            session_timeout_ms=CONFIG.KAFKA.session_timeout_ms,
            enable_auto_commit=CONFIG.KAFKA.enable_auto_commit,
            auto_offset_reset=CONFIG.KAFKA.auto_offset_reset
        )

    def client_init(self, bootstrap_servers: str,
                    group_id: str,
                    session_timeout_ms: int = 6000,
                    enable_auto_commit: bool = True,
                    auto_offset_reset: str = 'earliest') -> NoReturn:
        """producer && consumer init"""
        producer_config = {
            'bootstrap.servers': bootstrap_servers
        }
        consumer_config = dict(**producer_config,  **{
            'group.id': group_id,
            'session.timeout.ms': session_timeout_ms,
            'enable.auto.commit': enable_auto_commit,
            # 'on_commit': on_commit,
            'auto.offset.reset': auto_offset_reset
        })
        self.__consumer = kafka.Consumer(consumer_config)
        self.__producer = kafka.Producer(producer_config)

         同时修改使用的单例元类,实现每个进程内的单例:

class KafkaSingletonClass(type):
    """kafka 专用的单例元类"""
    _instance_lock = threading.Lock()
    _instance = {}

    def __call__(cls, *args, **kwargs):
        pid = mp.current_process().pid
        if pid not in cls._instance:
            with cls._instance_lock:
                if pid not in cls._instance:
                    cls._instance[pid] = super().__call__(*args, **kwargs)
        return cls._instance[pid]

        问题修复只改动了kafka client初始化部分的代码,不需要对其它位置进行调整。

子进程创建中的线程拷贝问题相关链接:

        https://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them

        https://pythonspeed.com/articles/python-multiprocessing/

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐