Airflow【实践 01】Airflow官网+自测源代码举例(简化安装+官方及自测python代码)

本文详细介绍了如何在ApacheAirflow2.7.2中进行安装,包括设置安装目录、创建虚拟环境并安装Airflow,以及如何查询DAG目录。还提供了官方示例和自测案例,展示了如何使用BashOperator和配置任务依赖。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >


官方网站地址: https://airflow.apache.org/docs/apache-airflow/2.7.2/,本文是基于 2.7.2版本进行的说明。

1.准备

1.1 安装

上一篇的 Quick Start 有详细的安装过程,这里做最简安装启动:

# 1.设置安装目录
export AIRFLOW_HOME=~/airflow

# 2.创建虚拟环境并安装
# 创建并切换到airflow虚拟环境
conda create -n airflow python=3.8
conda activate airflow
pip install "apache-airflow==2.7.2"

# 3.前台启动【在虚拟环境下】
airflow standalone

1.2 查询DAG目录

  1. dags_folder目录将Python文件放置到
[root@tcloud airflow]# cat airflow.cfg | grep dags_folder
dags_folder = /root/airflow/dags

2.官方

  1. 安装任务所需的依赖【代码执行所需要的依赖】
conda install scikit-learn
  1. 官方举例文件demo.py放置到dags_folder路径下

文件内容如下:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:

    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    hello >> airflow()
  1. 刷新页面即可看到DAG,点击即可执行,效果如下:

demo.jpg

3.自测

  1. 创建测试文件airflow_test.py并放置到dags_folder目录下

文件内容如下:

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# DAG属性定义
default_args = {
    'owner': 'airFlowTest',
    'depends_on_past': False,
    'start_date': days_ago(31),
    # 填入邮箱,方便失败、重试时发送邮件
    'email': ['xxxxx@qq.com'],
    # 失败时发邮件告警
    'email_on_failure': True,
    'email_on_retry': False,
    # 重试次数
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),bu
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    'trigger_rule': 'all_success'
}

# 定义dag
dag = DAG(
    'air_test',
    default_args=default_args,
    description='A simple airflow test',
    schedule_interval=timedelta(days=1),
)

step_one = BashOperator(
    task_id='step_one',
    bash_command='echo step_one over! >> /root/airflow/file/airflowtest.log',
    dag=dag,
)

step_two = BashOperator(
    task_id='step_two',
    depends_on_past=False,
    bash_command='echo step_two over! >> /root/airflow/file/airflowtest.log',
    retries=3,
    dag=dag,
)

step_three = BashOperator(
    task_id='step_three',
    depends_on_past=False,
    bash_command='echo step_three over! >> /root/airflow/file/airflowtest.log',
    retries=3,
    dag=dag,
)

step_four = BashOperator(
    task_id='step_four',
    depends_on_past=False,
    bash_command='echo step_four over! >> /root/airflow/file/airflowtest.log',
    retries=3,
    dag=dag,
)

step_one >> step_two >> step_four
  1. 执行结果
[root@tcloud file]# pwd
/root/airflow/file

[root@tcloud file]# cat airflowtest.log
step_one over!
step_three over!
step_two over!
step_four over!

执行结果说明:step_one和step_three是同时开始执行的,step_one、tep_two、step_four是按顺序执行的。

4.总结

  • 使用bash_command的可操作空间就比较大了
  • airflow的语法需要进行学习
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

シ風

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值