Где Kafka хранит offset’ы консьюмеров и какие существуют способы управления ими?

Ответ

Offset (смещение) — это указатель на последнее прочитанное сообщение в партиции топика. Его хранение и управление критически важны для гарантий доставки сообщений. Существует два основных подхода:

1. В брокере Kafka (автоматическое управление)

Это стандартный и наиболее распространенный способ. Kafka хранит offset'ы для каждой consumer group в специальном внутреннем топике с именем __consumer_offsets.

  • Как это работает: Консьюмер периодически (или после обработки сообщения) отправляет коммит (commit) своего offset'а брокеру. Брокер сохраняет эту информацию. При перезапуске или перебалансировке группы консьюмер запрашивает у брокера последний сохраненный offset и продолжает чтение с этого места.
  • Преимущества: Простота, надежность. Kafka сама заботится о хранении и отказоустойчивости.

2. На стороне консьюмера (ручное управление)

В этом случае консьюмер сам отвечает за хранение offset'ов. Для этого в настройках консьюмера необходимо отключить автоматический коммит (EnableAutoCommit=false).

  • Где хранить: В любом внешнем хранилище, поддерживающем транзакции, например, в реляционной базе данных (PostgreSQL, MySQL) или NoSQL-хранилище (Redis).
  • Зачем это нужно: Для реализации семантики обработки "exactly-once". Это позволяет атомарно выполнить две операции в одной транзакции: обработать сообщение и сохранить новый offset. Если одна из операций завершится неудачно, вся транзакция откатывается, и сообщение будет обработано повторно.

Пример на Go с библиотекой sarama (демонстрация ручного коммита):

// В настройках консьюмера нужно указать:
// config.Consumer.Offsets.AutoCommit.Enable = false

// В цикле обработки сообщений:
session, _ := sarama.NewConsumerGroup(..., ..., config)

// ... в методе ConsumeClaim ...
for msg := range claim.Messages() {
    // 1. Обрабатываем сообщение
    err := processMessage(msg)

    // 2. Если обработка успешна, коммитим offset вручную
    if err == nil {
        session.MarkMessage(msg, "") // Помечаем сообщение для коммита
    }
    // Коммит будет выполнен либо по таймеру, либо при следующем вызове
}