Flask-定时任务
pip install Flask-APSchedulerFlask-APScheduler:Tips - Flask APSchedulerFlask-APScheduler 是基于 python 库 apscheduler 做的集成, 所以官网上只有一些简单的使用案例,详细的配置还是要看 apscheduler的文档。apscheduler:Advanced Python Scheduler
目录
pip install Flask-APSchedulerFlask-APScheduler: Tips - Flask APScheduler
Flask-APScheduler 是基于 python 第三方库 apscheduler 做的集成, 所以官网上只有一些简单的使用案例,详细的配置还是要看 apscheduler的文档。
apscheduler:
引用:
一、简单使用
__init__.py:
from flask import Flask
app = Flask(__name__)
app.config.from_object('config.config')
# 初始化db
db = SQLAlchemy()
db.init_app(app)
# 初始化定时任务
from models.taskSchedule import scheduler
scheduler.init_app(app)
scheduler.start()
# 修改调度器执行组件冗余日志级别
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
taskSchedule.py:
from flask_apscheduler import APScheduler
scheduler = APScheduler()
# interval example, 间隔执行, 每30秒执行一次
@scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
def job1():
print('Job 1 executed')
# cron examples, 每分钟执行一次
@scheduler.task('cron', id='do_job_2', minute='*')
def job2():
print('Job 2 executed')
# 每周执行一次
@scheduler.task('cron', id='do_job_3', week='*', day_of_week='sun')
def job3():
print('Job 3 executed')
也可以在程勋运行后添加定时任务:
scheduler.start()
scheduler.add_job(**args)
二、apscheduler
apscheduler 四个组件:
- triggers: 任务触发器组件,提供任务触发方式
- job stores: 任务商店组件,提供任务保存方式
- executors: 任务调度组件,提供任务调度方式
- schedulers: 任务调度组件,提供任务工作方式
triggers
支持三种任务触发方式
- date:
固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建
| 参数 | 说明 |
| :——————————– | :——————- |
| run_date (datetime 或 str) | 作业的运行日期或时间 |
| timezone (datetime.tzinfo 或 str) | 指定时区 |
例如# 在 2019-4-24 00:00:01 时刻运行一次 start_system 方法
scheduler .add_job(start_system, 'date', run_date='2019-4-24 00:00:01', args=['text'])
- interval:
时间间隔触发器,每个一定时间间隔执行一次。
| 参数 | 说明 |
| —————————- | ———- |
| weeks (int) | 间隔几周 |
| days (int) | 间隔几天 |
| hours (int) | 间隔几小时 |
| minutes (int) | 间隔几分钟 |
| seconds (int) | 间隔多少秒 |
| start_date (datetime 或 str) | 开始日期 |
| end_date (datetime 或 str) | 结束日期 |
# 在 2019-4-24 00:00:00 - 2019-4-24 08:00:00 之间, 每隔两小时执行一次 alarm_job 方法 scheduler .add_job(alarm_job, 'interval', hours=2, start_date='2019-4-24 00:00:00' , end_date='2019-4-24 08:00:00')
- cron:
cron风格的任务触发
参数 | 说明 | |
---|---|---|
year (int 或 str) | 表示四位数的年份 (2019) | |
month(int\ | str) | 月 (范围1-12) |
day(int\ | str) | 日 (范围1-31) |
week(int\ | str) | 周 (范围1-53) |
day_of_week (int\ | str) | 表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示 |
hour (int\ | str) | 表示取值范围为0-23时 |
minute (int\ | str) | 表示取值范围为0-59分 |
second (int\ | str) | 表示取值范围为0-59秒 |
start_date (datetime\ | str) | 表示开始时间 |
end_date (datetime\ | str) | 表示结束时间 |
timezone (datetime.tzinfo\ | str) | 表示时区取值 |
(int
|str
) 表示参数既可以是int
类型,也可以是str
类型
(datetime | str
) 表示参数既可以是datetime类型,也可以是str
类型
例如:表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5
sched.add_job(my_job, 'cron',second = '*/5')
job stores
支持四种任务存储方式
- memory:默认配置任务存在内存中
- mongdb:支持文档数据库存储
- sqlalchemy:支持关系数据库存储
- redis:支持键值对数据库存储
schedulers
调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用
- BlockingScheduler: 当这个调度器是你应用中
唯一要运行
的东西时使用 - BackgroundScheduler: 当
不运行其它框架
的时候使用,并使你的任务在后台运行
- AsyncIOScheduler: 当你的程序是
异步IO模型
的时候使用 - GeventScheduler: 和
gevent
框架配套使用 - TornadoScheduler: 和
tornado
框架配套使用 - TwistedScheduler: 和
Twisted
框架配套使用 - QtScheduler: 开发
qt
应用的时候使用
Flask-APScheduler 中默认使用的就是 BackgroundScheduler:
scheduler.py:
#.....
class APScheduler(object):
"""Provides a scheduler integrated to Flask."""
def __init__(self, scheduler=None, app=None):
self._scheduler = scheduler or BackgroundScheduler()
self._host_name = socket.gethostname().lower()
self._authentication_callback = None
# .....
使用上下文
如果正在使用 Flask-SQLAlchemy 并在定时任务中执行数据库操作,需要提供 Flask 应用程序上下文:
from flask_apscheduler import APScheduler
scheduler = APScheduler()
@scheduler.task(
"interval",
id="update_news_graph_job",
minutes = 16
)
def update_news_graph():
# 提供flask上下文对象
with scheduler.app.app_context():
result = db.session.execute(query_sql)
当然, scheduler 需要在flask中注册:
scheduler.init_app(app)
scheduler.start()
官网关于上下文的解释:
Introduction into Contexts — Flask-SQLAlchemy Documentation (2.x)
日志设置
如果定时任务执行间隔几秒钟, 调度程序的日志会很多,可以设置调度程序日志级别或完全禁用:
#设置调度程序的日志级别, 原本级别为info
scheduler.start()
scheduler.add_job(every_minute, trigger='cron', second=0, id='every_minute')
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
#或者禁用调度程序日志
logging.getLogger('apscheduler.executors.default').propagate = False
更多推荐
所有评论(0)