Как работает семантика Exactly-Once в Apache Kafka?

Ответ

Exactly-Once Semantics (EOS) в Apache Kafka гарантирует, что каждое сообщение будет обработано потребителем ровно один раз, даже в случае сбоев продюсера, брокера или консьюмера. Это достигается комбинацией идемпотентного продюсера и транзакций.

1. Идемпотентный продюсер (Idempotent Producer):

  • Цель: Предотвратить дублирование сообщений из-за повторных отправок (retries) при временных ошибках (например, таймаут сети).
  • Как работает: Каждому продюсеру назначается уникальный PID (Producer ID), а каждому сообщению — монотонно растущий sequence number для каждой партиции топика. Брокер Kafka отслеживает эти номера и отбрасывает дубликаты.
  • Включение: Установите enable.idempotence=true в настройках продюсера.

2. Транзакции (Transactions):

  • Цель: Обеспечить атомарность при отправке сообщений в несколько партиций/топиков, а также согласованность между чтением и записью (consuming и producing) в паттерне «обработка-запись».
  • Как работает: Продюсер координирует с брокером управление транзакциями. Все сообщения, отправленные между beginTransaction() и commitTransaction(), либо все будут видны консьюмерам (с isolation.level=read_committed), либо все будут отброшены.

Пример кода продюсера с Exactly-Once:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Включаем идемпотентность и транзакции
props.put("enable.idempotence", "true"); // Неявно устанавливает acks=all, max.in.flight.requests.per.connection=5 (или 1 для <= 2.0)
props.put("transactional.id", "my-app-transactional-id"); // Должен быть уникальным и постоянным для данного продюсера

Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Регистрация продюсера у координатора транзакций

try {
    producer.beginTransaction();
    // Атомарная отправка в несколько топиков
    producer.send(new ProducerRecord<>("orders", orderId, orderJson));
    producer.send(new ProducerRecord<>("audit-log", orderId, auditJson));
    // ... любая другая бизнес-логика ...
    producer.commitTransaction(); // Сообщения становятся видимыми
} catch (KafkaException e) {
    producer.abortTransaction(); // Все отправленные в транзакции сообщения отменяются
    // Обработка ошибки и возможный retry с новой транзакцией
}

Настройка консьюмера для чтения транзакционных сообщений:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Читаем только закоммиченные транзакции
props.put("isolation.level", "read_committed");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
// Консьюмер не увидит сообщения из незавершенных или прерванных транзакций.

Важные ограничения и детали:

  • Требует Kafka >= 0.11.
  • Накладные расходы: Транзакции добавляют задержку и используют больше памяти на брокере.
  • Exactly-Once Delivery vs. Processing: Гарантия относится к доставке в Kafka. Для сквозной семантики «ровно-один-раз» в приложении (E2E) консьюмер также должен быть идемпотентным (например, сохранять смещения вместе с результатом обработки в транзакционное хранилище).
  • transactional.id: Должен быть постоянным для данного логического продюсера при перезапусках, чтобы Kafka могла сопоставить его с PID и продолжить отслеживание sequence numbers.

Ответ 18+ 🔞

А, слушай, про эту вашу «Exactly-Once Semantics» в Кафке! Ну, блядь, сказка, а не технология. Суть в том, чтобы каждое сообщение, этот маленький пиздёныш данных, обработалось ровно один раз. Не ноль, не два, а один, мать его! Даже если продюсер, брокер или консьюмер накрылись медным тазом посередине процесса.

Как это, блядь, достигается? А вот как, два кита, на которых всё держится:

1. Идемпотентный продюсер (Idempotent Producer). Это чтобы продюсер, этот трепло, когда у него сеть чихнула и он переотправляет сообщение, не заспамил всё дублями. Каждому продюсеру дают PID, а каждому сообщению — порядковый номер для партиции. Брокер смотрит: «Ага, номер 5 для PID 123 в партиции 0 я уже видел, иди нахуй со своим дублем». Включается одной строчкой: enable.idempotence=true. Ёпта, красота!

2. Транзакции (Transactions). Это уже для серьёзных пацанов, которые пишут сразу в несколько топиков. Либо всё записалось, либо нихуя. И главное — для паттерна «прочитал-обработал-записал результат». Чтобы консьюмер, пока он обрабатывает, не увидел свои же промежуточные записи, сделанные в другой топик. Всё это дело оборачивается в beginTransaction() и commitTransaction(). Не пронесло? abortTransaction(), и все следы подчищены.

Вот, смотри, как выглядит этот цирк в коде. Блоки кода не трогаю, они святые.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Включаем магию, сука!
props.put("enable.idempotence", "true");
props.put("transactional.id", "my-app-transactional-id"); // Этот айдишник должен быть как татуировка — навсегда.

Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Регистрируемся, типа «я тут, готов транзакции гонять»

try {
    producer.beginTransaction();
    // Шлём атомарно, блядь, в два места сразу
    producer.send(new ProducerRecord<>("orders", orderId, orderJson));
    producer.send(new ProducerRecord<>("audit-log", orderId, auditJson));
    // ... тут ещё какая-то бизнес-логика может быть ...
    producer.commitTransaction(); // Всё, теперь сообщения видны. Ура!
} catch (KafkaException e) {
    producer.abortTransaction(); // Всё пошло по пизде — откатываем, как будто ничего и не было.
    // Ну и тут уже решай, перезапускать транзакцию или нет.
}

А консьюмер, чтобы не подглядывать за незаконченными делами, должен надеть шоры:

Properties props = new Properties();
// ... обычные настройки ...
// Читаем ТОЛЬКО закоммиченные транзакции, как честные люди.
props.put("isolation.level", "read_committed");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));

Но, блядь, есть нюансы, ебать!

  • Версия Кафки нужна 0.11 или выше. На древних версиях — забудь.
  • Накладные расходы: Транзакции — это не просто так. Задержки чуть больше, память на брокере жрут. За надёжность платим.
  • Exactly-Once Delivery vs. Processing: Это важно, ёпта! Кафка гарантирует, что сообщение доставлено в неё ровно раз. Но чтобы твоё приложение обработало его ровно раз (сквозная гарантия), консьюмер тоже должен быть хитрожопым — например, сохранять смещение вместе с результатом в одну транзакцию БД. Иначе можно наебнуться.
  • transactional.id: Это твоя паспортная система, блядь. Должен быть постоянным при перезапусках приложения, иначе Кафка тебя не узнает и начнётся пиздец с дублями.

Вот так вот, в двух словах. Выглядит сложно, но когда настроишь — живёшь спокойно, не бздишь, что что-то потеряется или задвоится.