Ответ
Для гарантии доставки и предотвращения дублирования в Kafka используется механизм управления смещениями (offset management). Каждая группа потребителей (consumer group) отслеживает свою позицию (offset) в каждой партиции топика.
Существует три основных семантики доставки:
- At-most-once (не более одного раза): Смещение коммитится до обработки сообщения. При сбое сообщение будет потеряно.
- At-least-once (как минимум один раз): Смещение коммитится после обработки. При сбое до коммита сообщение может быть обработано повторно. Это самый частый сценарий.
- Exactly-once (строго один раз): Гарантирует, что сообщение обработано ровно один раз, даже при сбоях. Достигается с помощью транзакций и идемпотентного продюсера.
Способы управления смещениями:
-
Автоматический коммит (
enable.auto.commit=true): Брокер периодически коммитит смещение за потребителя. Просто, но рискованно — возможны как потери, так и дубликаты. -
Ручной коммит (
enable.auto.commit=false): Потребитель явно подтверждает обработку. Это надежный способ для реализации at-least-once.
Пример ручного коммита в Go с использованием sarama (современный подход с ConsumerGroup):
// ConsumerGroupHandler реализует логику обработки
type handler struct{}
func (h *handler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *handler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
// 1. Обрабатываем сообщение
processMessage(msg)
// 2. Явно коммитим offset
sess.MarkMessage(msg, "")
}
return nil
}
Для строгой гарантии exactly-once необходимо использовать Transactional API в Kafka, что требует более сложной координации между продюсером и консьюмером.
Ответ 18+ 🔞
А, слушай, вот эта вся хрень с доставкой сообщений в Кафке — это вообще отдельный цирк с конями, блядь. Ты как думаешь, они там просто так летают, эти твои месседжи? Нет, сука, там целая наука, как их не потерять и не задвоить, ёпта!
Каждая банда потребителей, то есть consumer group, у себя в блокнотике записывает, на каком она месте остановилась в каждой этой… партиции, блядь. Это смещение, offset называется. И от того, как они это записывают, всё и зависит.
Вот три главных подхода, прям как в жизни:
- At-most-once (не более одного раза): Это когда ты такой: «О, сообщение пришло! Щас я его в блокнот запишу, что прочитал, а потом уже буду разбираться, что там внутри». А потом бац — свет вырубили, ты нахуй ничего не обработал, но в блокноте-то галочка стоит! Сообщение потеряно, ищи-свищи. Хуёвая схема.
- At-least-once (как минимум один раз): А это обратная история. Ты такой: «Сначала, блядь, разберусь с этим сообщением, всё сделаю, и только потом галочку в блокноте поставлю». Вроде надёжно? А вот хрен! Если после обработки, но до галочки, система грохнется — ты получишь это же сообщение ещё разок, потому что галочки-то нет! Может и два, и три раза прилететь. Но хотя бы не потеряется. Так живёт 80% всех систем, потому что проще.
- Exactly-once (строго один раз): Вот это, блядь, высший пилотаж, ядрёна вошь! Чтобы гарантировать, что сообщение обработано ровно ОДИН раз, даже если всё вокруг горит и падает. Тут уже нужны транзакции и идемпотентный продюсер (который, если шлёт одно и то же сто раз, в итоге запись будет одна). Сложно, накладно, но если очень надо — то можно.
Ну и как этим, блядь, рулить-то?
-
Автоматический коммит (
enable.auto.commit=true): Во, самый распиздяйский способ! Ты сказал системе: «Да похуй, делай что хочешь». А она тебе раз в N времени сама ставит галочки в блокноте. Удобно? Да, ебать, как две копейки. А надёжно? Нихуя! Можешь и потерять сообщения, и получить дубли, потому что время коммита и обработки — это два разных дела, блядь. -
Ручной коммит (
enable.auto.commit=false): А вот это уже серьёзно. Ты берёшь ответственность на себя, мудак. Обработал сообщение — сам, своими руками, поставил галочку. Это и есть нормальный путь для at-least-once.
Смотри, как это в Go на sarama выглядит (современный вариант с ConsumerGroup):
// ConsumerGroupHandler реализует логику обработки
type handler struct{}
func (h *handler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *handler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
// 1. Обрабатываем сообщение (тут твоя магия)
processMessage(msg)
// 2. Только ВОТ ТЕПЕРЬ, сука, говорим Кафке: "Всё, я это проглотил, ставь галочку"
sess.MarkMessage(msg, "")
}
return nil
}
А если тебе прям exactly-once до зарезу нужно — готовься к геморрою с Transactional API. Это когда продюсер и консьюмер начинают друг другу писать письма и согласовывать каждый чих, чтобы ни одна сука не проскочила дважды и ни одна не потерялась. Красота, но овердохуища возни.