Airflow DAGs orchestrate pipeline steps with dependencies, retries, scheduling, and email alerts.
Apache Airflow — Pipeline Orchestration
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
# Task-level defaults; handed to the DAG below through its `default_args`
# parameter so individual operators do not have to repeat them.
default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    # Failure notifications go to the shared alerts mailbox.
    "email_on_failure": True,
    "email": ["alerts@company.com"],
    # Retry policy: up to 3 retries, spaced 5 minutes apart.
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}
# Daily sales pipeline. Three tasks run strictly in sequence:
#   1. extract_crm_data  - calls `extract_from_crm` with the run date
#   2. run_dbt_models    - runs `dbt run --select sales_mart` for the run date
#   3. load_to_warehouse - calls the `load_sales_fact` stored procedure
# `{{ ds }}` is an Airflow template variable, rendered per run to the DAG
# run's logical date as YYYY-MM-DD.
with DAG(
    dag_id="daily_sales_pipeline",
    description="Extract, transform, load daily sales data",
    default_args=default_args,
    # Cron expression: 06:00 every day.
    schedule="0 6 * * *",
    # NOTE(review): naive datetime - the effective timezone depends on the
    # Airflow deployment's default timezone setting; confirm it is intended.
    start_date=datetime(2025, 1, 1),
    # Do not backfill the runs between start_date and the present.
    catchup=False,
    tags=["sales", "daily"],
) as dag:
    # NOTE(review): `extract_from_crm` is not defined or imported in the
    # visible part of this file - presumably provided elsewhere; confirm.
    extract = PythonOperator(
        task_id="extract_crm_data",
        python_callable=extract_from_crm,
        op_kwargs={"date": "{{ ds }}"},
    )

    # The run date is passed to dbt as the `date` var.
    transform = BashOperator(
        task_id="run_dbt_models",
        bash_command="dbt run --select sales_mart --vars 'date: {{ ds }}'",
    )

    # NOTE(review): the connection id is named "snowflake_conn" but this is a
    # Postgres operator - verify the connection really points at a
    # Postgres-compatible database.
    load_dw = PostgresOperator(
        task_id="load_to_warehouse",
        postgres_conn_id="snowflake_conn",
        sql="CALL load_sales_fact('{{ ds }}');",
    )

    # Dependency chain: extract -> transform -> load.
    extract >> transform >> load_dw