Ответ
Kafka сама по себе не отслеживает, было ли сообщение "обработано". Она отслеживает только смещение (offset), которое коммитит (сохраняет) группа консьюмеров.
Если консьюмер читает сообщение, но падает или не может его обработать, он просто не коммитит смещение для этого сообщения. Когда консьюмер перезапустится (или другой консьюмер из той же группы возьмет на себя эту партицию), он начнет чтение с последнего закоммиченного смещения и, таким образом, повторно получит то же самое сообщение.
Само сообщение физически остается в топике до тех пор, пока не истечет срок его хранения (retention.ms
) или не будет превышен лимит размера для партиции (retention.bytes
).
Стандартные паттерны обработки сбоев:
- Retry (Повторные попытки): Консьюмер пытается обработать сообщение несколько раз с задержкой. Если все попытки неудачны, сообщение отправляется в DLQ.
- Dead Letter Queue (DLQ): Это отдельный топик Kafka, куда отправляются "битые" или невалидные сообщения после нескольких неудачных попыток обработки. Это позволяет основному потоку данных не блокироваться из-за одного проблемного сообщения.
- Ручное управление смещением: В сложных случаях можно вручную управлять смещениями, чтобы пропустить сообщение или вернуться к нему позже.
// Пример цикла обработки с явным коммитом
for {
msg, err := consumer.FetchMessage(ctx)
if err != nil {
// Обработка ошибки чтения из Kafka (например, реконнект)
break
}
if err := processMessage(msg); err != nil {
// Ошибка обработки. Не коммитим смещение.
// Логируем ошибку и, возможно, отправляем в DLQ.
log.Printf("Failed to process message: %v", err)
continue // Переходим к следующему сообщению
}
// Сообщение успешно обработано, коммитим смещение
consumer.CommitMessages(ctx, msg)
}