一. celery概要
    1. 概念
        分布式任务队列,django中所有需要用到异步操作的,或者定时操作的,都可以使用celery
    2. 组件
        Celery Beat : 任务调度器. Beat 进程会读取配置文件的内容, 周期性的将配置中到期需要执行的任务发送给任务队列.
        Celery Worker : 执行任务的消费者, 通常会在多台服务器运行多个消费者, 提高运行效率.
        Broker : 消息代理, 队列本身. 也称为消息中间件.
        Producer : 任务生产者. 调用 Celery API , 函数或者装饰器, 而产生任务并交给任务队列处理的都是任务生产者.
        Result Backend : 任务处理完成之后保存状态信息和结果, 以供查询.
    3. 工作流程
        producer发布任务,传递给broker,borker 传递给 beat,beat 将任务调度给worker,worker执行任务并将结果存入数据库(redis等)

二. celery搭建
    1. 安装redis
        官网下载对应版本的redis tar包
        tar -xf redis-3.2.12.tar.gz
        cd redis-3.2.12
        make
        make install
        ./utils/install_server.sh
    2. 安装celery
        pip3.6 install celery
    3. 使用celery简单测试
        1). 编写tasks.py
            import time
            from celery import Celery
            # 使用redis作为broker(消息队列)
            celery = Celery('tasks', broker='redis://localhost:6379/0')
            # 创建任务函数
            @celery.task
            def sendmail(mail):
                print("sending mail to %s..."%(mail['to']))
                time.sleep(2.0)
                print('mail sent.')

        2). 进入tasks.py目录启动 celery 处理任务
            celery  -A tasks worker --loglevel=info
            说明:celery对tasks任务启动一个worker线程,日志级别为info
        3). 另开命令行进入tasks.py目录编写测试脚本 test.py
            from tasks import sendmail
            import time
            while 1:
              print(sendmail.delay(dict(to='mx_steve@163.com')))
              time.sleep(1)
            调用delay函数,是将任务函数加入到队列中
        可以看到worker线程有对应的处理输出,测试celery完成,目前celery可以使用了,但是还不能放入后台运行,可以使用supervisor进行后台运行管理
    4. supervisor 安装使用
        1). 使用pip安装supervisor
            pip3.6 install supervisor
        2). 生成supervisor配置文件
            echo_supervisord_conf > /etc/supervisord.conf
        3). 修改supervisor配置文件
            vim /etc/supervisord.conf
                # supervisor子配置目录
                [include]
                files = /etc/supervisor/*.conf
                # supervisor web管理界面
                [inet_http_server]
                port=192.168.89.133:9001
                username=user
                password=123
        4). 创建子配置文件目录和子配置文件
            mkdir /etc/supervisor
            vim /etc/supervisor/test.conf
                [program:testpy]
                command=python /opt/celery/test1.py              ; 要执行的程序
                ;process_name=%(program_name)s
                ;numprocs=1
                directory=/opt/celery/              ; 要进入的项目目录(先进入,再执行)
                ;umask=022
                priority=999                             ; 任务要执行的优先级
                startsecs = 5                            ; 启动 5 秒后没有异常退出,就当作已经正常启动了
                autorestart = true                       ; 程序异常退出后自动重启
                startretries = 3                         ; 启动失败自动重试次数,默认是 3
                stdout_logfile = /opt/celery/supervisor.log
        5). 编写测试脚本/opt/celery/test1.py
            import time
            while 1:
              with open('/tmp/bb.txt','a') as fobj:
                fobj.write('111\n')
              time.sleep(1)
        3). 通过配置文件启动supervisor
            supervisord -c /etc/supervisord.conf
        4). 查看运行状态
            supervisorctl status
                testpy                           RUNNING   pid 61435, uptime 0:02:32
            注意报错:gave up: testpy entered FATAL state, too many start retries too quickly
            原因:
                1. 脚本启动比较慢,将配置文件中startsecs调大一些
                2. 脚本运行退出,非死循环状态,也会报这个错 , 这种情况也不算报错,执行完成自然退出,配置文件中设置了重启3次,重启执行完后自然又会退出
            supervisorctl stop testpy 停止任务
            supervisorctl start testpy 启动任务
    5. 使用 supervisor 控制 celery 半个小时更新一次
        1). 将第4步的test1.py脚本更换成第3步的test.py脚本,celery就被supervisor控制
        2). 定义时间,半个小时更新一次脚本
           a. 创建supervisor子配置文件[调度器] /etc/supervisor/get_version_beat.conf
                [program:get_version_beat]
                # 执行命令
                directory=/opt/gits/orange
                command = /root/venv/bin/celery --workdir=/opt/gits/orange -A webhook beat -l info
                # 日志配置
                loglevel = info
                stdout_logfile = /tmp/get_version_beat.log
                stderr_logfile = /tmp/get_version_beat.log
                stdout_logfile_maxbytes = 50MB
                stdout_logfile_backups = 1
                # 给每个进程命名,便于管理
                process_name = get_version_beat%(process_num)s
                # 启动的进程数,设置成云服务器的vCPU数
                numprocs_start = 1
                numprocs = 1
                # 设置自启和重启
                autostart = true
                autorestart = true
                redirect_stderr = True
           b. 创建supervisor子配置文件[消费者] /etc/supervisor/get_version_worker.conf
                [program:get_version_worker]
                # 执行用户
                # user = root
                # 执行的命令
                directory=/opt/gits/orange
                command = /root/venv/bin/celery --workdir=/opt/gits/orange -A webhook worker -l info
                # 日志文件配置
                loglevel = info
                stdout_logfile = /tmp/get_version_worker.log
                stderr_logfile = /tmp/get_version_worker.log
                stdout_logfile_maxbytes = 50MB
                stdout_logfile_backups = 1
                # 给每个进程命名,便于管理
                process_name = get_version_worker%(process_num)s
                # 启动的进程数,设置成云服务器的vCPU数
                numprocs_start = 1
                numprocs = 1
                # 设置自启和重启
                autostart = true
                autorestart = true
                redirect_stderr = True
           c. 到项目根目录配置celery.py文件
                ... ...
                app.conf.update(
                    CELERYBEAT_SCHEDULE = {
                        ... ...
                        'get_version':{
                            'task': 'celery_app.tasks.get_version',
                            'schedule': crontab(minute='1', hour='1', day_of_week='*', day_of_month='*', month_of_year='*'),
                        }
                    }
                }
           d. 进入到celery_app目录,进行功能编写
                vim get_version.py
                    import paramiko
                    import requests
                    import datetime
                    import redis
                    import json
                    import os
                    import subprocess
                    from threading import Timer
                    import logging

                    logger = logging.getLogger('t3logger')

                    class GetVersion():
                        def __init__(self):
                            pass

                        def par_ver(self, host0, app_name):
                            client = paramiko.SSHClient()
                            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                            client.connect(hostname=host0, port=2222, username='dc')
                            stdin, stdout, stderr = client.exec_command("awk '/jfrog/' /data/scripts/deploy_%s.sh | tail -1 | awk '{print $4}' | awk -F/ '{print $4}'" % (app_name))
                            out = stdout.read().decode('utf-8')
                            err = stderr.read().decode('utf-8')
                            if out == '':
                                out = '0'
                            client.close()
                            out = out.strip()
                            return out

                        def chaoshi(self, args, timeout):
                            p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                            timer = Timer(timeout, lambda process: process.kill(), [p])
                            try:
                                timer.start()
                                stdout, stderr = p.communicate()
                                return_code = p.returncode
                                if stdout != b'':
                                    return True
                                else:
                                    return False
                            finally:
                                timer.cancel()

                        def main(self):
                            os.system('rm -f /tmp/hosts_questions.txt')
                            # 此处为数据接口,此功能不赘述
                            all_keys = requests.get("http://10.0.0.1:10080/assets/inventory/--list/None/")
                            all_objs = all_keys.json()
                            i = 1
                            objs = []
                            for item in all_objs:
                                if item == 'all' or item == '_meta':
                                    continue
                                if 't3_data_apps' in item:
                                    env = item.split('_ktz_data_apps_')[0]
                                    center = 'ktz_data_apps'
                                    app_name = item.split('_ktz_data_apps_')[-1]
                                elif 'ktz_m' in item:
                                    env = item.split('_ktz_m_')[0]
                                    center = 'ktz_m'
                                    app_name = item.split('_ktz_m_')[-1]
                                else:
                                    env = item.split('_')[0]
                                    center = item.split('_')[1]
                                    app_name = item.split('_')[-1]
                                hosts = all_objs[item]['hosts']
                                str = ''
                                for host in hosts:
                                    str += host + ','
                                hosts_str = str
                                host0 = all_objs[item]['hosts'][0]
                                result = self.chaoshi(['telnet', host0, '53742'], 2)
                                if result == False:
                                    os.system('echo %s >> /tmp/hosts_questions.txt' % (host0))
                                    continue
                                ver = self.par_ver(host0, app_name)
                                now = datetime.datetime.now().strftime('%Y-%m-%d')
                                objs.append([i, center, app_name, ver, hosts_str, now, env])
                                i += 1
                            red = redis.Redis(host='localhost', port=6379, db=1)
                            objs_json = json.dumps(objs)
                            red.set('versions', objs_json)


                    if __name__ == '__main__':
                        gv = GetVersion()
                        gv.main()
           e. 进入到celery_app目录的tasks.py文件,引入get_version功能
                from celery_app.get_version import GetVersion
                @shared_task
                def get_version():
                    """
                    周期性收集服务器上对应的服务版本信息
                    :return:
                    """
                    info = '周期性收集服务器上对应的服务版本信息'
                    logger.info(info)
                    gv = GetVersion()
                    gv.main()
           f. 启动django服务
           g. 重启supervisord服务
                killall -9 supervisord
                supervisord -c /etc/supervisord.conf
    6. 问题排错
           问题1:查看日志,发现beat启动错误
                tail -10f /tmp/get_version_beat.log
                    celery beat v4.3.0 (rhubarb) is starting.
                    ERROR: Pidfile (celerybeat.pid) already exists.
                排错思路:不用创建beat调度器,因为已经存在了
                    1. 去掉get_version_beat.conf
                        rm -f /etc/supervisor/get_version_beat.conf
                    2. 启动supervisor查看是否还有错误
                        tail -10f /tmp/get_version_worker.log
                            [2019-12-16 17:38:34,897: INFO/MainProcess] Connected to redis://127.0.0.1:6379/8
                            [2019-12-16 17:38:34,903: INFO/MainProcess] mingle: searching for neighbors
                            [2019-12-16 17:38:35,945: WARNING/MainProcess] /root/venv/lib/python3.6/site-packages/celery/app/control.py:54: DuplicateNodenameWarning: Received multiple replies from node name: celery@k8snode01.
                            Please make sure you give each node a unique nodename using
                            the celery worker `-n` option.
                              pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
                            [2019-12-16 17:38:35,946: INFO/MainProcess] mingle: all alone
                            [2019-12-16 17:38:35,954: WARNING/MainProcess] /root/venv/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
                              warnings.warn('Using settings.DEBUG leads to a memory leak, never '
                            [2019-12-16 17:38:35,954: INFO/MainProcess] celery@k8snode01 ready.
                        supervisorctl status
                            get_version_worker:get_version_worker1   RUNNING   pid 7598, uptime 0:01:55
                        启动正常,并无错误,等待运行结果结束
                        虽然没有大问题,但是看get_version_worker.log日志,查看报错 UserWarning: Using settings.DEBUG leads to a memory leak,主项目settings配置文件debug改为false即可
           问题2:结果并没有实现,没有创建/tmp/abc.txt测试文件,没有将数据导入到redis库中,不知道原因是啥?
                排错思路:
                    1. 直接运行 get_version.py 没有问题,数据也可以导入到redis库中,说明 get_version.py 脚本没有问题
                    2. 检查 celery_app 目录下 tasks.py 脚本, 引入GetVersion类并实例化调用方法,没有问题
                    3. 'get_version':{ 冒号后面没有空格,重新运行
                        tail -10f /tmp/supervisord.log
                            'get_version_worker1' with pid 103842 显示这个任务已经启动
                        tail -10f /tmp/get_version_worker.log
                        日志都没啥变化,不是这个原因引起的。
                    4. 第二天早上查看,日志神奇的出现了,而且redis也有了对应的键值对,猜测是 celery schedule 的单位问题
                        于是将schedule后面写上3600(秒)
                        重启django服务,更新 supervisorctl 配置
                        supervisorctl update
                        supervisorctl reload (重启supervisord)
                        supervisorctl status
                            get_version_worker:get_version_worker1   RUNNING   pid 32703, uptime 0:02:08
                        启动时间:9:30;测试时间:10:30-40之间
                        结果:10:50后还没有数据。
                        分析:收集信息需要时间,1100个服务,每个花费5s收集,总共需要1100*5/60=91分钟,9:30+91=11:00左右结束,还没有写入,11:20查看一次,
                        建议:先测试,收集少量数据,就不会花费太多时间
                        结果:到下午 13:30 redis 里都没有数据
                        先排除日志报错:debug=false和名称相同的错,配置了启动位置加了 -n get_version_worker
                    5. 查看 supervisorctl 是否可以控制celery
                        使用 gv.main()函数(将字符串传入reids中) 脚本测试
                    6. 第三天早上查看,redis里又存入了versions键值对,在/tmp下也创建了相应的测试文件abc.txt,时间是今天的9:00
                        思考:supervisorctl 里的任务压根没有运行,但是数据还是创建了,可能是celery自己创建的,并不受supervisor控制
                        排错:
                            ss -anptu | grep celery
                            发现有很多celery进程在运行,杀死celery和supervisord进程,删除redis中的键值
                            killall -9 celery
                            killall -9 supervisord
                            redis-cli
                            > select 1
                            > keys *
                                1) "versions"
                            > del versions
                            重新启动django和supervisord
                            supervisord -c /etc/supervisord.conf
                            ss anptu | grep celery ; ss anptu | grep supervisord 查看进程已经存在
                            查看日志也没有什么报错
                            进入redis并没有看到数据存入库中,泪奔
                            /etc/supervisor/下没有配置beat,添加配置,重启supervisord和celery,发现版本信息可以存入到redis库中,踩了好几天的坑,问题终于解决了。
                    问题排错总结:
                        看来第5步/etc/supervisor/get_version_beat.conf这个文件还不能删,要回顾一下celery定时任务原理,如果没有beat,就相当于没有了调度器,没有任务调度器,配置了也不会执行。
三. 总结
    使用任何一项新的技术,一定要将原理搞清楚,弄明白,然后再对照着原理搭建自己的任务,每个环境都不漏下,就很大可能会走通,不会出现什么问题。


Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐