下方这个是airflow官方中文文档共大家参考:

https://airflow.apachecn.org/#/zh/start

首先先创建airflow的虚拟环境,创建虚拟环境在我上一篇文章大家可以参考。

文章链接:

https://blog.csdn.net/weixin_45602900/article/details/123821849

进入虚拟环境之后安装airflow

pip install apache-airflow

之后配置存放airflow配置的文件夹

vim ~/.bashrc
source ~/.bashrc
我这里是放在 /home/airflow
所以环境变量配置为:
在这里插入图片描述

export AIRFLOW_HOME=/home/airflow

运行下airflow
在虚拟环境中输入 airflow
在这里插入图片描述

Airflow 文件配置

vim /home/airflow/airflow.cfg

修改文件配置

# dags存放路径
dags_folder = /home/airflow/dags
# 这里我换成了 
dags_folder = /home/job/ymgn/airflow_dag/dags


# 时区默认位utc时间建议换成国内Asia/Shanghai
default_timezone = utc
# 这里我换成
default_timezone = Asia/Shanghai


# airflow支持并行性的工作器,有`SequentialExecutor`(默认,顺序执行), `LocalExecutor`(本地执行), `CeleryExecutor`(远程执行), `DaskExecutor`
executor = SequentialExecutor
# 这里我换成
executor = LocalExecutor


# 数据库连接设置
sql_alchemy_conn = sqlite:////home/airflow/airflow.db
# 这里我换成
# mysql+pymysql://用户名:密码@主机名:端口号/数据库
sql_alchemy_conn = mysql+pymysql://airflow:airflow@localhost:3306/airflow
# 如果不设置数据库将会报错
# 因为这里设置了数据库 所以就得在你的ubuntu上安装mysql
# sudo apt update
# sudo apt install mysql-server
# 安装完之后创建用户
# 首先进入mysql环境
# 在服务器上输入 mysql
"""
创建airflow数据库 并指定字符集
create database if not exists airflow default charset utf8 collate utf8_general_ci;

创建用户
CREATE USER 'airflow'@'%' IDENTIFIED BY 'airflow';

# 给用户授权
grant all privileges on airflow.* to airflow@localhost identified by 'airflow';


但是你airflwo db init 初始化的情况下会出现问题:
”    raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql")
Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysq
mysq的全局变量explicit_defaults_for_timestamp需要为on (1) 
“

所以这里你就需要将 explicit_defaults_for_timestamp设置下:
set global explicit_defaults_for_timestamp =1;
"""

# 数据库编码方式
sql_engine_encoding = utf-8


# 是否与SqlAlchemy库进行数据交互
sql_alchemy_pool_enabled = True
# 最大数据库连接数
sql_alchemy_pool_size = 5
# 控制每个Airflow worker可以同时运行task实例的数量
parallelism = 32
# 用来控制每个dag运行过程中最大可同时运行的task实例数,若DAG中没有设置concurrency,则使用默认值
dag_concurrency = 16
# 创建新的DAG时,是否暂停
dags_are_paused_at_creation = True
# 同一时间最大运行dag的数量,默认为16
max_active_runs_per_dag = 16
# 这里我换成
max_active_runs_per_dag = 1



# 加载示例dags,默认为True
load_examples = True
# 这里我换成
load_examples = False

# 任务清理时间
killed_task_cleanup_time = 60
# 这里我换成
killed_task_cleanup_time = 120


# 日志存放路径
base_log_folder = /home/xxx/airflow/logs

# 日志级别
fab_logging_level = WARN
# 这里我换成
fab_logging_level = WARNING

# web ui面使用的时区
default_ui_timezone = UTC
# 这里我换成
default_ui_timezone = Asia/Shanghai

# 运行web服务端口号
web_server_port = 8080
# 这里我换成
web_server_port = 80

# 超时时间
web_server_master_timeout = 120
web_server_worker_timeout = 120
# 这里我还换成
web_server_master_timeout = 300
web_server_worker_timeout = 300

# 刷新时间
worker_refresh_interval = 6000
# 这里我换成 
worker_refresh_interval = 30


# 任务重试时是否发送邮件提醒  
default_email_on_retry = True
# 这里我换成了
default_email_on_retry = False

# 任务失败时是否发送邮件提醒  
default_email_on_failure = True

# 设置web端Configuration不显示配置信息
expose_config = False
# 加载Airflow UI界面的时间
default_dag_run_display_number = 15


[smtp] 发送邮件时邮箱的配置
smtp_host = localhost
# 这里我用的163邮箱 所以我换成
smtp_host = smtp.163.com

# 加密通讯
smtp_starttls = True
# 这里我换成
smtp_starttls = False

smtp_ssl = False
# Example: smtp_user = airflow
smtp_user = 你的邮箱
# Example: smtp_password = airflow
smtp_password =你邮箱设置里smtp密码

smtp_port = 25
# 这里我设置
smtp_port = 465

smtp_mail_from = 你要收信的邮箱
smtp_timeout = 30
smtp_retry_limit = 5




#  配置celery的broker_url(存储要执行的命令然后celery的worker去消费)
broker_url = redis://redis:6379/0
# 配置celery的result_backend(存储任务执行状态)、 也可以用redis存储
result_backend = db+postgresql://postgres:airflow@postgres/airflow

# 调度程序尝试触发新任务的时间
scheduler_heartbeat_sec = 60
# 检测新dag的时间
min_file_process_interval = 10
# 是否使用catchup功能, 即是否执行自上次execute_date以来所有未执行的DAG Run, 另外定义每个DAG对象可传递catchup参数进行覆盖
catchup_by_default = True


初始化airflow数据库

airflow db init

最后一行出现Initialization done表示安装成功

创建airflow用户

airflow users create --username 账号 --firstname 姓 --lastname 名字 --role Admin --email asd@xxx.com

输入密码和重复密码就完成创建账户了
在这里插入图片描述

创建测试文件

在我的api文件夹里创建测试文件 hhh.py
在这里插入图片描述
文件内容如下:


def testss():
    print('Hello world')

因为前边修改配置文件的时候 我将dag文件目录放在/home/job/ymgn/airflow_dag/dags下方
所以我在下方创建 test.py文件
在这里插入图片描述

test.py文件内容如下:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.settings import TIMEZONE
import datetime as dt
import json
import os
import os
import sys
import pendulum

sys.path.append('/home/job/ymgn/Api')

from hhh import testss

args = {
    'owner': 'hsh',  # DAG的所有者,会在Web UI上显示,主要用于方便管理
    'depends_on_past': False,  # 是否依赖于过去。如果为True,那么必须要昨天的DAG执行成功了,今天的DAG才能执行。
    'start_date': pendulum.datetime(2022, 4, 19, tz="UTC"),  # DAG的开始时间
    'email': ['hhhhesihan@163.com'],  # 出问题时,发送报警Email的地址,可以填多个,用逗号隔开。
    'email_on_failure': True,  # 任务失败且重试次数用完时是否发送Email。
    'email_on_retry': False,  # 任务重试时是否发送Email
    'retries': 2,  # 任务失败后的重试次数
    'retry_delay': dt.timedelta(seconds=1),  # 重试间隔,必须是timedelta对象。

}

with DAG(
        dag_id='testss',  # 任务唯一Id
        description='测试',  # 描述
        default_args=args,  # 上方定义的args
        schedule_interval='@hourly',  # 任务调度间隔
        catchup=True,  # 执行DAG时,将开始时间到目前所有该执行的任务都执行
        tags=['Orders', 'Goods']  # 标签
) as dag:
    my_testss = PythonOperator(
        task_id='get_periods',
        python_callable=testss
    )

    my_testss

启动定时器

nohup airflow scheduler &

之后启动web页面(在后端挂起)

nohup airflow webserver -p 5568 >/home/aw.log 2>&1 &

这样就算部署成功了

下面是UI界面了

在这里插入图片描述

输入前面创建的账号

登录之后的样子:
在这里插入图片描述

启动定时任务点开这个按钮

在运行了

在这里插入图片描述

下方就是输出的内容了

在这里插入图片描述

下方是我包的版本 供大家使用

alembic==1.7.7
anyio==3.5.0
apache-airflow==2.2.5
apache-airflow-providers-ftp==2.1.0
apache-airflow-providers-http==2.1.0
apache-airflow-providers-imap==2.2.1
apache-airflow-providers-sqlite==2.1.1
apispec==3.3.2
argcomplete==2.0.0
async-generator==1.10
attrs==20.3.0
Babel==2.9.1
blinker==1.4
cached-property==1.5.2
cachelib==0.6.0
cattrs==1.0.0
certifi==2021.10.8
cffi==1.15.0
charset-normalizer==2.0.12
click==7.1.2
clickclick==20.10.2
colorama==0.4.4
colorlog==6.6.0
commonmark==0.9.1
connexion==2.13.0
contextvars==2.4
croniter==1.3.4
cryptography==36.0.2
dataclasses==0.8
defusedxml==0.7.1
Deprecated==1.2.13
dill==0.3.4
dnspython==2.2.1
docutils==0.16
email-validator==1.1.3
Flask==1.1.4
Flask-AppBuilder==3.4.5
Flask-Babel==2.0.0
Flask-Caching==1.10.1
Flask-JWT-Extended==3.25.1
Flask-Login==0.4.1
Flask-OpenID==1.3.0
Flask-Session==0.4.0
Flask-SQLAlchemy==2.5.1
Flask-WTF==0.14.3
graphviz==0.19.1
gunicorn==20.1.0
h11==0.12.0
httpcore==0.14.7
httpx==0.22.0
idna==3.3
immutables==0.17
importlib-metadata==4.8.3
importlib-resources==5.4.0
inflection==0.5.1
iso8601==1.0.2
itsdangerous==1.1.0
Jinja2==2.11.3
jsonschema==3.2.0
lazy-object-proxy==1.7.1
lockfile==0.12.2
Mako==1.1.6
Markdown==3.3.6
MarkupSafe==2.0.1
marshmallow==3.14.1
marshmallow-enum==1.5.1
marshmallow-oneofschema==3.0.1
marshmallow-sqlalchemy==0.26.1
packaging==21.3
pendulum==2.1.2
pep562==1.1
prison==0.2.1
psutil==5.9.0
pycparser==2.21
Pygments==2.11.2
PyJWT==1.7.1
PyMySQL==0.10.1
pyparsing==3.0.8
pyrsistent==0.18.0
python-daemon==2.3.0
python-dateutil==2.8.2
python-nvd3==0.15.0
python-slugify==4.0.1
python3-openid==3.2.0
pytz==2022.1
pytzdata==2020.1
PyYAML==6.0
requests==2.27.1
rfc3986==1.5.0
rich==12.2.0
setproctitle==1.2.3
six==1.16.0
sniffio==1.2.0
SQLAlchemy==1.3.22
SQLAlchemy-JSONField==1.0.0
SQLAlchemy-Utils==0.38.2
swagger-ui-bundle==0.0.9
tabulate==0.8.9
tenacity==8.0.1
termcolor==1.1.0
text-unidecode==1.3
typing==3.7.4.3
typing_extensions==4.1.1
unicodecsv==0.14.1
urllib3==1.26.9
Werkzeug==1.0.1
wrapt==1.14.0
WTForms==2.3.3
zipp==3.6.0

希望大家支持

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐