云计算之celery+django+supervisor实现定时任务
一. celery概要1. 概念分布式任务队列,django中所有需要用到异步操作的,或者定时操作的,都可以使用celery2. 组件Celery Beat : 任务调度器. Beat 进程会读取配置文件的内容, 周期性的将配置中到期需要执行的任务发送给任务队列.Celery Worker : 执行任务的消费者, 通常会在...
·
一. celery概要
1. 概念
分布式任务队列,django中所有需要用到异步操作的,或者定时操作的,都可以使用celery
2. 组件
Celery Beat : 任务调度器. Beat 进程会读取配置文件的内容, 周期性的将配置中到期需要执行的任务发送给任务队列.
Celery Worker : 执行任务的消费者, 通常会在多台服务器运行多个消费者, 提高运行效率.
Broker : 消息代理, 队列本身. 也称为消息中间件.
Producer : 任务生产者. 调用 Celery API , 函数或者装饰器, 而产生任务并交给任务队列处理的都是任务生产者.
Result Backend : 任务处理完成之后保存状态信息和结果, 以供查询.
3. 工作流程
producer发布任务,传递给broker,borker 传递给 beat,beat 将任务调度给worker,worker执行任务并将结果存入数据库(redis等)
二. celery搭建
1. 安装redis
官网下载对应版本的redis tar包
tar -xf redis-3.2.12.tar.gz
cd redis-3.2.12
make
make install
./utils/install_server.sh
2. 安装celery
pip3.6 install celery
3. 使用celery简单测试
1). 编写tasks.py
import time
from celery import Celery
# 使用redis作为broker(消息队列)
celery = Celery('tasks', broker='redis://localhost:6379/0')
# 创建任务函数
@celery.task
def sendmail(mail):
print("sending mail to %s..."%(mail['to']))
time.sleep(2.0)
print('mail sent.')
2). 进入tasks.py目录启动 celery 处理任务
celery -A tasks worker --loglevel=info
说明:celery对tasks任务启动一个worker线程,日志级别为info
3). 另开命令行进入tasks.py目录编写测试脚本 test.py
from tasks import sendmail
import time
while 1:
print(sendmail.delay(dict(to='mx_steve@163.com')))
time.sleep(1)
调用delay函数,是将任务函数加入到队列中
可以看到worker线程有对应的处理输出,测试celery完成,目前celery可以使用了,但是还不能放入后台运行,可以使用supervisor进行后台运行管理
4. supervisor 安装使用
1). 使用pip安装supervisor
pip3.6 install supervisor
2). 生成supervisor配置文件
echo_supervisord_conf > /etc/supervisord.conf
3). 修改supervisor配置文件
vim /etc/supervisord.conf
# supervisor子配置目录
[include]
files = /etc/supervisor/*.conf
# supervisor web管理界面
[inet_http_server]
port=192.168.89.133:9001
username=user
password=123
4). 创建子配置文件目录和子配置文件
mkdir /etc/supervisor
vim /etc/supervisor/test.conf
[program:testpy]
command=python /opt/celery/test1.py ; 要执行的程序
;process_name=%(program_name)s
;numprocs=1
directory=/opt/celery/ ; 要进入的项目目录(先进入,再执行)
;umask=022
priority=999 ; 任务要执行的优先级
startsecs = 5 ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true ; 程序异常退出后自动重启
startretries = 3 ; 启动失败自动重试次数,默认是 3
stdout_logfile = /opt/celery/supervisor.log
5). 编写测试脚本/opt/celery/test1.py
import time
while 1:
with open('/tmp/bb.txt','a') as fobj:
fobj.write('111\n')
time.sleep(1)
3). 通过配置文件启动supervisor
supervisord -c /etc/supervisord.conf
4). 查看运行状态
supervisorctl status
testpy RUNNING pid 61435, uptime 0:02:32
注意报错:gave up: testpy entered FATAL state, too many start retries too quickly
原因:
1. 脚本启动比较慢,将配置文件中startsecs调大一些
2. 脚本运行退出,非死循环状态,也会报这个错 , 这种情况也不算报错,执行完成自然退出,配置文件中设置了重启3次,重启执行完后自然又会退出
supervisorctl stop testpy 停止任务
supervisorctl start testpy 启动任务
5. 使用 supervisor 控制 celery 半个小时更新一次
1). 将第4步的test1.py脚本更换成第3步的test.py脚本,celery就被supervisor控制
2). 定义时间,半个小时更新一次脚本
a. 创建supervisor子配置文件[调度器] /etc/supervisor/get_version_beat.conf
[program:get_version_beat]
# 执行命令
directory=/opt/gits/orange
command = /root/venv/bin/celery --workdir=/opt/gits/orange -A webhook beat -l info
# 日志配置
loglevel = info
stdout_logfile = /tmp/get_version_beat.log
stderr_logfile = /tmp/get_version_beat.log
stdout_logfile_maxbytes = 50MB
stdout_logfile_backups = 1
# 给每个进程命名,便于管理
process_name = get_version_beat%(process_num)s
# 启动的进程数,设置成云服务器的vCPU数
numprocs_start = 1
numprocs = 1
# 设置自启和重启
autostart = true
autorestart = true
redirect_stderr = True
b. 创建supervisor子配置文件[消费者] /etc/supervisor/get_version_worker.conf
[program:get_version_worker]
# 执行用户
# user = root
# 执行的命令
directory=/opt/gits/orange
command = /root/venv/bin/celery --workdir=/opt/gits/orange -A webhook worker -l info
# 日志文件配置
loglevel = info
stdout_logfile = /tmp/get_version_worker.log
stderr_logfile = /tmp/get_version_worker.log
stdout_logfile_maxbytes = 50MB
stdout_logfile_backups = 1
# 给每个进程命名,便于管理
process_name = get_version_worker%(process_num)s
# 启动的进程数,设置成云服务器的vCPU数
numprocs_start = 1
numprocs = 1
# 设置自启和重启
autostart = true
autorestart = true
redirect_stderr = True
c. 到项目根目录配置celery.py文件
... ...
app.conf.update(
CELERYBEAT_SCHEDULE = {
... ...
'get_version':{
'task': 'celery_app.tasks.get_version',
'schedule': crontab(minute='1', hour='1', day_of_week='*', day_of_month='*', month_of_year='*'),
}
}
}
d. 进入到celery_app目录,进行功能编写
vim get_version.py
import paramiko
import requests
import datetime
import redis
import json
import os
import subprocess
from threading import Timer
import logging
logger = logging.getLogger('t3logger')
class GetVersion():
def __init__(self):
pass
def par_ver(self, host0, app_name):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname=host0, port=2222, username='dc')
stdin, stdout, stderr = client.exec_command("awk '/jfrog/' /data/scripts/deploy_%s.sh | tail -1 | awk '{print $4}' | awk -F/ '{print $4}'" % (app_name))
out = stdout.read().decode('utf-8')
err = stderr.read().decode('utf-8')
if out == '':
out = '0'
client.close()
out = out.strip()
return out
def chaoshi(self, args, timeout):
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
timer = Timer(timeout, lambda process: process.kill(), [p])
try:
timer.start()
stdout, stderr = p.communicate()
return_code = p.returncode
if stdout != b'':
return True
else:
return False
finally:
timer.cancel()
def main(self):
os.system('rm -f /tmp/hosts_questions.txt')
# 此处为数据接口,此功能不赘述
all_keys = requests.get("http://10.0.0.1:10080/assets/inventory/--list/None/")
all_objs = all_keys.json()
i = 1
objs = []
for item in all_objs:
if item == 'all' or item == '_meta':
continue
if 't3_data_apps' in item:
env = item.split('_ktz_data_apps_')[0]
center = 'ktz_data_apps'
app_name = item.split('_ktz_data_apps_')[-1]
elif 'ktz_m' in item:
env = item.split('_ktz_m_')[0]
center = 'ktz_m'
app_name = item.split('_ktz_m_')[-1]
else:
env = item.split('_')[0]
center = item.split('_')[1]
app_name = item.split('_')[-1]
hosts = all_objs[item]['hosts']
str = ''
for host in hosts:
str += host + ','
hosts_str = str
host0 = all_objs[item]['hosts'][0]
result = self.chaoshi(['telnet', host0, '53742'], 2)
if result == False:
os.system('echo %s >> /tmp/hosts_questions.txt' % (host0))
continue
ver = self.par_ver(host0, app_name)
now = datetime.datetime.now().strftime('%Y-%m-%d')
objs.append([i, center, app_name, ver, hosts_str, now, env])
i += 1
red = redis.Redis(host='localhost', port=6379, db=1)
objs_json = json.dumps(objs)
red.set('versions', objs_json)
if __name__ == '__main__':
gv = GetVersion()
gv.main()
e. 进入到celery_app目录的tasks.py文件,引入get_version功能
from celery_app.get_version import GetVersion
@shared_task
def get_version():
"""
周期性收集服务器上对应的服务版本信息
:return:
"""
info = '周期性收集服务器上对应的服务版本信息'
logger.info(info)
gv = GetVersion()
gv.main()
f. 启动django服务
g. 重启supervisord服务
killall -9 supervisord
supervisord -c /etc/supervisord.conf
6. 问题排错
问题1:查看日志,发现beat启动错误
tail -10f /tmp/get_version_beat.log
celery beat v4.3.0 (rhubarb) is starting.
ERROR: Pidfile (celerybeat.pid) already exists.
排错思路:不用创建beat调度器,因为已经存在了
1. 去掉get_version_beat.conf
rm -f /etc/supervisor/get_version_beat.conf
2. 启动supervisor查看是否还有错误
tail -10f /tmp/get_version_worker.log
[2019-12-16 17:38:34,897: INFO/MainProcess] Connected to redis://127.0.0.1:6379/8
[2019-12-16 17:38:34,903: INFO/MainProcess] mingle: searching for neighbors
[2019-12-16 17:38:35,945: WARNING/MainProcess] /root/venv/lib/python3.6/site-packages/celery/app/control.py:54: DuplicateNodenameWarning: Received multiple replies from node name: celery@k8snode01.
Please make sure you give each node a unique nodename using
the celery worker `-n` option.
pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
[2019-12-16 17:38:35,946: INFO/MainProcess] mingle: all alone
[2019-12-16 17:38:35,954: WARNING/MainProcess] /root/venv/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2019-12-16 17:38:35,954: INFO/MainProcess] celery@k8snode01 ready.
supervisorctl status
get_version_worker:get_version_worker1 RUNNING pid 7598, uptime 0:01:55
启动正常,并无错误,等待运行结果结束
虽然没有大问题,但是看get_version_worker.log日志,查看报错 UserWarning: Using settings.DEBUG leads to a memory leak,主项目settings配置文件debug改为false即可
问题2:结果并没有实现,没有创建/tmp/abc.txt测试文件,没有将数据导入到redis库中,不知道原因是啥?
排错思路:
1. 直接运行 get_version.py 没有问题,数据也可以导入到redis库中,说明 get_version.py 脚本没有问题
2. 检查 celery_app 目录下 tasks.py 脚本, 引入GetVersion类并实例化调用方法,没有问题
3. 'get_version':{ 冒号后面没有空格,重新运行
tail -10f /tmp/supervisord.log
'get_version_worker1' with pid 103842 显示这个任务已经启动
tail -10f /tmp/get_version_worker.log
日志都没啥变化,不是这个原因引起的。
4. 第二天早上查看,日志神奇的出现了,而且redis也有了对应的键值对,猜测是 celery schedule 的单位问题
于是将schedule后面写上3600(秒)
重启django服务,更新 supervisorctl 配置
supervisorctl update
supervisorctl reload (重启supervisord)
supervisorctl status
get_version_worker:get_version_worker1 RUNNING pid 32703, uptime 0:02:08
启动时间:9:30;测试时间:10:30-40之间
结果:10:50后还没有数据。
分析:收集信息需要时间,1100个服务,每个花费5s收集,总共需要1100*5/60=91分钟,9:30+91=11:00左右结束,还没有写入,11:20查看一次,
建议:先测试,收集少量数据,就不会花费太多时间
结果:到下午 13:30 redis 里都没有数据
先排除日志报错:debug=false和名称相同的错,配置了启动位置加了 -n get_version_worker
5. 查看 supervisorctl 是否可以控制celery
使用 gv.main()函数(将字符串传入reids中) 脚本测试
6. 第三天早上查看,redis里又存入了versions键值对,在/tmp下也创建了相应的测试文件abc.txt,时间是今天的9:00
思考:supervisorctl 里的任务压根没有运行,但是数据还是创建了,可能是celery自己创建的,并不受supervisor控制
排错:
ss -anptu | grep celery
发现有很多celery进程在运行,杀死celery和supervisord进程,删除redis中的键值
killall -9 celery
killall -9 supervisord
redis-cli
> select 1
> keys *
1) "versions"
> del versions
重新启动django和supervisord
supervisord -c /etc/supervisord.conf
ss anptu | grep celery ; ss anptu | grep supervisord 查看进程已经存在
查看日志也没有什么报错
进入redis并没有看到数据存入库中,泪奔
/etc/supervisor/下没有配置beat,添加配置,重启supervisord和celery,发现版本信息可以存入到redis库中,踩了好几天的坑,问题终于解决了。
问题排错总结:
看来第5步/etc/supervisor/get_version_beat.conf这个文件还不能删,要回顾一下celery定时任务原理,如果没有beat,就相当于没有了调度器,没有任务调度器,配置了也不会执行。
三. 总结
使用任何一项新的技术,一定要将原理搞清楚,弄明白,然后再对照着原理搭建自己的任务,每个环境都不漏下,就很大可能会走通,不会出现什么问题。
更多推荐
已为社区贡献13条内容
所有评论(0)