Consumers read messages from topics using poll loops, subscribe to topics, and commit offsets.
Kafka Consumers
# Console consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic orders --group myapp --from-beginning
# Python consumer
from confluent_kafka import Consumer
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processor',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
}
consumer = Consumer(conf)
consumer.subscribe(['orders'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None: continue
if msg.error(): print('Error:', msg.error()); continue
print('Received:', msg.key(), msg.value())
consumer.commit(msg) # manual commit after processing
finally:
consumer.close()