Ответ
Для взаимодействия с Kafka на Python чаще всего используют библиотеки kafka-python или confluent-kafka-python. Ниже приведены базовые примеры для kafka-python.
Producer (Отправитель)
Producer отвечает за отправку сообщений в определенный топик Kafka. Ключевые шаги — сериализация данных и отправка.
import json
from kafka import KafkaProducer
# Инициализация продюсера
# bootstrap_servers - список брокеров Kafka
# value_serializer - функция для преобразования данных в байты (например, JSON -> bytes)
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Данные для отправки
data = {'user_id': 123, 'action': 'login'}
# Асинхронная отправка сообщения в топик 'user-events'
print(f"Sending message: {data}")
producer.send('user-events', value=data)
# Важно: дождаться отправки всех сообщений из буфера
producer.flush()
producer.close()
Consumer (Получатель)
Consumer подписывается на один или несколько топиков и читает из них сообщения. Он работает в составе consumer group для распределения нагрузки.
import json
from kafka import KafkaConsumer
# Инициализация консьюмера
# 'user-events' - топик для чтения
# group_id - идентификатор группы потребителей
# auto_offset_reset='earliest' - начать чтение с самого старого сообщения
# value_deserializer - функция для преобразования байтов обратно в объект
consumer = KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
group_id='my-group',
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
print("Waiting for messages...")
# Бесконечный цикл для чтения сообщений
for message in consumer:
# message содержит метаданные (топик, партиция, offset) и само значение
print(f"Received: {message.value}, partition={message.partition}, offset={message.offset}")
Ключевые аспекты при разработке:
- Надежность: Обработка ошибок подключения и механизмы повторных попыток (retries).
- Сериализация данных: Использование форматов вроде JSON, Avro или Protobuf для обеспечения консистентности схемы данных.
- Управление смещениями (offsets): Явное подтверждение (
commit) обработки сообщений для гарантии доставки "at-least-once" или "exactly-once". - Масштабируемость: Использование групп потребителей (consumer groups) для параллельной обработки сообщений из разных партиций одного топика.