Что происходит с сообщениями в Kafka, если консьюмер не смог их обработать?

Ответ

Kafka сама по себе не отслеживает, было ли сообщение "обработано". Она отслеживает только смещение (offset), которое коммитит (сохраняет) группа консьюмеров.

Если консьюмер читает сообщение, но падает или не может его обработать, он просто не коммитит смещение для этого сообщения. Когда консьюмер перезапустится (или другой консьюмер из той же группы возьмет на себя эту партицию), он начнет чтение с последнего закоммиченного смещения и, таким образом, повторно получит то же самое сообщение.

Само сообщение физически остается в топике до тех пор, пока не истечет срок его хранения (retention.ms) или не будет превышен лимит размера для партиции (retention.bytes).

Стандартные паттерны обработки сбоев:

  1. Retry (Повторные попытки): Консьюмер пытается обработать сообщение несколько раз с задержкой. Если все попытки неудачны, сообщение отправляется в DLQ.
  2. Dead Letter Queue (DLQ): Это отдельный топик Kafka, куда отправляются "битые" или невалидные сообщения после нескольких неудачных попыток обработки. Это позволяет основному потоку данных не блокироваться из-за одного проблемного сообщения.
  3. Ручное управление смещением: В сложных случаях можно вручную управлять смещениями, чтобы пропустить сообщение или вернуться к нему позже.
// Пример цикла обработки с явным коммитом
for {
    msg, err := consumer.FetchMessage(ctx)
    if err != nil {
        // Обработка ошибки чтения из Kafka (например, реконнект)
        break
    }

    if err := processMessage(msg); err != nil {
        // Ошибка обработки. Не коммитим смещение.
        // Логируем ошибку и, возможно, отправляем в DLQ.
        log.Printf("Failed to process message: %v", err)
        continue // Переходим к следующему сообщению
    }

    // Сообщение успешно обработано, коммитим смещение
    consumer.CommitMessages(ctx, msg)
}

Ответ 18+ 🔞

А, слушай, вот эта вся история с Кафкой и обработкой сообщений — это вообще отдельный цирк, блядь. Так, смотри, главное что надо понять: Кафка сама по себе нихуя не знает, обработал ты сообщение или нет. Вообще. Ноль. Пиздец.

Она следит только за одной хуйнёй — за смещением, за этим самым offset, который твоя группа консьюмеров тупо сохраняет, как последнюю прочитанную страницу в книжке.

Вот представь: консьюмер взял сообщение, начал его жрать, а потом — бац! — упал, сдох, сервер отрубили, или в нём просто логика поехала, и он не смог его переварить. Так вот, если он не успел закоммитить это смещение, то он нихуя и не сделал, по мнению Кафки. Когда он (или его напарник из той же группы) очухается и снова полезет в эту партицию, он начнёт читать ровно с того места, где остановился в прошлый раз. И получит то же самое сообщение. Снова. Ебушки-воробушки, вот так вот.

А само сообщение, эта самая порция данных, будет болтаться в топике, пока его не сотрёт политика хранения — либо по времени (retention.ms), либо когда место кончится (retention.bytes).

Ну и как с этим жить, спрашивается? Паттерны, блядь, есть:

  1. Retry (Повторяй, пока не надоест): Консьюмер тупо пытается обработать сообщение раз, другой, третий, с какими-нибудь паузами. Если после N попыток всё равно пиздец — значит, сообщение говно, и его надо выкинуть в специальную помойку.
  2. Dead Letter Queue (Помойка для говна, она же DLQ): Это отдельный топик, куда как раз и летят эти некондиционные сообщения после всех неудачных попыток. Главная фишка в том, чтобы основная очередь не встала колом из-за одного ебанутого сообщения, которое все ломает.
  3. Ручное управление смещением (Для самых умных и/или отчаянных): Можно вручную, на свой страх и риск, сказать: "всё, это сообщение — хуйня, пропускаем" или "надо бы к нему вернуться попозже". Но это уже высший пилотаж, там дохуя граблей.
// Примерный код, где всё видно как на ладони
for {
    // Тянем сообщение из Кафки
    msg, err := consumer.FetchMessage(ctx)
    if err != nil {
        // Ой, всё! С Кафкой что-то не так (коннект порвался, например)
        break
    }

    // Пытаемся его обработать
    if err := processMessage(msg); err != nil {
        // Не, не вышло, ёпта! Смещение НЕ коммитим — значит, сообщение останется в очереди.
        // Логируем, что всё хуёво, и, возможно, пинаем сообщение в сторону DLQ.
        log.Printf("Не, ну это пиздец, сообщение не обработалось: %v", err)
        continue // Идём пробовать следующее, авось повезёт
    }

    // Ура, товарищи! Сообщение переварено. Теперь можно и закоммитить смещение.
    consumer.CommitMessages(ctx, msg)
}

Вот и вся магия, блядь. Главное — не забывать, что коммит смещения и успешная обработка сообщения должны быть как сиамские близнецы, иначе тебе придётся жрать одно и то же по десять раз.