Prefect flows orchestrate tasks with retries, caching, logging, and cloud deployment scheduling.
Prefect — Modern Orchestration
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
# Tasks are individual units of work
@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def extract_sales_data(date: str) -> list:
    """Fetch the orders for ``date`` from the CRM client.

    The task is configured with 3 retries spaced 60 seconds apart, and its
    result is cached for one hour using ``task_input_hash`` as the cache key
    function.

    Args:
        date: Date to request orders for; passed through unchanged as the
            ``date`` keyword of ``crm_client.get_orders``.

    Returns:
        Whatever ``crm_client.get_orders`` returns (annotated as a list).
    """
    orders = crm_client.get_orders(date=date)
    return orders
@task(log_prints=True)
def transform_data(records: list) -> list:
    """Keep completed records and add derived ``revenue`` and ``month`` fields.

    Args:
        records: Mappings that each provide the keys ``status``, ``qty``,
            ``price`` and ``date``.

    Returns:
        A new list holding one copy of every record whose ``status`` equals
        ``"completed"``, extended with ``revenue`` (``qty * price``) and
        ``month`` (the first 7 characters of ``date``). The input records
        are not modified.
    """
    enriched = []
    for record in records:
        # Anything that is not a completed order is dropped.
        if record["status"] != "completed":
            continue
        row = {**record}
        row["revenue"] = record["qty"] * record["price"]
        # First 7 characters of the date, e.g. "YYYY-MM" for ISO-style
        # strings (assumes ISO-formatted dates — TODO confirm with the source).
        row["month"] = record["date"][:7]
        enriched.append(row)
    return enriched
@task
def load_to_warehouse(records: list, date: str) -> int:
    """Insert ``records`` into the ``fct_orders`` table and report the count.

    Args:
        records: Rows handed to ``snowflake.insert_many``.
        date: Accepted as part of the task signature; not used by the body.

    Returns:
        The number of records passed in (``len(records)``).
    """
    snowflake.insert_many("fct_orders", records)
    loaded = len(records)
    return loaded
# Flow orchestrates tasks
@flow(
    name="daily-sales-etl",
    description="Daily sales pipeline",
    # Route the summary print() below into the Prefect run logs.
    log_prints=True,
)
def sales_pipeline(date: str = None):
    """Run the extract -> transform -> load sequence for one day of sales.

    Args:
        date: Day to process, forwarded as-is to the extract and load tasks.
            When omitted (``None``) or empty, the current local date in ISO
            format (``YYYY-MM-DD``) is used.
            NOTE(review): assumes the CRM client accepts ISO date strings —
            confirm against ``crm_client.get_orders``.
    """
    # Local import under an alias: the ``date`` parameter would shadow
    # ``datetime.date`` if it were imported by name.
    import datetime as _dt

    # Prefect does not render Jinja-style templates, so a "{{ today }}"
    # placeholder would reach the tasks as a literal string. Resolve the
    # default to a real date instead.
    date = date or _dt.date.today().isoformat()

    raw = extract_sales_data(date)
    clean = transform_data(raw)
    count = load_to_warehouse(clean, date)
    print(f"Loaded {count} records for {date}")
# Deploy with schedule
if __name__ == "__main__":
    # Serve the flow as the "daily-sales" deployment on the cron schedule
    # "0 6 * * *" (minute 0, hour 6, every day).
    sales_pipeline.serve(name="daily-sales", cron="0 6 * * *")