
Apache Airflow

An Airflow DAG (directed acyclic graph) orchestrates pipeline steps, handling their dependencies, retries, scheduling, and failure email alerts.
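The retry settings are easy to misread, so here is a toy timeline. With `retries` set to 3 and `retry_delay` set to 5 minutes, a task that keeps failing gets four attempts in total. The helper `retry_schedule` is hypothetical, not an Airflow API. It assumes each retry starts exactly `retry_delay` after the previous failure and ignores scheduler latency and exponential backoff.

```python
from datetime import datetime, timedelta


def retry_schedule(first_try: datetime, retries: int, retry_delay: timedelta,
                   attempt_duration: timedelta = timedelta(0)) -> list[datetime]:
    """Start time of each attempt for a task that fails every time.

    Toy model (hypothetical helper, not an Airflow API): each retry starts
    exactly retry_delay after the previous attempt fails; scheduler latency
    and exponential backoff are ignored.
    """
    starts = [first_try]
    for _ in range(retries):
        failed_at = starts[-1] + attempt_duration  # previous attempt ends in failure
        starts.append(failed_at + retry_delay)     # next attempt waits retry_delay
    return starts


schedule = retry_schedule(datetime(2025, 1, 2, 6, 0), retries=3,
                          retry_delay=timedelta(minutes=5))
for attempt, start in enumerate(schedule, start=1):
    print(attempt, start.strftime("%H:%M"))
# 1 06:00
# 2 06:05
# 3 06:10
# 4 06:15
```

Only after the fourth attempt fails does the task end up in the failed state, which is the point at which `email_on_failure` sends its alert.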

Apache Airflow — Pipeline Orchestration

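from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


def extract_from_crm(date: str) -> None:
    """Hypothetical extract step: pull one day of CRM sales data.

    `date` arrives as the rendered {{ ds }} template, e.g. "2025-01-01".
    """
    print(f"Extracting CRM sales records for {date}")  # replace with the real CRM client call


# Default arguments inherited by every task in the DAG
default_args = {
    "owner":            "data-team",
    "depends_on_past":  False,
    "email_on_failure": True,                  # needs an email backend (e.g. SMTP) configured in Airflow
    "email":            ["alerts@company.com"],
    "retries":          3,                     # up to 3 retries, i.e. 4 attempts in total
    "retry_delay":      timedelta(minutes=5),  # wait between a failure and the next attempt
}

# Define the DAG
with DAG(
    dag_id="daily_sales_pipeline",
    default_args=default_args,
    description="Extract, transform, load daily sales data",
    schedule="0 6 * * *",  # 06:00 every day (UTC unless another timezone is configured)
    start_date=datetime(2025, 1, 1),
    catchup=False,         # do not backfill intervals missed since start_date
    tags=["sales", "daily"],
) as dag:

    # {{ ds }} renders as the run's logical date in YYYY-MM-DD form
    extract = PythonOperator(
        task_id="extract_crm_data",
        python_callable=extract_from_crm,
        op_kwargs={"date": "{{ ds }}"},
    )

    transform = BashOperator(
        task_id="run_dbt_models",
        bash_command="dbt run --select sales_mart --vars 'date: {{ ds }}'",
    )

    # Generic SQL operator: the connection type behind conn_id selects the hook
    # (Snowflake here, which requires the Snowflake provider to be installed)
    load_dw = SQLExecuteQueryOperator(
        task_id="load_to_warehouse",
        conn_id="snowflake_conn",
        sql="CALL load_sales_fact('{{ ds }}');",
    )

    extract >> transform >> load_dw  # extract, then transform, then load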
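The `>>` chain reads left to right: each `a >> b` makes `b` downstream of `a` and returns `b`, which is what lets Airflow chain three tasks in one expression. The `Task` class and `run_order` helper below are hypothetical stand-ins, not Airflow's implementation. They mimic the same operator overloading and then use Python's `graphlib` to derive a valid execution order.

```python
from graphlib import TopologicalSorter


class Task:
    """Toy stand-in for an Airflow operator: records downstream edges only."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: set[str] = set()

    def __rshift__(self, other: "Task") -> "Task":
        # a >> b means b runs after a; returning b lets a >> b >> c chain
        self.downstream.add(other.task_id)
        return other


def run_order(tasks: list[Task]) -> list[str]:
    # graphlib expects {node: predecessors}, so invert the downstream edges
    graph: dict[str, set[str]] = {t.task_id: set() for t in tasks}
    for t in tasks:
        for child in t.downstream:
            graph[child].add(t.task_id)
    return list(TopologicalSorter(graph).static_order())


extract = Task("extract_crm_data")
transform = Task("run_dbt_models")
load_dw = Task("load_to_warehouse")
extract >> transform >> load_dw
print(run_order([extract, transform, load_dw]))
# ['extract_crm_data', 'run_dbt_models', 'load_to_warehouse']
```

Because the graph must be acyclic, a topological sort always exists; `TopologicalSorter` would raise `graphlib.CycleError` if a chain accidentally looped back on itself.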

Topic Quiz · 1 question


1. What does catchup=False do in an Airflow DAG?
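💡 With catchup=False, the scheduler creates a run only for the latest data interval and skips older ones. If catchup is enabled (the Airflow 2 default), Airflow instead creates one DAG run for every interval missed since start_date, which is usually undesired for a pipeline like this.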
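To see the difference in practice, the toy model below lists the logical dates (`ds`) a daily 06:00 schedule would create runs for. `due_runs` is a hypothetical helper, not the scheduler's code. It assumes naive datetimes in a single timezone, one-day intervals starting at `run_hour`, and that a run is created only once its interval has ended.

```python
from datetime import datetime, timedelta


def due_runs(start_date: datetime, now: datetime, catchup: bool,
             run_hour: int = 6) -> list[str]:
    """Logical dates (ds) of the daily runs a scheduler would create by `now`.

    Toy model of a "0 <run_hour> * * *" schedule: each interval spans one day
    starting at run_hour, and its run is created once the interval has ended.
    """
    first = start_date.replace(hour=run_hour, minute=0, second=0, microsecond=0)
    if first < start_date:
        first += timedelta(days=1)  # first tick at or after start_date
    intervals = []
    start = first
    while start + timedelta(days=1) <= now:  # interval [start, start + 1 day) is complete
        intervals.append(start)
        start += timedelta(days=1)
    if not catchup:
        intervals = intervals[-1:]  # keep only the most recent complete interval
    return [d.strftime("%Y-%m-%d") for d in intervals]


start = datetime(2025, 1, 1)
now = datetime(2025, 1, 5, 7, 0)
print(due_runs(start, now, catchup=True))   # ['2025-01-01', '2025-01-02', '2025-01-03', '2025-01-04']
print(due_runs(start, now, catchup=False))  # ['2025-01-04']
```

Note that the run for `ds` 2025-01-04 executes at 06:00 on 2025-01-05, once that day's interval has closed; this is why `{{ ds }}` in the DAG refers to the previous day's data.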