Ответ
Для гарантии отправки сообщения ровно один раз (exactly-once delivery) и предотвращения дублей в Kafka используются два основных подхода: идемпотентный продюсер и дедупликация на уровне приложения.
1. Идемпотентный продюсер (встроенный механизм Kafka)
Этот механизм гарантирует, что повторные отправки одного и того же сообщения в рамках одной сессии продюсера не приведут к дублированию. Брокер отслеживает сообщения по уникальному идентификатору продюсера (PID) и порядковому номеру.
Настройка:
Включается одним параметром в конфигурации продюсера. Это автоматически устанавливает acks='all' и retries на большое значение.
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
# Включает идемпотентность
enable_idempotence=True
)
Когда использовать:
- Когда достаточно гарантий в пределах жизни одного экземпляра продюсера.
- Для защиты от сбоев сети, которые могут вызвать повторные отправки.
2. Дедупликация на стороне приложения
Этот подход обеспечивает гарантии даже между перезапусками продюсера. Логика реализуется вручную.
Алгоритм:
- Сгенерировать уникальный ключ для каждого сообщения (например, UUID или хеш от содержимого).
- Перед отправкой проверить в хранилище (например, Redis или БД), не был ли уже обработан ключ.
- Если ключа нет — отправить сообщение и сохранить ключ в хранилище.
Пример логики:
import uuid
# message_storage - ваше хранилище (например, Redis set)
def send_unique_message(producer, topic, message_data, message_storage):
message_key = str(uuid.uuid4()) # или хеш от message_data
# Проверяем, не был ли ключ уже обработан
if not message_storage.exists(message_key):
producer.send(topic, key=message_key, value=message_data)
producer.flush()
# Сохраняем ключ после успешной отправки
message_storage.add(message_key)
print(f"Сообщение с ключом {message_key} отправлено.")
else:
print(f"Дубликат сообщения с ключом {message_key} пропущен.")
Когда использовать:
- Когда требуется строгая гарантия отсутствия дублей даже после сбоев и перезапуска приложения.
- Когда продюсеры могут быть распределенными и не иметь общей сессии.