Ответ
Для взаимодействия с Kafka на Python чаще всего используют библиотеки kafka-python или confluent-kafka-python. Ниже приведены базовые примеры для kafka-python.
Producer (Отправитель)
Producer отвечает за отправку сообщений в определенный топик Kafka. Ключевые шаги — сериализация данных и отправка.
import json
from kafka import KafkaProducer
# Инициализация продюсера
# bootstrap_servers - список брокеров Kafka
# value_serializer - функция для преобразования данных в байты (например, JSON -> bytes)
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Данные для отправки
data = {'user_id': 123, 'action': 'login'}
# Асинхронная отправка сообщения в топик 'user-events'
print(f"Sending message: {data}")
producer.send('user-events', value=data)
# Важно: дождаться отправки всех сообщений из буфера
producer.flush()
producer.close()
Consumer (Получатель)
Consumer подписывается на один или несколько топиков и читает из них сообщения. Он работает в составе consumer group для распределения нагрузки.
import json
from kafka import KafkaConsumer
# Инициализация консьюмера
# 'user-events' - топик для чтения
# group_id - идентификатор группы потребителей
# auto_offset_reset='earliest' - начать чтение с самого старого сообщения
# value_deserializer - функция для преобразования байтов обратно в объект
consumer = KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
group_id='my-group',
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
print("Waiting for messages...")
# Бесконечный цикл для чтения сообщений
for message in consumer:
# message содержит метаданные (топик, партиция, offset) и само значение
print(f"Received: {message.value}, partition={message.partition}, offset={message.offset}")
Ключевые аспекты при разработке:
- Надежность: Обработка ошибок подключения и механизмы повторных попыток (retries).
- Сериализация данных: Использование форматов вроде JSON, Avro или Protobuf для обеспечения консистентности схемы данных.
- Управление смещениями (offsets): Явное подтверждение (
commit) обработки сообщений для гарантии доставки "at-least-once" или "exactly-once". - Масштабируемость: Использование групп потребителей (consumer groups) для параллельной обработки сообщений из разных партиций одного топика.
Ответ 18+ 🔞
Давай разберём эту тему про Кафку на Питоне, а то я вижу, у некоторых уже глаза на лоб лезут, как от хорошего анекдота. Слушай, это же не хухры-мухры, а реально мощная штука для потоков данных, типа нервная система для данных, блядь.
Вот смотри, чтобы с Кафкой на Питоне общаться, народ обычно хватает одну из двух библиотек: kafka-python или confluent-kafka-python. Я тебе на пальцах, на примере первой, всё объясню, а то иначе можно с ума сойти, ёпта.
Producer (Тот, кто отправляет, он же говорящий попугай)
Этот тип отвечает за то, чтобы пихать сообщения в определённую тему (топик) Кафки. Главное тут — превратить твои данные в байты и отправить их, не обосравшись по дороге.
import json
from kafka import KafkaProducer
# Запускаем нашего говоруна
# bootstrap_servers — это адреса брокеров, типа почтовых отделений для сообщений
# value_serializer — это волшебная палочка, которая превращает твой словарь в кучу байтов
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Что будем слать? Ну, например, что юзер залогинился
data = {'user_id': 123, 'action': 'login'}
# Отправляем асинхронно в топик 'user-events'. Летит, блядь, как фанера над Парижем.
print(f"Шлём сообщение: {data}")
producer.send('user-events', value=data)
# ВАЖНО, СУКА! Не забудь вытолкнуть всё из буфера, а то сообщения так и останутся висеть в памяти, как сопли.
producer.flush()
producer.close()
Consumer (Тот, кто принимает, он же жадный ушлёпок)
Этот парень подписывается на топик и начинает жрать оттуда сообщения. Он может работать в компании таких же ушлёпков (consumer group), чтобы распределить нагрузку и не сдохнуть от потока данных.
import json
from kafka import KafkaConsumer
# Включаем нашего потребителя
# 'user-events' — топик, на который мы подписались, как на газету
# group_id — наш клуб по интересам, группа потребителей
# auto_offset_reset='earliest' — читать всё с самого начала, даже старьё
# value_deserializer — обратная волшебная палочка, из байтов делает нормальные данные
consumer = KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
group_id='my-group',
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
print("Ждём сообщений, как манны небесной...")
# Вечный цикл, который жрёт сообщения. Выхода нет, только перезагрузка.
for message in consumer:
# В message лежит не только суть, но и метаданные: откуда, какая партиция, какой offset
print(f"Получили: {message.value}, partition={message.partition}, offset={message.offset}")
На что смотреть, чтобы не облажаться, блядь:
- Надёжность, ёпта: Пиши обработку ошибок подключения и делай повторные попытки (retries). А то один раз упадёт — и всё, пиши пропало.
- Сериализация данных: Используй нормальные форматы вроде JSON, Avro или Protobuf. Не выдумывай свои велосипеды, а то схема данных разъедется, как говно по асфальту.
- Управление смещениями (offsets): Явно подтверждай (
commit), что сообщение обработал. Иначе получишь доставку "хотя бы раз" или, что хуже, "ровно один раз", и будет тебе пиздец с дубликатами. - Масштабируемость: Используй группы потребителей (consumer groups). Это когда несколько таких же жадных ушлёпков параллельно жрут сообщения из разных партиций одного топика. Красота, а не жизнь!