熊猫学猿--airflow创建父与子dag

"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import datetime
from airflow.operators.subdag_operator import SubDagOperator
from datetime import timedelta, datetime
import pandas as pd
from sync_config import lxcs_high_mysql
from sqlalchemy import create_engine
PARENT_XMXY_TEN_MINUTES = 'parent_xmxy_ten_minutes'

parent_args = {
    'owner': 'xmxy',
    'depends_on_past': False,
    "catchup": False,
    'pool': 'xmxy',
    'email': ['xiongmaoxueyuan@xiongmaoxueyuan.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'start_date': datetime(2020, 9, 12, 18, 40),
    'retries': 3,
    'concurrency': 15,  # 每个dag运行过程中最大可同时运行的task实例数
    'retry_delay': timedelta(minutes=1),
}
child_args = {
    'owner': 'xmxy',
    'ignore_first_depends_on_past':True,
    'depends_on_past': True,
    "catchup":False,
    'pool': 'syn_db',
    'email': ['xiongmaoxueyuan@xiongmaoxueyuan.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime(2020, 9, 12, 18, 40),
    'retries': 3,
    'concurrency': 15,  # 每个dag运行过程中最大可同时运行的task实例数
    'max_active_runs': 1,  # 同一时间可以运行的最多的dag runs 数量
    'retry_delay': timedelta(minutes=1),
}
parent_dag = DAG(
    dag_id=PARENT_XMXY_TEN_MINUTES ,
    default_args=parent_args,
    schedule_interval='*/10 * * * *',
    concurrency=10,
    tags=['gp']
)
dag_subdag = DAG(
            dag_id='%s.%s' % (PARENT_XMXY_TEN_MINUTES , 'child1'),#parent_dag_name chide_dag_name
            default_args=child_args,
            schedule_interval='*/10 * * * *',
            concurrency=1,
            tags=['gp'])
SSHOperator(ssh_conn_id='ssh_sync', task_id=i,
                    command='/bin/python3.6 -u /root/sync_db/main.py  || exit 1', dag=dag_subdag)#任务
 #关联父与子dag
sub_opt = SubDagOperator(
            task_id='child_task',
            subdag=dag_subdag,
            dag=parent_dag)
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值