# Data observability monitors data health — freshness, volume, schema, distribution, and lineage — so that issues are detected before they impact the business.
# The 5 pillars of data observability
# 1. Freshness: is data up to date?
# 2. Volume: are row counts normal?
# 3. Schema: did column types change?
# 4. Distribution: are values in expected ranges?
# 5. Lineage: which upstream table broke this?
# Custom observability checks in Airflow
from airflow.sensors.sql_sensor import SqlSensor
# Wait for data to be fresh
# Freshness gate: succeeds once at least one fct_orders row has been created
# today. Per the SqlSensor docs, the sensor keeps poking while the first cell
# of the result is 0 / '0' / '' / None, so COUNT(*) == 0 means "not fresh yet".
# NOTE(review): CURRENT_DATE is evaluated in the database session's timezone —
# confirm it matches the timezone created_at is stored in.
freshness_check = SqlSensor(
    task_id="check_data_freshness",
    conn_id="snowflake_conn",
    sql="SELECT COUNT(*) FROM fct_orders WHERE created_at >= CURRENT_DATE",
    # With a 5-minute interval, "reschedule" releases the worker slot between
    # checks instead of occupying it for up to the full one-hour timeout.
    mode="reschedule",
    poke_interval=300,  # check every 5 minutes
    timeout=3600,  # stop waiting after 1 hour
)
# Volume anomaly detection
def check_volume_anomaly(*, days=30, threshold=3.0, history=None, today=None, alert=None):
    """Alert when today's row count deviates sharply from the recent baseline.

    Today's count is scored against the mean and sample standard deviation of
    the trailing daily counts only. Today's value is deliberately kept out of
    the baseline: including it inflates the standard deviation and pulls the
    mean toward the outlier, which can hide a genuine anomaly.

    Args:
        days: Number of trailing days to fetch when ``history`` is not given.
        threshold: Alert when ``abs(z_score)`` exceeds this value.
        history: Optional sequence of past daily row counts (list, tuple,
            ndarray, Series, ...). Defaults to ``get_daily_row_counts(days=days)``.
        today: Optional row count for today. Defaults to
            ``get_today_row_count()``.
        alert: Optional callable receiving the alert message. Defaults to
            ``send_alert``.

    Returns:
        Today's z-score as a float (``+/-inf`` when the history is perfectly
        flat and today differs from it), or ``None`` when fewer than two
        historical points are available to estimate a spread (no alert sent).
    """
    import math

    import numpy as np

    if history is None:
        history = get_daily_row_counts(days=days)
    if today is None:
        today = get_today_row_count()
    if alert is None:
        alert = send_alert

    # np.asarray accepts any sequence type uniformly; list concatenation
    # (`history + [today]`) would broadcast-add instead on an ndarray/Series.
    baseline = np.asarray(history, dtype=float)
    if baseline.size < 2:
        # A sample standard deviation needs at least two points.
        return None

    mean = float(baseline.mean())
    std = float(baseline.std(ddof=1))
    deviation = float(today) - mean
    if std == 0:
        # Perfectly flat history: any change at all is infinitely unusual.
        z_score = 0.0 if deviation == 0 else math.copysign(math.inf, deviation)
    else:
        z_score = deviation / std

    if abs(z_score) > threshold:
        alert(f"Volume anomaly: {today} rows (z={z_score:.2f})")
    return z_score
# Schema change detection
# Compare current schema vs last-known schema
# Alert on: column added, removed, type changed
# Managed observability tools such as Monte Carlo, Soda, and Acceldata provide these checks out of the box.