airflow

https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html

--------------------------------------------------------------------------------------------------------

install:
 export AIRFLOW_HOME=~/airflow
  
 # install from pypi using pip
 pip install apache-airflow
  
 # initialize the database
 airflow initdb   # this step tends to fail; install-time errors almost always come down to compatibility issues (mismatched versions, wrong permissions, wrong file locations, bad configuration, broken dependencies), all of which can be avoided by using Docker
 A fix is discussed in this GitHub issue: https://github.com/apache/airflow/issues/11965
 The cause:
 "this was caused by a Python dependency, specifically cattrs==1.1.0 (released 2020-10-29). Downgrading cattrs manually to 1.0.0
 does fix the issue and allows the Airflow database to be initialised."
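 If initdb fails with that error, the workaround from the issue thread is to pin cattrs before initialising the database:
  
 # pin cattrs to the version reported as working in the issue, then retry
 pip install "cattrs==1.0.0"
 airflow initdb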
  
 # start the web server, default port is 8080
 airflow webserver -p 8080
  
 # start the scheduler
 airflow scheduler
  
 # visit localhost:8080 in the browser and enable the example dag in the home page
  
 The Airflow web UI is now reachable at localhost:8080.
  
 Next, write a test DAG, kk.py:
 from datetime import timedelta
  
 # The DAG object; we'll need this to instantiate a DAG
 from airflow import DAG
 # Operators; we need this to operate!
 from airflow.operators.bash_operator import BashOperator
 from airflow.utils.dates import days_ago
 # These args will get passed on to each operator
 # You can override them on a per-task basis during operator initialization
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
     'start_date': days_ago(2),
     'email': ['airflow@example.com'],
     'email_on_failure': False,
     'email_on_retry': False,
     'retries': 1,
     'retry_delay': timedelta(minutes=5),
     # 'queue': 'bash_queue',
     # 'pool': 'backfill',
     # 'priority_weight': 10,
     # 'end_date': datetime(2016, 1, 1),
     # 'wait_for_downstream': False,
     # 'dag': dag,
     # 'sla': timedelta(hours=2),
     # 'execution_timeout': timedelta(seconds=300),
     # 'on_failure_callback': some_function,
     # 'on_success_callback': some_other_function,
     # 'on_retry_callback': another_function,
     # 'sla_miss_callback': yet_another_function,
     # 'trigger_rule': 'all_success'
 }
 dag = DAG(
     'tutorial',
     default_args=default_args,
     description='A simple tutorial DAG',
     schedule_interval=timedelta(days=1),
 )
  
 # t1, t2 and t3 are examples of tasks created by instantiating operators
 t1 = BashOperator(
     task_id='print_date',
     bash_command='date',
     dag=dag,
 )
  
 t2 = BashOperator(
     task_id='sleep',
     depends_on_past=False,
     bash_command='sleep 5',
     retries=3,
     dag=dag,
 )
 dag.doc_md = __doc__
  
 t1.doc_md = """\
 #### Task Documentation
 You can document your task using the attributes `doc_md` (markdown),
 `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
 rendered in the UI's Task Instance Details page.
 ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
 """
 templated_command = """
 {% for i in range(5) %}
 echo "{{ ds }}"
 echo "{{ macros.ds_add(ds, 7)}}"
 echo "{{ params.my_param }}"
 {% endfor %}
 """
  
 t3 = BashOperator(
     task_id='templated',
     depends_on_past=False,
     bash_command=templated_command,
     params={'my_param': 'Parameter I passed in'},
     dag=dag,
 )
  
 t1 >> [t2, t3]
  
 Then place the file in the DAG folder, by default the airflow/dags directory.
 Then run python3 kk.py; if it exits without errors, the file is fine. After that, wait a while before the new DAG appears in the web UI.
  
  
 Triggering the schedule:
 Each DAG may or may not have a schedule, which informs how DAG Runs are created. schedule_interval is defined as a DAG argument,
 and preferably receives a cron expression as a str,
 or a datetime.timedelta object. Alternatively, you can also use one of the cron "presets" such as @once, @hourly, @daily, @weekly, @monthly, and @yearly.
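 A minimal sketch of the three forms (the DAG ids here are made up; note the timedelta form schedules runs relative to start_date rather than on the cron grid):
  
 from datetime import timedelta
 from airflow import DAG
 from airflow.utils.dates import days_ago
  
 # three ways to express "roughly once a day"
 dag_cron   = DAG('demo_cron',   start_date=days_ago(1), schedule_interval='0 0 * * *')       # cron string
 dag_delta  = DAG('demo_delta',  start_date=days_ago(1), schedule_interval=timedelta(days=1)) # timedelta
 dag_preset = DAG('demo_preset', start_date=days_ago(1), schedule_interval='@daily')          # cron preset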
  
  
 --------------------------------------------------------------
  
 operator->task->dag
  
  
 DAG: The work (tasks), and the order in which work should take place (dependencies), written in Python.
 DAG Run: An instance of a DAG for a particular logical date and time.
 Operator: A class that acts as a template for carrying out some work.
 Task: Defines work by implementing an operator, written in Python.
 Task Instance: An instance of a task - that has been assigned to a DAG and has a state associated with a specific DAG run (i.e. for a specific execution_date).
 execution_date: The logical date and time for a DAG Run and its Task Instances.
  
  
 DAG Runs
 A DAG run is a physical instance of a DAG, containing task instances that run for a specific execution_date.
  
  
  
  
 tasks:
 Each task is an implementation of an Operator, for example a PythonOperator to execute some Python code,
 or a BashOperator to run a Bash command.
  
 task_1 >> task_2 # Define dependencies
 We can say that task_1 is upstream of task_2, and conversely task_2 is downstream of task_1.
 When a DAG Run is created, task_1 will start running and task_2 waits for task_1 to complete successfully before it may start.
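 The bitshift form has equivalent spellings; a sketch, assuming task_1 and task_2 are operators already attached to a DAG:
  
 task_1 >> task_2               # task_1 runs before task_2
 task_2 << task_1               # same edge, written from the downstream side
 task_1.set_downstream(task_2)  # method form
 task_2.set_upstream(task_1)    # method form, from the downstream side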
  
  
 task state
 Task instances also have an indicative state, which could be “running”, “success”, “failed”, “skipped”, “up for retry”, etc.
  
 Airflow does have a feature for operator cross-communication called XCom that is described in the section XComs
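 A minimal XCom sketch (the task ids are made up, and it assumes the `dag` object from kk.py above): the return value of a PythonOperator callable is pushed to XCom automatically, and a downstream task pulls it by task id.
  
 from airflow.operators.python_operator import PythonOperator
  
 def push_value(**context):
     return 'hello from push_task'   # pushed to XCom under the key 'return_value'
  
 def pull_value(**context):
     value = context['ti'].xcom_pull(task_ids='push_task')
     print(value)
  
 push_task = PythonOperator(task_id='push_task', python_callable=push_value,
                            provide_context=True, dag=dag)
 pull_task = PythonOperator(task_id='pull_task', python_callable=pull_value,
                            provide_context=True, dag=dag)
 push_task >> pull_task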
  
  
  
  
 Airflow provides operators for many common tasks, including:
  
 BashOperator - executes a bash command
  
 PythonOperator - calls an arbitrary Python function
  
 EmailOperator - sends an email
  
 SimpleHttpOperator - sends an HTTP request
  
 MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command
  
 Sensor - an Operator that waits (polls) for a certain time, file, database row, S3 key, etc…
  
 In addition to these basic building blocks, there are many more specific operators: DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator… you get the idea!
  
 Operators are only loaded by Airflow if they are assigned to a DAG.
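 There are two common ways to make that assignment (a sketch with made-up ids): pass dag= explicitly, as in kk.py, or create the operator inside a `with DAG(...)` block so it is attached automatically.
  
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
 from airflow.utils.dates import days_ago
  
 # explicit dag= argument
 demo_dag = DAG('attach_demo', start_date=days_ago(1), schedule_interval='@daily')
 t_explicit = BashOperator(task_id='explicit', bash_command='echo explicit', dag=demo_dag)
  
 # context manager: operators created inside the block are attached automatically
 with DAG('attach_demo_ctx', start_date=days_ago(1), schedule_interval='@daily') as ctx_dag:
     t_in_context = BashOperator(task_id='in_context', bash_command='echo in context')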
  
  
  
 -----------------------------------------------------------------------------------------
 action:
  
 Inspecting DAGs and their tasks:
 # print the list of active DAGs
 airflow list_dags
  
 # prints the list of tasks in the "tutorial" DAG
 airflow list_tasks tutorial
  
 # prints the hierarchy of tasks in the tutorial DAG
 airflow list_tasks tutorial --tree
  
  
 Testing:
 # command layout: command subcommand dag_id task_id execution_date
 airflow test tutorial print_date 2015-06-01
  
 Note: airflow test simply runs a single task instance locally, for testing.
  
  
 depends_on_past=True
 Whether a task's dependencies are satisfied: the task instances immediately upstream must be in the success state. In addition, with depends_on_past=True,
 the task instance from the previous schedule interval must also have succeeded (unless this is the task's first run).
 You may also want to consider wait_for_downstream=True when using depends_on_past=True. While depends_on_past=True causes a task instance to depend on
 the success of its previous task_instance, wait_for_downstream=True will cause a task instance to also wait for all task instances immediately downstream
 of the previous task instance to succeed.
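 A sketch of both flags together (the task id is made up, and it assumes the `dag` object from kk.py above):
  
 careful_task = BashOperator(
     task_id='careful_task',
     bash_command='echo run only after the previous run finished cleanly',
     depends_on_past=True,      # wait for this task's own previous run to succeed
     wait_for_downstream=True,  # and for the tasks immediately downstream of that run
     dag=dag,
 )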
  
 Running:
 backfill:
 The operation of running a DAG for a specified date in the past is called “backfilling.”
 airflow backfill -s 2019-01-01 -e 2019-01-04 test_dag
 In this case, I am running the test_dag DAG with the execution date set to 2019-01-01, 2019-01-02, and 2019-01-03.
  
  
 Registering a new DAG: just run the script that creates the DAG
 python ~/airflow/dags/somedagcode.py
  
 ------------------------------------------------------------------------------------------
  
 airflow docker
 https://github.com/puckel/docker-airflow