1. Celery的介绍

celery是一个异步任务框架, 执行异步任务(立即), 执行延时任务, 执行定时任务.

celery管方不支持windows, 第三方在windos来发了一些工具可以使用它.

异步任务框架:
    celery可以不依赖任务服务器, 通用自身命令, 启动服务(内部支持socket).
    celery服务是为其他项目服务提供异步解决任务需求的.
    异步的原因就是为了提高项目的并发量, 接收的请求高, 

使用场景:
    异步执行: 解决耗时任务, 将耗时操作任务提交给Celery去异步执行, 比如发送短信/邮件, 消息推送...
    延时任务: 解决延迟任务.
    定时执行: 解决周期任务, 比如每天的数据统计...
    在大型秒杀活动时, 提前将数据库的数据预热到redis中.

2. Celery架构

Cerey架构由三部分组成:

1. 消息中间件(message broker): Celery本身不提供消息服务, 但提供第三方提供的消息中间件集成. 
eg: RabbitMQ消息队列, redis.

2. 任务执行单元(worker) Worker是Cerlery提供的任务执行单元, Worker并发的运行在分布式的系统节点中.
分布式的系统节点中:意思就是可以运行在多台机器上面.	

3. 任务执行结果存储(task result store): 用来存储Worker执行的任务的结果, 
Celery支持不同方式存储任务结果, eg: AMQP消息队列, redis等.

2022-05-26_00902

1. 需要安装celery框架环境, 启动celery服务(配置Broler与Backend)
2. 手动或自动天骄任务到Broler, Workwe会自动执行任务(后台异步)
3. 查看执行结果, 从Backend中获取即可.

3. Celery的使用

安装celery模块
pip install celery==4.4.6
3.1 使用方式1
* 1. 启动worker代码.
     新建test_celery.py
# test_celery.py

# 导入Celery类
from celery import Celery

# 定义任务中间件存放地址, 需要从这个地址中取出任务
broker = 'redis://127.0.0.1:6379/1'  # 使用db1
# 定义任务结果仓库, 任务执之后的结果的保存地址
backend = 'redis://127.0.0.1:6379/2'  # # 使用db2

# redis://... 是redis定义的协议.


# 生成一个app对象 main=__name__, 设置一个名字
app = Celery(__name__, broker=broker, backend=backend)

* 2. 启动worker不能直接启动, 而是通过命令启动.
     非windows: celery worker -A 文件名 -l info 
     -l info 日志级别

     windows 需要下载eventlet 模块
     pip install eventlet 
     celery worker -A 文件名 -l info -P eventlet
启动代码: celery worker -A test_celery -l info -P eventlet
启动之后会hang在这等待任务...

image-20220526193600222

* 3. 创建任务, 任务在test_celery.py中写, 使用@app.task装饰.
# 导入Celery类
from celery import Celery

# 定义任务中间件存放地址, 需要从这个地址中取出任务
broker = 'redis://127.0.0.1:6379/1'  # 使用db1
# 定义任务结果仓库, 任务执之后的结果的保存地址
backend = 'redis://127.0.0.1:6379/2'  # 使用db2

# redis://... 是redis定义的协议.


# 生成一个app对象 main=__name__, 设置一个名字
app = Celery(__name__, broker=broker, backend=backend)


# 创建任务
@app.task
def my_add(x, y):
    print(x, y)  # 会在命令行中展示 
    # [2022-05-26 19:37:44,508: WARNING/MainProcess] 1
    # [2022-05-26 19:37:44,508: WARNING/MainProcess] 2
    # 任务的结果使用return 返回
    return x + y
* 4. 将任务添加到消息中间件
     新建test_task.py
# 导入任务函数
from test_celery import my_add

# my_add(1, 2)  # 这是普通的函数

res = my_add.delay(1, 2)  # 提交任务
print(res)  # 71b04448-5894-4a6c-8d72-fad5cbcf9447  随机字符串
# 返回task_id, 通过这个id去redis中获取结果
* 5. 启动test_task.py文件, 把任务添加到中间件中

image-20220526193820518

image-20220526155357220

* 5. 获取结果
     新建get_result.py
from test_celery import app
# 导入异步结果模块
from celery.result import AsyncResult

# 存储之后返回的task_id
task_id = '8dcfec62-3bba-4fe1-81cf-a88ffbe80b9c'
if __name__ == '__main__':
    # 生的一个对象, 传入task_id 和 worker对象
    async1 = AsyncResult(id=task_id, app=app)
    if async1.successful():
        result = async1.get()
        print(result)  # 3
    elif async1.failed():
        print('任务失败')
    elif async1.status == 'PENDING':
        print('任务等待中被执行')  # id不存在会走这个
    elif async1.status == 'RETRY':
        print('任务异常后正在重试')
    elif async1.status == 'STARTED':
        print('任务已经开始被执行')

3.2 使用方式2
创建一个包, 包下必须有一个叫celery.py的文件, 在这个分文件下写worket任务执行单元代码.
* 1. 创建一个celery_task包
* 2. 创建任务, 每个任务可以创建一个单独的文件
	 新建task1.py文件
# 导入自己写的celery中的worker对象
from .celery import app


# 任务1
@app.task
def my_add(x, y):
    print(x, y) 
    return x + y

* 3. 在celery_task包下创建celery.py(与celery模块重名)
     在celery.py下写worket任务执行单元代码, 并添加任务
# 导入Celery类
from celery import Celery

# 定义任务中间件存放地址, 需要从这个地址中取出任务
broker = 'redis://127.0.0.1:6379/1'  # 使用db1
# 定义任务结果仓库, 任务执之后的结果的保存地址
backend = 'redis://127.0.0.1:6379/2'  # # 使用db2

# redis://... 是redis定义的协议.


# 生成一个app对象 main=__name__, 设置一个名字
# include参数注册任务 include = ['包名.任务', '包名.任务', ...] 通过反射去获取任务
app = Celery(__name__, broker=broker, backend=backend, include=[''])

* 4. 启动worker命令:celery worker -A 包名 -l info -P eventlet
     celery worker -A celery_task -l info -P eventlet
* 5. 使用任务, 在celery_task包的同级目录下新建一个test1.py文件, 在test1.py 中使用任务.
# 导入任务
from celery_task.task1 import my_add

res = my_add.delay(1, 2)
print(res)  # 6aa08746-0eb2-48d5-9937-82d7f131e0fa
* 6. 获取执行结果, 在celery_task包的同级目录下新建一个get_result.py文件
from celery_task.celery import app
from celery.result import AsyncResult

task_id = '6aa08746-0eb2-48d5-9937-82d7f131e0fa'
if __name__ == '__main__':
    async1 = AsyncResult(id=task_id, app=app)
    if async1.successful():
        result = async1.get()
        print(result)  # 3
    elif async1.failed():
        print('任务失败')
    elif async1.status == 'PENDING':
        print('任务等待中被执行')  # id不存在会走这个
    elif async1.status == 'RETRY':
        print('任务异常后正在重试')
    elif async1.status == 'STARTED':
        print('任务已经开始被执行')

4. 延迟任务

为任务设置执行时间(UTC时间), 执行一次就结束.
在celery_task包下新建test3.py文件
# ps 时间模块
# 导入时间模块, 时间增量模块
from datetime import datetime, timedelta

# 获取当前utc时间
now_time = datetime.utcnow()
print(now_time)  # 2022-05-26 09:05:42.693504

# 生成一个10秒的时间增量对象, 这个时间增量对象可以与时间对象相加
time = timedelta(seconds=10)

print(time)  # 0:00:10

eta = now_time + time
print(eta)  # 2022-05-26 09:05:52.693504

# test3.py
# 导入任务
from celery_task.task1 import my_add

# 导入时间模块, 时间增量模块
from datetime import datetime, timedelta

# 获取当前utc时间
# 生成一个10秒的时间增量对象, 这个时间增量对象可以与时间对象相加
eta = datetime.utcnow() + timedelta(seconds=10)

# 应用异步(args给任务函数的参数, eta执行任务的时间)
res = my_add.apply_async(args=(1, 2), eta=eta)
print(res)  # task_id 会立刻拿到, 并结束这个程序

image-20220526171515402

创建延时任务, 任务到了时间立刻触发.

5. 定时任务

循环执行, 设置循环时间.
* 在celery.py文件中设置
# 导入Celery类
from celery import Celery
# 任务的定时配置
from datetime import timedelta

# 定义任务中间件存放地址, 需要从这个地址中取出任务
broker = 'redis://127.0.0.1:6379/1'  # 使用db1
# 定义任务结果仓库, 任务执之后的结果的保存地址
backend = 'redis://127.0.0.1:6379/2'  # # 使用db2

# redis://... 是redis定义的协议.


# 生成一个app对象 main=__name__, 设置一个名字
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.task1'])

# 时区 亚洲上海  app.conf.timezone 与 app.conf.enable_utc是成对设置的
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

app.conf.beat_schedule = {
    # 定义一个名字
    'task1_my_add': {
        # 任务: 任务地址
        'task': 'celery_task.task1.my_add',
        # 定时
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        # 任务参数
        'args': (1, 2),
    }
}

这个文件不能直接启动, 需要通过命令提示符启动beat.

启动 worker:
命令: celery worker -A celery_task -l info -P eventlet
启动 beat:
命令: celery beat -A celery_task -l info 
beat:

2022-05-26_00910

worker:

image-20220526192709906

执行之后会创建一些缓存文件... 如果启动不了先把缓存文件删除之后再运行.

image-20220526192841322

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐