Ответ
В Apache Kafka механизм подтверждения (acknowledgment) сообщений является ключевым для обеспечения надежности доставки и настраивается как на стороне продюсера, так и на стороне консьюмера.
Подтверждения для продюсеров (acks
)
Продюсеры используют параметр acks
(acknowledgments) для определения уровня надежности, с которым они отправляют сообщения. Этот параметр контролирует, сколько подтверждений от брокеров должен получить продюсер, прежде чем считать сообщение успешно отправленным.
acks=0
(No Acks):- Продюсер не ждет никаких подтверждений от брокера. Он отправляет сообщение и сразу переходит к следующему.
- Плюсы: Максимальная пропускная способность, минимальная задержка.
- Минусы: Высокий риск потери данных (сообщение может быть потеряно, если лидер партиции упадет до записи сообщения).
acks=1
(Leader Acks):- Продюсер ждет подтверждения только от лидера партиции.
- Плюсы: Хороший баланс между пропускной способностью и надежностью. Сообщение гарантированно записано на лидер-брокер.
- Минусы: Риск потери данных, если лидер упадет до того, как сообщение будет реплицировано на все синхронные реплики (ISR).
- По умолчанию для большинства клиентов.
acks=all
илиacks=-1
(All Replicas Acks):- Продюсер ждет подтверждения от лидера и от всех синхронных реплик (ISR).
- Плюсы: Максимальная надежность. Сообщение гарантированно записано на лидер-брокер и на все его синхронные реплики, что минимизирует риск потери данных даже при падении лидера.
- Минусы: Самая низкая пропускная способность, самая высокая задержка.
Пример настройки продюсера в Go (с использованием sarama
):
package main
import (
"fmt"
"log"
"github.com/IBM/sarama"
)
func main() {
brokers := []string{"localhost:9092"}
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // Устанавливаем acks=all для максимальной надежности
config.Producer.Return.Successes = true // Важно для получения подтверждений
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatalf("Ошибка при создании продюсера: %v", err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "my_topic",
Value: sarama.StringEncoder("Hello Kafka!"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("Ошибка при отправке сообщения: %v", err)
} else {
fmt.Printf("Сообщение отправлено в партицию %d, оффсет %dn", partition, offset)
}
}
Подтверждения для консьюмеров (Commit Offset)
Для консьюмеров механизм подтверждения связан с фиксацией оффсетов (commit offset). Консьюмер сообщает Kafka, до какого оффсета в партиции он успешно обработал сообщения. Это позволяет Kafka знать, с какого места консьюмер должен продолжить чтение в случае перезапуска или сбоя.
Существует два основных режима фиксации оффсетов:
Автоматическая фиксация (Auto Commit):
- Консьюмер автоматически фиксирует оффсеты через определенные интервалы времени (параметр
auto.commit.interval.ms
). - Плюсы: Простота в использовании, не требует ручного управления.
- Минусы: Риск потери данных (если консьюмер упадет после авто-фиксации, но до фактической обработки сообщения) или дублирования (если упадет до авто-фиксации, но после обработки).
- Консьюмер автоматически фиксирует оффсеты через определенные интервалы времени (параметр
Ручная фиксация (Manual Commit):
- Консьюмер явно вызывает метод для фиксации оффсета после успешной обработки сообщения или группы сообщений.
- Плюсы: Точный контроль над фиксацией оффсетов, что позволяет реализовать семантику "как минимум один раз" (at-least-once) или "ровно один раз" (exactly-once) в сочетании с транзакциями.
- Минусы: Требует более сложной логики обработки ошибок и управления состоянием.
Пример ручной фиксации оффсета в Go (с использованием sarama
):
package main
import (
"context"
"fmt"
"log"
"sync"
"github.com/IBM/sarama"
)
// ConsumerGroupHandler реализует sarama.ConsumerGroupHandler
type ConsumerGroupHandler struct{}
func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
fmt.Printf("Сообщение получено: Topic=%s, Partition=%d, Offset=%d, Value=%sn",
message.Topic, message.Partition, message.Offset, string(message.Value))
// Здесь должна быть логика обработки сообщения
// ...
// Ручная фиксация оффсета после успешной обработки
session.MarkMessage(message, "") // Отмечаем сообщение как обработанное
session.Commit() // Фиксируем оффсет
}
return nil
}
func main() {
brokers := []string{"localhost:9092"}
consumerGroup := "my_consumer_group"
topics := []string{"my_topic"}
config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = false // Отключаем авто-фиксацию
config.Consumer.Return.Errors = true
client, err := sarama.NewConsumerGroup(brokers, consumerGroup, config)
if err != nil {
log.Fatalf("Ошибка при создании группы консьюмеров: %v", err)
}
defer client.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := client.Consume(ctx, topics, ConsumerGroupHandler{}); err != nil {
log.Printf("Ошибка при потреблении: %v", err)
if err == sarama.ErrClosedConsumerGroup { return } // Группа закрыта
}
// Если контекст отменен, выходим
if ctx.Err() != nil { return }
}
}()
log.Println("Консьюмер запущен. Нажмите Ctrl+C для выхода.")
select { // Ждем сигнала завершения
case <-ctx.Done():
}
wg.Wait()
log.Println("Консьюмер завершил работу.")
}
Правильный выбор стратегии подтверждения зависит от требований к надежности и производительности вашего приложения.