celery框架的使用
1. Celery的介绍celery是一个异步任务框架, 执行异步任务(立即), 执行延时任务, 执行定时任务.celery管方不支持windows, 第三方在windos来发了一些工具可以使用它.异步任务框架:celery可以不依赖任务服务器, 通用自身命令, 启动服务(内部支持socket).celery服务是为其他项目服务提供异步解决任务需求的.异步的原因就是为了提高项目的并发量, 接收的请
·
1. Celery的介绍
celery是一个异步任务框架, 执行异步任务(立即), 执行延时任务, 执行定时任务.
celery管方不支持windows, 第三方在windos来发了一些工具可以使用它.
异步任务框架:
celery可以不依赖任务服务器, 通用自身命令, 启动服务(内部支持socket).
celery服务是为其他项目服务提供异步解决任务需求的.
异步的原因就是为了提高项目的并发量, 接收的请求高,
使用场景:
异步执行: 解决耗时任务, 将耗时操作任务提交给Celery去异步执行, 比如发送短信/邮件, 消息推送...
延时任务: 解决延迟任务.
定时执行: 解决周期任务, 比如每天的数据统计...
在大型秒杀活动时, 提前将数据库的数据预热到redis中.
2. Celery架构
Cerey架构由三部分组成:
1. 消息中间件(message broker): Celery本身不提供消息服务, 但提供第三方提供的消息中间件集成.
eg: RabbitMQ消息队列, redis.
2. 任务执行单元(worker) Worker是Cerlery提供的任务执行单元, Worker并发的运行在分布式的系统节点中.
分布式的系统节点中:意思就是可以运行在多台机器上面.
3. 任务执行结果存储(task result store): 用来存储Worker执行的任务的结果,
Celery支持不同方式存储任务结果, eg: AMQP消息队列, redis等.
1. 需要安装celery框架环境, 启动celery服务(配置Broler与Backend)
2. 手动或自动天骄任务到Broler, Workwe会自动执行任务(后台异步)
3. 查看执行结果, 从Backend中获取即可.
3. Celery的使用
安装celery模块
pip install celery==4.4.6
3.1 使用方式1
* 1. 启动worker代码.
新建test_celery.py
# test_celery.py
# 导入Celery类
from celery import Celery
# 定义任务中间件存放地址, 需要从这个地址中取出任务
broker = 'redis://127.0.0.1:6379/1' # 使用db1
# 定义任务结果仓库, 任务执之后的结果的保存地址
backend = 'redis://127.0.0.1:6379/2' # # 使用db2
# redis://... 是redis定义的协议.
# 生成一个app对象 main=__name__, 设置一个名字
app = Celery(__name__, broker=broker, backend=backend)
* 2. 启动worker不能直接启动, 而是通过命令启动.
非windows: celery worker -A 文件名 -l info
-l info 日志级别
windows 需要下载eventlet 模块
pip install eventlet
celery worker -A 文件名 -l info -P eventlet
启动代码: celery worker -A test_celery -l info -P eventlet
启动之后会hang在这等待任务...
* 3. 创建任务, 任务在test_celery.py中写, 使用@app.task装饰.
# 导入Celery类
from celery import Celery
# 定义任务中间件存放地址, 需要从这个地址中取出任务
broker = 'redis://127.0.0.1:6379/1' # 使用db1
# 定义任务结果仓库, 任务执之后的结果的保存地址
backend = 'redis://127.0.0.1:6379/2' # 使用db2
# redis://... 是redis定义的协议.
# 生成一个app对象 main=__name__, 设置一个名字
app = Celery(__name__, broker=broker, backend=backend)
# 创建任务
@app.task
def my_add(x, y):
print(x, y) # 会在命令行中展示
# [2022-05-26 19:37:44,508: WARNING/MainProcess] 1
# [2022-05-26 19:37:44,508: WARNING/MainProcess] 2
# 任务的结果使用return 返回
return x + y
* 4. 将任务添加到消息中间件
新建test_task.py
# 导入任务函数
from test_celery import my_add
# my_add(1, 2) # 这是普通的函数
res = my_add.delay(1, 2) # 提交任务
print(res) # 71b04448-5894-4a6c-8d72-fad5cbcf9447 随机字符串
# 返回task_id, 通过这个id去redis中获取结果
* 5. 启动test_task.py文件, 把任务添加到中间件中
* 5. 获取结果
新建get_result.py
from test_celery import app
# 导入异步结果模块
from celery.result import AsyncResult
# 存储之后返回的task_id
task_id = '8dcfec62-3bba-4fe1-81cf-a88ffbe80b9c'
if __name__ == '__main__':
# 生的一个对象, 传入task_id 和 worker对象
async1 = AsyncResult(id=task_id, app=app)
if async1.successful():
result = async1.get()
print(result) # 3
elif async1.failed():
print('任务失败')
elif async1.status == 'PENDING':
print('任务等待中被执行') # id不存在会走这个
elif async1.status == 'RETRY':
print('任务异常后正在重试')
elif async1.status == 'STARTED':
print('任务已经开始被执行')
3.2 使用方式2
创建一个包, 包下必须有一个叫celery.py的文件, 在这个分文件下写worket任务执行单元代码.
* 1. 创建一个celery_task包
* 2. 创建任务, 每个任务可以创建一个单独的文件
新建task1.py文件
# 导入自己写的celery中的worker对象
from .celery import app
# 任务1
@app.task
def my_add(x, y):
print(x, y)
return x + y
* 3. 在celery_task包下创建celery.py(与celery模块重名)
在celery.py下写worket任务执行单元代码, 并添加任务
# 导入Celery类
from celery import Celery
# 定义任务中间件存放地址, 需要从这个地址中取出任务
broker = 'redis://127.0.0.1:6379/1' # 使用db1
# 定义任务结果仓库, 任务执之后的结果的保存地址
backend = 'redis://127.0.0.1:6379/2' # # 使用db2
# redis://... 是redis定义的协议.
# 生成一个app对象 main=__name__, 设置一个名字
# include参数注册任务 include = ['包名.任务', '包名.任务', ...] 通过反射去获取任务
app = Celery(__name__, broker=broker, backend=backend, include=[''])
* 4. 启动worker命令:celery worker -A 包名 -l info -P eventlet
celery worker -A celery_task -l info -P eventlet
* 5. 使用任务, 在celery_task包的同级目录下新建一个test1.py文件, 在test1.py 中使用任务.
# 导入任务
from celery_task.task1 import my_add
res = my_add.delay(1, 2)
print(res) # 6aa08746-0eb2-48d5-9937-82d7f131e0fa
* 6. 获取执行结果, 在celery_task包的同级目录下新建一个get_result.py文件
from celery_task.celery import app
from celery.result import AsyncResult
task_id = '6aa08746-0eb2-48d5-9937-82d7f131e0fa'
if __name__ == '__main__':
async1 = AsyncResult(id=task_id, app=app)
if async1.successful():
result = async1.get()
print(result) # 3
elif async1.failed():
print('任务失败')
elif async1.status == 'PENDING':
print('任务等待中被执行') # id不存在会走这个
elif async1.status == 'RETRY':
print('任务异常后正在重试')
elif async1.status == 'STARTED':
print('任务已经开始被执行')
4. 延迟任务
为任务设置执行时间(UTC时间), 执行一次就结束.
在celery_task包下新建test3.py文件
# ps 时间模块
# 导入时间模块, 时间增量模块
from datetime import datetime, timedelta
# 获取当前utc时间
now_time = datetime.utcnow()
print(now_time) # 2022-05-26 09:05:42.693504
# 生成一个10秒的时间增量对象, 这个时间增量对象可以与时间对象相加
time = timedelta(seconds=10)
print(time) # 0:00:10
eta = now_time + time
print(eta) # 2022-05-26 09:05:52.693504
# test3.py
# 导入任务
from celery_task.task1 import my_add
# 导入时间模块, 时间增量模块
from datetime import datetime, timedelta
# 获取当前utc时间
# 生成一个10秒的时间增量对象, 这个时间增量对象可以与时间对象相加
eta = datetime.utcnow() + timedelta(seconds=10)
# 应用异步(args给任务函数的参数, eta执行任务的时间)
res = my_add.apply_async(args=(1, 2), eta=eta)
print(res) # task_id 会立刻拿到, 并结束这个程序
创建延时任务, 任务到了时间立刻触发.
5. 定时任务
循环执行, 设置循环时间.
* 在celery.py文件中设置
# 导入Celery类
from celery import Celery
# 任务的定时配置
from datetime import timedelta
# 定义任务中间件存放地址, 需要从这个地址中取出任务
broker = 'redis://127.0.0.1:6379/1' # 使用db1
# 定义任务结果仓库, 任务执之后的结果的保存地址
backend = 'redis://127.0.0.1:6379/2' # # 使用db2
# redis://... 是redis定义的协议.
# 生成一个app对象 main=__name__, 设置一个名字
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.task1'])
# 时区 亚洲上海 app.conf.timezone 与 app.conf.enable_utc是成对设置的
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
app.conf.beat_schedule = {
# 定义一个名字
'task1_my_add': {
# 任务: 任务地址
'task': 'celery_task.task1.my_add',
# 定时
'schedule': timedelta(seconds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
# 任务参数
'args': (1, 2),
}
}
这个文件不能直接启动, 需要通过命令提示符启动beat.
启动 worker:
命令: celery worker -A celery_task -l info -P eventlet
启动 beat:
命令: celery beat -A celery_task -l info
beat:
worker:
执行之后会创建一些缓存文件... 如果启动不了先把缓存文件删除之后再运行.
更多推荐
已为社区贡献4条内容
所有评论(0)