Как реализовать Producer и Consumer для Kafka на Python?

Ответ

Для взаимодействия с Kafka на Python чаще всего используют библиотеки kafka-python или confluent-kafka-python. Ниже приведены базовые примеры для kafka-python.

Producer (Отправитель)

Producer отвечает за отправку сообщений в определенный топик Kafka. Ключевые шаги — сериализация данных и отправка.

import json
from kafka import KafkaProducer

# Инициализация продюсера
# bootstrap_servers - список брокеров Kafka
# value_serializer - функция для преобразования данных в байты (например, JSON -> bytes)
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Данные для отправки
data = {'user_id': 123, 'action': 'login'}

# Асинхронная отправка сообщения в топик 'user-events'
print(f"Sending message: {data}")
producer.send('user-events', value=data)

# Важно: дождаться отправки всех сообщений из буфера
producer.flush()
producer.close()

Consumer (Получатель)

Consumer подписывается на один или несколько топиков и читает из них сообщения. Он работает в составе consumer group для распределения нагрузки.

import json
from kafka import KafkaConsumer

# Инициализация консьюмера
# 'user-events' - топик для чтения
# group_id - идентификатор группы потребителей
# auto_offset_reset='earliest' - начать чтение с самого старого сообщения
# value_deserializer - функция для преобразования байтов обратно в объект
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

print("Waiting for messages...")
# Бесконечный цикл для чтения сообщений
for message in consumer:
    # message содержит метаданные (топик, партиция, offset) и само значение
    print(f"Received: {message.value}, partition={message.partition}, offset={message.offset}")

Ключевые аспекты при разработке:

  • Надежность: Обработка ошибок подключения и механизмы повторных попыток (retries).
  • Сериализация данных: Использование форматов вроде JSON, Avro или Protobuf для обеспечения консистентности схемы данных.
  • Управление смещениями (offsets): Явное подтверждение (commit) обработки сообщений для гарантии доставки "at-least-once" или "exactly-once".
  • Масштабируемость: Использование групп потребителей (consumer groups) для параллельной обработки сообщений из разных партиций одного топика.

Ответ 18+ 🔞

Давай разберём эту тему про Кафку на Питоне, а то я вижу, у некоторых уже глаза на лоб лезут, как от хорошего анекдота. Слушай, это же не хухры-мухры, а реально мощная штука для потоков данных, типа нервная система для данных, блядь.

Вот смотри, чтобы с Кафкой на Питоне общаться, народ обычно хватает одну из двух библиотек: kafka-python или confluent-kafka-python. Я тебе на пальцах, на примере первой, всё объясню, а то иначе можно с ума сойти, ёпта.

Producer (Тот, кто отправляет, он же говорящий попугай)

Этот тип отвечает за то, чтобы пихать сообщения в определённую тему (топик) Кафки. Главное тут — превратить твои данные в байты и отправить их, не обосравшись по дороге.

import json
from kafka import KafkaProducer

# Запускаем нашего говоруна
# bootstrap_servers — это адреса брокеров, типа почтовых отделений для сообщений
# value_serializer — это волшебная палочка, которая превращает твой словарь в кучу байтов
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Что будем слать? Ну, например, что юзер залогинился
data = {'user_id': 123, 'action': 'login'}

# Отправляем асинхронно в топик 'user-events'. Летит, блядь, как фанера над Парижем.
print(f"Шлём сообщение: {data}")
producer.send('user-events', value=data)

# ВАЖНО, СУКА! Не забудь вытолкнуть всё из буфера, а то сообщения так и останутся висеть в памяти, как сопли.
producer.flush()
producer.close()

Consumer (Тот, кто принимает, он же жадный ушлёпок)

Этот парень подписывается на топик и начинает жрать оттуда сообщения. Он может работать в компании таких же ушлёпков (consumer group), чтобы распределить нагрузку и не сдохнуть от потока данных.

import json
from kafka import KafkaConsumer

# Включаем нашего потребителя
# 'user-events' — топик, на который мы подписались, как на газету
# group_id — наш клуб по интересам, группа потребителей
# auto_offset_reset='earliest' — читать всё с самого начала, даже старьё
# value_deserializer — обратная волшебная палочка, из байтов делает нормальные данные
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

print("Ждём сообщений, как манны небесной...")
# Вечный цикл, который жрёт сообщения. Выхода нет, только перезагрузка.
for message in consumer:
    # В message лежит не только суть, но и метаданные: откуда, какая партиция, какой offset
    print(f"Получили: {message.value}, partition={message.partition}, offset={message.offset}")

На что смотреть, чтобы не облажаться, блядь:

  • Надёжность, ёпта: Пиши обработку ошибок подключения и делай повторные попытки (retries). А то один раз упадёт — и всё, пиши пропало.
  • Сериализация данных: Используй нормальные форматы вроде JSON, Avro или Protobuf. Не выдумывай свои велосипеды, а то схема данных разъедется, как говно по асфальту.
  • Управление смещениями (offsets): Явно подтверждай (commit), что сообщение обработал. Иначе получишь доставку "хотя бы раз" или, что хуже, "ровно один раз", и будет тебе пиздец с дубликатами.
  • Масштабируемость: Используй группы потребителей (consumer groups). Это когда несколько таких же жадных ушлёпков параллельно жрут сообщения из разных партиций одного топика. Красота, а не жизнь!