Ответ
Exactly-Once Semantics (EOS) в Apache Kafka гарантирует, что каждое сообщение будет обработано потребителем ровно один раз, даже в случае сбоев продюсера, брокера или консьюмера. Это достигается комбинацией идемпотентного продюсера и транзакций.
1. Идемпотентный продюсер (Idempotent Producer):
- Цель: Предотвратить дублирование сообщений из-за повторных отправок (retries) при временных ошибках (например, таймаут сети).
- Как работает: Каждому продюсеру назначается уникальный
PID(Producer ID), а каждому сообщению — монотонно растущийsequence numberдля каждой партиции топика. Брокер Kafka отслеживает эти номера и отбрасывает дубликаты. - Включение: Установите
enable.idempotence=trueв настройках продюсера.
2. Транзакции (Transactions):
- Цель: Обеспечить атомарность при отправке сообщений в несколько партиций/топиков, а также согласованность между чтением и записью (consuming и producing) в паттерне «обработка-запись».
- Как работает: Продюсер координирует с брокером управление транзакциями. Все сообщения, отправленные между
beginTransaction()иcommitTransaction(), либо все будут видны консьюмерам (сisolation.level=read_committed), либо все будут отброшены.
Пример кода продюсера с Exactly-Once:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Включаем идемпотентность и транзакции
props.put("enable.idempotence", "true"); // Неявно устанавливает acks=all, max.in.flight.requests.per.connection=5 (или 1 для <= 2.0)
props.put("transactional.id", "my-app-transactional-id"); // Должен быть уникальным и постоянным для данного продюсера
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Регистрация продюсера у координатора транзакций
try {
producer.beginTransaction();
// Атомарная отправка в несколько топиков
producer.send(new ProducerRecord<>("orders", orderId, orderJson));
producer.send(new ProducerRecord<>("audit-log", orderId, auditJson));
// ... любая другая бизнес-логика ...
producer.commitTransaction(); // Сообщения становятся видимыми
} catch (KafkaException e) {
producer.abortTransaction(); // Все отправленные в транзакции сообщения отменяются
// Обработка ошибки и возможный retry с новой транзакцией
}
Настройка консьюмера для чтения транзакционных сообщений:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Читаем только закоммиченные транзакции
props.put("isolation.level", "read_committed");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
// Консьюмер не увидит сообщения из незавершенных или прерванных транзакций.
Важные ограничения и детали:
- Требует Kafka >= 0.11.
- Накладные расходы: Транзакции добавляют задержку и используют больше памяти на брокере.
- Exactly-Once Delivery vs. Processing: Гарантия относится к доставке в Kafka. Для сквозной семантики «ровно-один-раз» в приложении (E2E) консьюмер также должен быть идемпотентным (например, сохранять смещения вместе с результатом обработки в транзакционное хранилище).
transactional.id: Должен быть постоянным для данного логического продюсера при перезапусках, чтобы Kafka могла сопоставить его с PID и продолжить отслеживание sequence numbers.
Ответ 18+ 🔞
А, слушай, про эту вашу «Exactly-Once Semantics» в Кафке! Ну, блядь, сказка, а не технология. Суть в том, чтобы каждое сообщение, этот маленький пиздёныш данных, обработалось ровно один раз. Не ноль, не два, а один, мать его! Даже если продюсер, брокер или консьюмер накрылись медным тазом посередине процесса.
Как это, блядь, достигается? А вот как, два кита, на которых всё держится:
1. Идемпотентный продюсер (Idempotent Producer).
Это чтобы продюсер, этот трепло, когда у него сеть чихнула и он переотправляет сообщение, не заспамил всё дублями. Каждому продюсеру дают PID, а каждому сообщению — порядковый номер для партиции. Брокер смотрит: «Ага, номер 5 для PID 123 в партиции 0 я уже видел, иди нахуй со своим дублем». Включается одной строчкой: enable.idempotence=true. Ёпта, красота!
2. Транзакции (Transactions).
Это уже для серьёзных пацанов, которые пишут сразу в несколько топиков. Либо всё записалось, либо нихуя. И главное — для паттерна «прочитал-обработал-записал результат». Чтобы консьюмер, пока он обрабатывает, не увидел свои же промежуточные записи, сделанные в другой топик. Всё это дело оборачивается в beginTransaction() и commitTransaction(). Не пронесло? abortTransaction(), и все следы подчищены.
Вот, смотри, как выглядит этот цирк в коде. Блоки кода не трогаю, они святые.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Включаем магию, сука!
props.put("enable.idempotence", "true");
props.put("transactional.id", "my-app-transactional-id"); // Этот айдишник должен быть как татуировка — навсегда.
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Регистрируемся, типа «я тут, готов транзакции гонять»
try {
producer.beginTransaction();
// Шлём атомарно, блядь, в два места сразу
producer.send(new ProducerRecord<>("orders", orderId, orderJson));
producer.send(new ProducerRecord<>("audit-log", orderId, auditJson));
// ... тут ещё какая-то бизнес-логика может быть ...
producer.commitTransaction(); // Всё, теперь сообщения видны. Ура!
} catch (KafkaException e) {
producer.abortTransaction(); // Всё пошло по пизде — откатываем, как будто ничего и не было.
// Ну и тут уже решай, перезапускать транзакцию или нет.
}
А консьюмер, чтобы не подглядывать за незаконченными делами, должен надеть шоры:
Properties props = new Properties();
// ... обычные настройки ...
// Читаем ТОЛЬКО закоммиченные транзакции, как честные люди.
props.put("isolation.level", "read_committed");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
Но, блядь, есть нюансы, ебать!
- Версия Кафки нужна 0.11 или выше. На древних версиях — забудь.
- Накладные расходы: Транзакции — это не просто так. Задержки чуть больше, память на брокере жрут. За надёжность платим.
- Exactly-Once Delivery vs. Processing: Это важно, ёпта! Кафка гарантирует, что сообщение доставлено в неё ровно раз. Но чтобы твоё приложение обработало его ровно раз (сквозная гарантия), консьюмер тоже должен быть хитрожопым — например, сохранять смещение вместе с результатом в одну транзакцию БД. Иначе можно наебнуться.
transactional.id: Это твоя паспортная система, блядь. Должен быть постоянным при перезапусках приложения, иначе Кафка тебя не узнает и начнётся пиздец с дублями.
Вот так вот, в двух словах. Выглядит сложно, но когда настроишь — живёшь спокойно, не бздишь, что что-то потеряется или задвоится.