Use Faust for stream processing in Python; use Flink for complex event-time windowing with watermarks.
Stream Processing with Kafka Streams and Flink
# Kafka Streams (Java/Kotlin, embedded in app)
# Python with Faust
import faust
# Faust application "analytics", pointed at the Kafka broker on localhost.
app = faust.App(
    "analytics",
    broker="kafka://localhost:9092",
)

# Source topic "orders"; message values are declared as plain dicts.
orders_topic = app.topic(
    "orders",
    value_type=dict,
)

# Table "revenue"; default=float is the factory used for keys not yet present.
revenue_table = app.Table(
    "revenue",
    default=float,
)
@app.agent(orders_topic)
async def process_orders(orders):
    """Accumulate revenue per product from completed orders.

    Args:
        orders: Async-iterable stream of order events delivered to the
            agent. Each event is indexed by the "status", "product_id"
            and "amount" keys.

    Raises:
        KeyError: If an event lacks "status", or a completed event lacks
            "product_id" or "amount".
    """
    # NOTE(review): revenue_table is keyed by product_id; confirm the
    # "orders" topic is partitioned by that same key, otherwise the stream
    # likely needs repartitioning (e.g. group_by) before updating the table.
    async for order in orders:
        # Only completed orders contribute to revenue.
        if order["status"] != "completed":
            continue
        key = order["product_id"]
        # Real-time aggregation maintained in state store
        revenue_table[key] += order["amount"]
# Apache Flink (Python API)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
# Obtain the stream execution environment, then build the table environment
# on top of it so SQL statements can be executed through t_env.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
# Read from Kafka: declare table kafka_orders over the 'orders' topic
# (JSON format). event_time carries the watermark, which trails the
# timestamps by 5 seconds.
# NOTE(review): neither 'scan.startup.mode' nor 'properties.group.id' is set
# here; confirm the connector defaults are what this job should use.
_KAFKA_ORDERS_DDL = """
CREATE TABLE kafka_orders (
order_id BIGINT,
product_id STRING,
amount DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector'='kafka',
'topic'='orders',
'properties.bootstrap.servers'='kafka:9092',
'format'='json'
)
"""
t_env.execute_sql(_KAFKA_ORDERS_DDL)
# 1-minute tumbling window aggregation: SUM(amount) per product_id per window.
# The value returned by execute_sql() is kept in window_revenue instead of
# being dropped, so the windowed rows can actually be consumed afterwards
# (presumably via the result's print()/collect() - confirm against the
# PyFlink version in use).
window_revenue = t_env.execute_sql("""
SELECT
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
product_id,
SUM(amount) AS revenue
FROM kafka_orders
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), product_id
""")