Using Airflow

 

Reposted from: https://www.jianshu.com/p/f3ead3a35526

How it works

What is Airflow?

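Airflow is a scheduling tool written in Python and open-sourced by Airbnb. The project started in 2014, was open-sourced in spring 2015, and joined the Apache Software Foundation's incubator program in 2016.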

Airflow is a platform to programmatically author, schedule and monitor workflows.

Use airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
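In short: using directed acyclic graphs (DAGs), Airflow lets you define a set of interdependent tasks and runs them in dependency order. It ships with a rich set of command-line tools for administration, and its web UI offers the same control over scheduled tasks along with real-time monitoring of task status, which simplifies operating and maintaining the system.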
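
To make "runs them in dependency order" concrete, here is a minimal sketch of a topological ordering over a small task graph. It is not Airflow's scheduler, only the underlying idea; the task names and the `execution_order` helper are hypothetical, and every dependency is assumed to also appear as a key of the mapping.

```python
from collections import deque


def execution_order(upstream):
    """Return task names so that each task comes after all of its upstream tasks.

    `upstream` maps every task name to the set of tasks it depends on.
    """
    # Count unmet dependencies per task and build the reverse (downstream) map.
    pending = {task: len(deps) for task, deps in upstream.items()}
    downstream = {task: [] for task in upstream}
    for task, deps in upstream.items():
        for dep in deps:
            downstream[dep].append(task)

    # Tasks with no unmet dependencies can run right away.
    ready = deque(sorted(task for task, count in pending.items() if count == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        # Finishing a task may unblock the tasks that depend on it.
        for child in sorted(downstream[task]):
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)

    if len(order) != len(upstream):
        raise ValueError("dependency cycle detected; the graph is not a DAG")
    return order


# Hypothetical pipeline: export, then import, then process, then report.
pipeline = {
    "export_online_data": set(),
    "import_to_platform": {"export_online_data"},
    "process_data": {"import_to_platform"},
    "send_report": {"process_data"},
}
print(execution_order(pipeline))
# ['export_online_data', 'import_to_platform', 'process_data', 'send_report']
```

The cycle check is what the "acyclic" in DAG buys you: if two tasks depended on each other, no valid order would exist and the function raises instead of looping.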

What can Airflow be used for?

In real projects we often run into scenarios like these:

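  • Operations staff need to run certain scripts on servers on a schedule. The simplest approach is to add a few cron jobs, but what if you later want to trace the result of each run?
  • In a big-data setting, at regular intervals you need to export production data, import it into the big-data platform, and trigger data processing, and these sub-operations depend on one another.
  • When managing a large number of hosts, you want a single job-management platform on which to define the tasks that manage those machines.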

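With DAG definition files, Airflow makes it easy to define tasks, the dependencies between them, and how they are scheduled and executed, and it provides a visual web interface for operating them.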

What are Airflow's advantages?

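  • It ships with a web management UI and is easy to pick up;
  • Business code and scheduling code are fully decoupled;
  • Tasks are defined in Python code, and a wide range of Operators is supported, which gives enough flexibility to cover varied user needs;
  • It is an open-source Python project that supports extending Operators and other components through plugins, which makes custom development easier;
    Similar tools include Azkaban and Quartz.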

 

Detailed installation steps

Test environment:
CentOS 7
Python 3.6
Installation script:

yum install -y python36
yum install -y python36-pip
yum install -y python36-devel
pip3 install apache-airflow
pip3 install pymysql

Set Airflow's home directory:

echo "export AIRFLOW_HOME=/data/airflow" >> /root/.bashrc
source /root/.bashrc
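
A variable exported in /root/.bashrc is only visible to shells that source that file; processes started another way (for example by systemd) will not see it. The sketch below shows a quick way to check which home directory a given process would resolve. The `resolve_airflow_home` helper is hypothetical; the fallback to `~/airflow` mirrors Airflow's documented default when `AIRFLOW_HOME` is unset.

```python
import os


def resolve_airflow_home(environ=None):
    """Return the directory that would be used as the Airflow home.

    Falls back to ~/airflow when AIRFLOW_HOME is not set.
    """
    environ = os.environ if environ is None else environ
    return os.path.expanduser(environ.get("AIRFLOW_HOME", "~/airflow"))


if __name__ == "__main__":
    # Run this from the same shell or service context that will run Airflow.
    print(resolve_airflow_home())
```

Run from a fresh login shell after the steps above, this should print /data/airflow; any other path suggests the export has not taken effect in that context.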

Initialization

Initialize the database tables (a local SQLite database is used by default):

[root@VM_7_246_centos /data]# airflow initdb
[2019-08-05 19:23:29,691] {__init__.py:51} INFO - Using executor SequentialExecutor
DB: sqlite:////data/airflow/airflow.db
[2019-08-05 19:23:30,019] {db.py:350} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/usr/local/lib/python3.6/site-packages/alembic/util/messaging.py:69: UserWarning: Skipping unsupported ALTER for creation of implicit constraint
  warnings.warn(msg)
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
INFO  [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary
INFO  [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index
INFO  [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types)
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 33ae817a1ff4, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 33ae817a1ff4 -> 27c6a30d7c24, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 27c6a30d7c24 -> 86770d1215c0, add kubernetes scheduler uniqueness
INFO  [alembic.runtime.migration] Running upgrade 86770d1215c0, 0e2a74e0fc9f -> 05f30312d566, merge heads
INFO  [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint
INFO  [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key
INFO  [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> dd25f486b8ea
INFO  [alembic.runtime.migration] Running upgrade dd25f486b8ea -> bf00311e1990, add index to taskinstance
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> 0a2a5b66e19d, add task_reschedule table
INFO  [alembic.runtime.migration] Running upgrade 0a2a5b66e19d, bf00311e1990 -> 03bc53e68815, merge_heads_2
INFO  [alembic.runtime.migration] Running upgrade 03bc53e68815 -> 41f5f12752f8, add superuser field
INFO  [alembic.runtime.migration] Running upgrade 41f5f12752f8 -> c8ffec048a3b, add fields to dag
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> a56c9515abdc, Remove dag_stat table
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> dd4ecb8fbee3, Add schedule interval to dag
INFO  [alembic.runtime.migration] Running upgrade dd4ecb8fbee3 -> 939bb1e647c8, task reschedule fk on cascade delete
WARNI [airflow.utils.log.logging_mixin.LoggingMixin] cryptography not found - values will not be stored encrypted.
Done.

Inspect the files it generated:

[root@VM_7_246_centos /data]# ls $AIRFLOW_HOME
airflow.cfg  airflow.db  logs  unittests.cfg
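
The generated airflow.cfg is an INI-style file, so the settings that matter for the next steps can be inspected with Python's standard library. This is a minimal sketch assuming the layout used in this walkthrough, where both `executor` and `sql_alchemy_conn` live in the `[core]` section (other Airflow releases may place them elsewhere). The helper names and the sample text are illustrative.

```python
import configparser


def parse_core_settings(cfg_text):
    """Extract the executor and metadata DB connection from airflow.cfg text."""
    # Interpolation is disabled so '%' characters in values are read verbatim.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    core = parser["core"]
    return {key: core.get(key) for key in ("executor", "sql_alchemy_conn")}


def read_core_settings(cfg_path):
    """Same as parse_core_settings, but reads the file at cfg_path."""
    with open(cfg_path, encoding="utf-8") as handle:
        return parse_core_settings(handle.read())


# Illustrative sample matching the defaults reported by the initdb log above.
sample = """
[core]
# The executor class that airflow should use.
executor = SequentialExecutor
sql_alchemy_conn = sqlite:////data/airflow/airflow.db
"""
print(parse_core_settings(sample))
# {'executor': 'SequentialExecutor', 'sql_alchemy_conn': 'sqlite:////data/airflow/airflow.db'}
```

On a real installation you would call `read_core_settings("/data/airflow/airflow.cfg")` instead of parsing the sample string.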

Configuration

Configure the MySQL database (create the airflow database, then create a user and grant it the privileges Airflow needs to access that database):

# Create the airflow database
MariaDB [(none)]> CREATE DATABASE airflow;
# Grant the root account connecting from localhost all privileges on the airflow database
MariaDB [(none)]> GRANT all privileges on airflow.* TO 'root'@'localhost' IDENTIFIED BY '123456';
MariaDB [(none)]> FLUSH PRIVILEGES;

Configure Airflow to use the LocalExecutor and the MySQL database:

vim airflow/airflow.cfg
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
#executor = SequentialExecutor
executor = LocalExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
#sql_alchemy_conn = sqlite:////data/airflow/airflow.db
sql_alchemy_conn = mysql+pymysql://root:123456@localhost:3306/airflow
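
The connection string above is a URL, so a password containing characters such as `@` or `/` has to be percent-encoded before it is embedded. The following is a minimal sketch of building the value with the standard library; `mysql_conn_uri` is a hypothetical helper, and the second password is an invented example.

```python
from urllib.parse import quote_plus


def mysql_conn_uri(user, password, host, port, database):
    """Build a mysql+pymysql SQLAlchemy URL, escaping the user and password."""
    return "mysql+pymysql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, database
    )


# The credentials used in this walkthrough need no escaping.
print(mysql_conn_uri("root", "123456", "localhost", 3306, "airflow"))
# mysql+pymysql://root:123456@localhost:3306/airflow

# A made-up password with URL-reserved characters is percent-encoded.
print(mysql_conn_uri("root", "p@ss/word", "localhost", 3306, "airflow"))
# mysql+pymysql://root:p%40ss%2Fword@localhost:3306/airflow
```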

Initialize the database tables again (this time they are created in MySQL):

airflow initdb

List the tables created in the airflow database:

MariaDB [(none)]> use airflow;
MariaDB [airflow]> show tables;
+-------------------+
| Tables_in_airflow |
+-------------------+
| alembic_version   |
| chart             |
| connection        |
| dag               |
| dag_pickle        |
| dag_run           |
| dag_stats         |
| import_error      |
| job               |
| known_event       |
| known_event_type  |
| log               |
| sla_miss          |
| slot_pool         |
| task_fail         |
| task_instance     |
| users             |
| variable          |
| xcom              |
+-------------------+

Starting the services

Add a systemd unit file for the airflow-scheduler service:

/usr/lib/systemd/system/airflow-scheduler.service 
[Unit]
Description=Airflow Scheduler
Wants=network-online.target
After=network-online.target

[Service]
User=root
Group=root
Type=simple
Restart=on-failure
Environment=AIRFLOW_HOME=/data/airflow
ExecStart=/usr/local/bin/airflow scheduler
LimitNOFILE=10000
TimeoutStopSec=20

[Install]
WantedBy=multi-user.target

Start the airflow-scheduler service:

systemctl start airflow-scheduler

Add a systemd unit file for the airflow-webserver service:

/usr/lib/systemd/system/airflow-webserver.service 
[Unit]
Description=Airflow Webserver
Wants=network-online.target
After=network-online.target

[Service]
User=root
Group=root
Type=simple
Restart=on-failure
Environment=AIRFLOW_HOME=/data/airflow
ExecStart=/usr/local/bin/airflow webserver
LimitNOFILE=10000
TimeoutStopSec=20

[Install]
WantedBy=multi-user.target
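
The scheduler and webserver unit files are meant to differ only in their description and in the airflow subcommand they run, so they can be rendered from one template to avoid copy-paste slips. This is a minimal sketch under that assumption; `render_unit` and `UNIT_TEMPLATE` are hypothetical names, and the paths are the ones used in this walkthrough.

```python
UNIT_TEMPLATE = """\
[Unit]
Description=Airflow {title}
Wants=network-online.target
After=network-online.target

[Service]
User=root
Group=root
Type=simple
Restart=on-failure
Environment=AIRFLOW_HOME={airflow_home}
ExecStart={airflow_bin} {subcommand}
LimitNOFILE=10000
TimeoutStopSec=20

[Install]
WantedBy=multi-user.target
"""


def render_unit(subcommand, airflow_home="/data/airflow",
                airflow_bin="/usr/local/bin/airflow"):
    """Render the unit file text for one airflow subcommand."""
    # systemd's Environment= takes NAME=value directly, without a shell "export".
    return UNIT_TEMPLATE.format(
        title=subcommand.capitalize(),
        airflow_home=airflow_home,
        airflow_bin=airflow_bin,
        subcommand=subcommand,
    )


for name in ("scheduler", "webserver"):
    print("# /usr/lib/systemd/system/airflow-{}.service".format(name))
    print(render_unit(name))
```

The script only prints the text; writing it to /usr/lib/systemd/system/ is left as a manual step so the output can be reviewed first.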

Start the airflow-webserver service:

systemctl start airflow-webserver

Usage

Open http://localhost:8080 in a browser; if the Airflow home page loads, the setup succeeded.
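
On a server without a browser, the same check can be scripted. The sketch below simply issues an HTTP GET and reports whether it ends in a success response; `webserver_is_up` is a hypothetical helper, and it only confirms that something answers on that address, not that Airflow itself is healthy.

```python
import urllib.request


def webserver_is_up(url="http://localhost:8080", timeout=3.0):
    """Return True if an HTTP GET on `url` ends in a 2xx response."""
    try:
        # urlopen follows redirects, so the status seen here is the final one.
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except OSError:
        # Covers refused connections, timeouts, and HTTP error statuses
        # (urllib raises HTTPError, an OSError subclass, for 4xx/5xx).
        return False


if __name__ == "__main__":
    print("webserver reachable:", webserver_is_up())
```

If this prints False right after `systemctl start airflow-webserver`, give the service a few seconds to finish starting and check `systemctl status airflow-webserver` before retrying.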

 

 
 
 

 
