"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import datetime
from airflow.operators.subdag_operator import SubDagOperator
from datetime import timedelta, datetime
import pandas as pd
from sync_config import lxcs_high_mysql
from sqlalchemy import create_engine
PARENT_XMXY_TEN_MINUTES = 'parent_xmxy_ten_minutes'
# Task id of the SubDagOperator in the parent DAG. Airflow's SubDagOperator
# rejects a subdag whose dag_id is not "<parent_dag_id>.<task_id>", so the
# SubDAG's dag_id below is built from this same constant.
CHILD_TASK_ID = 'child1'
# Parent and child share one schedule and start date so their runs line up.
TEN_MINUTE_SCHEDULE = '*/10 * * * *'
START_DATE = datetime(2020, 9, 12, 18, 40)

# default_args are applied to the operators of a DAG; only keys that match an
# operator argument are used. DAG-level settings (catchup, concurrency,
# max_active_runs) therefore do not belong here -- they were silently ignored
# when placed in these dicts and are passed to DAG(...) directly instead.
parent_args = {
    'owner': 'xmxy',
    'depends_on_past': False,
    'pool': 'xmxy',
    'email': ['xiongmaoxueyuan@xiongmaoxueyuan.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'start_date': START_DATE,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}
child_args = {
    'owner': 'xmxy',
    'ignore_first_depends_on_past': True,
    # Each sync run waits for the previous run of the same task to succeed.
    'depends_on_past': True,
    # A different pool from the parent's SubDagOperator ('xmxy').
    'pool': 'syn_db',
    'email': ['xiongmaoxueyuan@xiongmaoxueyuan.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': START_DATE,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

parent_dag = DAG(
    dag_id=PARENT_XMXY_TEN_MINUTES,
    default_args=parent_args,
    schedule_interval=TEN_MINUTE_SCHEDULE,
    # Must be a DAG kwarg to take effect; otherwise every 10-minute slot since
    # START_DATE would be backfilled.
    catchup=False,
    concurrency=10,
    tags=['gp'],
)

dag_subdag = DAG(
    dag_id='%s.%s' % (PARENT_XMXY_TEN_MINUTES, CHILD_TASK_ID),
    default_args=child_args,
    schedule_interval=TEN_MINUTE_SCHEDULE,
    catchup=False,
    # Run the sync strictly one task / one run at a time.
    concurrency=1,
    max_active_runs=1,
    tags=['gp'],
)

# Runs the sync script on the host behind the 'ssh_sync' connection.
# `-u` keeps Python's output unbuffered; `|| exit 1` turns any failure of the
# script into exit status 1.
sync_db_task = SSHOperator(
    ssh_conn_id='ssh_sync',
    # Previously the undefined name `i`, which raised NameError on DAG parse.
    task_id='sync_db',
    command='/bin/python3.6 -u /root/sync_db/main.py || exit 1',
    dag=dag_subdag,
)

sub_opt = SubDagOperator(
    task_id=CHILD_TASK_ID,
    subdag=dag_subdag,
    dag=parent_dag,
)