Ответ
Чтобы поднять гарантии с at-least-once до exactly-once, нужно устранить возможные дубликаты, возникающие при повторных отправках. Я применяю два основных подхода, часто вместе.
1. Идемпотентный продюсер Kafka:
Включается настройкой enable.idempotence=true. При этом Kafka Broker присваивает каждой паре (Producer ID, Sequence Number) для данного раздела, отбрасывая дубликаты.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Ключевая настройка
// При включении идемпотентности автоматически устанавливаются acks=all и max.in.flight.requests.per.connection=5
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
2. Транзакции Kafka для exactly-once обработки (read-process-write): Это более тяжелый механизм, который координирует потребление, обработку и запись результата.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Инициализация транзакций
try {
producer.beginTransaction();
// 1. Чтение из топика input-topic (в рамках потребительской группы)
// 2. Обработка данных
producer.send(new ProducerRecord<>("output-topic", key, processedValue));
// 3. Отправка смещений потребителя как часть транзакции
producer.sendOffsetsToTransaction(currentOffsets, consumerGroupId);
producer.commitTransaction(); // Всё или ничего
} catch (Exception e) {
producer.abortTransaction(); // Откат, смещения не будут зафиксированы
throw e;
}
3. Дедупликация на стороне приемника (sink):
Если приемник — база данных, можно реализовать идемпотентную запись через INSERT ... ON CONFLICT DO NOTHING/UPDATE (в PostgreSQL) или использовать уникальный ключ сообщения (например, message_id) как первичный или бизнес-ключ.