Как можно обеспечить семантику exactly-once, имея гарантию at-least-once?

«Как можно обеспечить семантику exactly-once, имея гарантию at-least-once?» — вопрос из категории Потоковая обработка и Kafka, который задают на 33% собеседований Data Инженер. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Чтобы поднять гарантии с at-least-once до exactly-once, нужно устранить возможные дубликаты, возникающие при повторных отправках. Я применяю два основных подхода, часто вместе.

1. Идемпотентный продюсер Kafka: Включается настройкой enable.idempotence=true. При этом Kafka Broker присваивает каждой паре (Producer ID, Sequence Number) для данного раздела, отбрасывая дубликаты.

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Ключевая настройка
// При включении идемпотентности автоматически устанавливаются acks=all и max.in.flight.requests.per.connection=5
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

2. Транзакции Kafka для exactly-once обработки (read-process-write): Это более тяжелый механизм, который координирует потребление, обработку и запись результата.

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Инициализация транзакций

try {
    producer.beginTransaction();
    // 1. Чтение из топика input-topic (в рамках потребительской группы)
    // 2. Обработка данных
    producer.send(new ProducerRecord<>("output-topic", key, processedValue));
    // 3. Отправка смещений потребителя как часть транзакции
    producer.sendOffsetsToTransaction(currentOffsets, consumerGroupId);
    producer.commitTransaction(); // Всё или ничего
} catch (Exception e) {
    producer.abortTransaction(); // Откат, смещения не будут зафиксированы
    throw e;
}

3. Дедупликация на стороне приемника (sink): Если приемник — база данных, можно реализовать идемпотентную запись через INSERT ... ON CONFLICT DO NOTHING/UPDATE (в PostgreSQL) или использовать уникальный ключ сообщения (например, message_id) как первичный или бизнес-ключ.