📡 You're offline — showing cached content
New version available!
Quick Access
Tutorials › Data Engineering › Prefect Orchestration

Prefect Orchestration

5 min read Quiz at the end
Prefect flows orchestrate tasks and add retries, result caching, logging, and scheduled deployments.

Prefect — Modern Orchestration

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

# Tasks are individual units of work
@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def extract_sales_data(date: str) -> list:
    """Fetch the orders for ``date`` from the CRM client.

    The task is configured with 3 retries spaced 60 seconds apart, and its
    result is cached for one hour under a key derived from the task inputs.

    Args:
        date: Forwarded unchanged as the ``date`` keyword argument of
            ``crm_client.get_orders``.

    Returns:
        Whatever ``crm_client.get_orders`` returns (annotated as a list).
    """
    # NOTE(review): ``crm_client`` is not defined in the visible part of this
    # file; it must be provided elsewhere before this task can run.
    orders = crm_client.get_orders(date=date)
    return orders

@task(log_prints=True)
def transform_data(records: list) -> list:
    """Keep completed records and add derived ``revenue`` and ``month`` fields.

    Args:
        records: Mappings that each provide the keys ``status``, ``qty``,
            ``price`` and ``date``.

    Returns:
        A new list holding one copy of every record whose ``status`` equals
        ``"completed"``, extended with ``revenue`` (``qty * price``) and
        ``month`` (the first seven characters of ``date``). The input
        records are not modified.
    """
    enriched = []
    for record in records:
        # Anything that is not a completed order is dropped.
        if record["status"] != "completed":
            continue
        row = dict(record)
        row["revenue"] = record["qty"] * record["price"]
        # Presumably "YYYY-MM" when ``date`` is an ISO string; confirm upstream.
        row["month"] = record["date"][:7]
        enriched.append(row)
    return enriched

@task
def load_to_warehouse(records: list, date: str) -> int:
    """Insert ``records`` into the ``fct_orders`` table and report the count.

    Args:
        records: Rows handed to ``snowflake.insert_many``.
        date: Currently unused by this task.

    Returns:
        ``len(records)``, measured after the insert call has returned.
    """
    target_table = "fct_orders"
    # NOTE(review): ``snowflake`` is not defined in the visible part of this
    # file; it must be provided elsewhere before this task can run.
    snowflake.insert_many(target_table, records)
    loaded = len(records)
    return loaded

# Flow orchestrates tasks
@flow(name="daily-sales-etl",
      description="Daily sales pipeline",
      log_prints=True)
def sales_pipeline(date: str = None):
    """Run extract -> transform -> load for one day of sales.

    ``log_prints=True`` routes the summary ``print`` below into the flow
    run's logs instead of leaving it on stdout only.

    Args:
        date: Day to process. When omitted (or empty), today's date on the
            machine executing the flow is used, formatted as ``YYYY-MM-DD``.
            NOTE(review): the format expected by the CRM client is not
            visible here -- confirm that ISO dates are what it wants.
    """
    if not date:
        # Prefect does not render Jinja-style placeholders such as
        # "{{ today }}", so the default has to be computed at run time;
        # otherwise the literal placeholder string reaches the CRM call.
        # Imported locally under an alias because the parameter is also
        # named ``date``.
        import datetime as _dt

        date = _dt.date.today().isoformat()

    raw = extract_sales_data(date)
    clean = transform_data(raw)
    count = load_to_warehouse(clean, date)
    print(f"Loaded {count} records for {date}")

# Deploy with schedule
if __name__ == "__main__":
    # Serve the flow as the "daily-sales" deployment, scheduled with the
    # cron expression "0 6 * * *" (06:00 every day).
    sales_pipeline.serve(name="daily-sales", cron="0 6 * * *")