Ответ
Offset (смещение) — это указатель на последнее прочитанное сообщение в партиции топика. Его хранение и управление критически важны для гарантий доставки сообщений. Существует два основных подхода:
1. В брокере Kafka (автоматическое управление)
Это стандартный и наиболее распространенный способ. Kafka хранит offset'ы для каждой consumer group в специальном внутреннем топике с именем __consumer_offsets
.
- Как это работает: Консьюмер периодически (или после обработки сообщения) отправляет коммит (commit) своего offset'а брокеру. Брокер сохраняет эту информацию. При перезапуске или перебалансировке группы консьюмер запрашивает у брокера последний сохраненный offset и продолжает чтение с этого места.
- Преимущества: Простота, надежность. Kafka сама заботится о хранении и отказоустойчивости.
2. На стороне консьюмера (ручное управление)
В этом случае консьюмер сам отвечает за хранение offset'ов. Для этого в настройках консьюмера необходимо отключить автоматический коммит (EnableAutoCommit=false
).
- Где хранить: В любом внешнем хранилище, поддерживающем транзакции, например, в реляционной базе данных (PostgreSQL, MySQL) или NoSQL-хранилище (Redis).
- Зачем это нужно: Для реализации семантики обработки "exactly-once". Это позволяет атомарно выполнить две операции в одной транзакции: обработать сообщение и сохранить новый offset. Если одна из операций завершится неудачно, вся транзакция откатывается, и сообщение будет обработано повторно.
Пример на Go с библиотекой sarama
(демонстрация ручного коммита):
// В настройках консьюмера нужно указать:
// config.Consumer.Offsets.AutoCommit.Enable = false
// В цикле обработки сообщений:
session, _ := sarama.NewConsumerGroup(..., ..., config)
// ... в методе ConsumeClaim ...
for msg := range claim.Messages() {
// 1. Обрабатываем сообщение
err := processMessage(msg)
// 2. Если обработка успешна, коммитим offset вручную
if err == nil {
session.MarkMessage(msg, "") // Помечаем сообщение для коммита
}
// Коммит будет выполнен либо по таймеру, либо при следующем вызове
}