Как реализовать Producer и Consumer для Kafka на Python?

Ответ

Для взаимодействия с Kafka на Python чаще всего используют библиотеки kafka-python или confluent-kafka-python. Ниже приведены базовые примеры для kafka-python.

Producer (Отправитель)

Producer отвечает за отправку сообщений в определенный топик Kafka. Ключевые шаги — сериализация данных и отправка.

import json
from kafka import KafkaProducer

# Инициализация продюсера
# bootstrap_servers - список брокеров Kafka
# value_serializer - функция для преобразования данных в байты (например, JSON -> bytes)
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Данные для отправки
data = {'user_id': 123, 'action': 'login'}

# Асинхронная отправка сообщения в топик 'user-events'
print(f"Sending message: {data}")
producer.send('user-events', value=data)

# Важно: дождаться отправки всех сообщений из буфера
producer.flush()
producer.close()

Consumer (Получатель)

Consumer подписывается на один или несколько топиков и читает из них сообщения. Он работает в составе consumer group для распределения нагрузки.

import json
from kafka import KafkaConsumer

# Инициализация консьюмера
# 'user-events' - топик для чтения
# group_id - идентификатор группы потребителей
# auto_offset_reset='earliest' - начать чтение с самого старого сообщения
# value_deserializer - функция для преобразования байтов обратно в объект
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

print("Waiting for messages...")
# Бесконечный цикл для чтения сообщений
for message in consumer:
    # message содержит метаданные (топик, партиция, offset) и само значение
    print(f"Received: {message.value}, partition={message.partition}, offset={message.offset}")

Ключевые аспекты при разработке:

  • Надежность: Обработка ошибок подключения и механизмы повторных попыток (retries).
  • Сериализация данных: Использование форматов вроде JSON, Avro или Protobuf для обеспечения консистентности схемы данных.
  • Управление смещениями (offsets): Явное подтверждение (commit) обработки сообщений для гарантии доставки "at-least-once" или "exactly-once".
  • Масштабируемость: Использование групп потребителей (consumer groups) для параллельной обработки сообщений из разных партиций одного топика.