Как реализован механизм подтверждения (acknowledgment) сообщений в Apache Kafka?

Ответ

В Apache Kafka механизм подтверждения (acknowledgment) сообщений является ключевым для обеспечения надежности доставки и настраивается как на стороне продюсера, так и на стороне консьюмера.

Подтверждения для продюсеров (acks)

Продюсеры используют параметр acks (acknowledgments) для определения уровня надежности, с которым они отправляют сообщения. Этот параметр контролирует, сколько подтверждений от брокеров должен получить продюсер, прежде чем считать сообщение успешно отправленным.

  • acks=0 (No Acks):

    • Продюсер не ждет никаких подтверждений от брокера. Он отправляет сообщение и сразу переходит к следующему.
    • Плюсы: Максимальная пропускная способность, минимальная задержка.
    • Минусы: Высокий риск потери данных (сообщение может быть потеряно, если лидер партиции упадет до записи сообщения).
  • acks=1 (Leader Acks):

    • Продюсер ждет подтверждения только от лидера партиции.
    • Плюсы: Хороший баланс между пропускной способностью и надежностью. Сообщение гарантированно записано на лидер-брокер.
    • Минусы: Риск потери данных, если лидер упадет до того, как сообщение будет реплицировано на все синхронные реплики (ISR).
    • По умолчанию для большинства клиентов.
  • acks=all или acks=-1 (All Replicas Acks):

    • Продюсер ждет подтверждения от лидера и от всех синхронных реплик (ISR).
    • Плюсы: Максимальная надежность. Сообщение гарантированно записано на лидер-брокер и на все его синхронные реплики, что минимизирует риск потери данных даже при падении лидера.
    • Минусы: Самая низкая пропускная способность, самая высокая задержка.

Пример настройки продюсера в Go (с использованием sarama):

package main

import (
    "fmt"
    "log"
    "github.com/IBM/sarama"
)

func main() {
    brokers := []string{"localhost:9092"}

    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // Устанавливаем acks=all для максимальной надежности
    config.Producer.Return.Successes = true // Важно для получения подтверждений

    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        log.Fatalf("Ошибка при создании продюсера: %v", err)
    }
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "my_topic",
        Value: sarama.StringEncoder("Hello Kafka!"),
    }

    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Printf("Ошибка при отправке сообщения: %v", err)
    } else {
        fmt.Printf("Сообщение отправлено в партицию %d, оффсет %dn", partition, offset)
    }
}

Подтверждения для консьюмеров (Commit Offset)

Для консьюмеров механизм подтверждения связан с фиксацией оффсетов (commit offset). Консьюмер сообщает Kafka, до какого оффсета в партиции он успешно обработал сообщения. Это позволяет Kafka знать, с какого места консьюмер должен продолжить чтение в случае перезапуска или сбоя.

Существует два основных режима фиксации оффсетов:

  1. Автоматическая фиксация (Auto Commit):

    • Консьюмер автоматически фиксирует оффсеты через определенные интервалы времени (параметр auto.commit.interval.ms).
    • Плюсы: Простота в использовании, не требует ручного управления.
    • Минусы: Риск потери данных (если консьюмер упадет после авто-фиксации, но до фактической обработки сообщения) или дублирования (если упадет до авто-фиксации, но после обработки).
  2. Ручная фиксация (Manual Commit):

    • Консьюмер явно вызывает метод для фиксации оффсета после успешной обработки сообщения или группы сообщений.
    • Плюсы: Точный контроль над фиксацией оффсетов, что позволяет реализовать семантику "как минимум один раз" (at-least-once) или "ровно один раз" (exactly-once) в сочетании с транзакциями.
    • Минусы: Требует более сложной логики обработки ошибок и управления состоянием.

Пример ручной фиксации оффсета в Go (с использованием sarama):

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "github.com/IBM/sarama"
)

// ConsumerGroupHandler реализует sarama.ConsumerGroupHandler
type ConsumerGroupHandler struct{}

func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        fmt.Printf("Сообщение получено: Topic=%s, Partition=%d, Offset=%d, Value=%sn",
            message.Topic, message.Partition, message.Offset, string(message.Value))

        // Здесь должна быть логика обработки сообщения
        // ...

        // Ручная фиксация оффсета после успешной обработки
        session.MarkMessage(message, "") // Отмечаем сообщение как обработанное
        session.Commit() // Фиксируем оффсет
    }
    return nil
}

func main() {
    brokers := []string{"localhost:9092"}
    consumerGroup := "my_consumer_group"
    topics := []string{"my_topic"}

    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false // Отключаем авто-фиксацию
    config.Consumer.Return.Errors = true

    client, err := sarama.NewConsumerGroup(brokers, consumerGroup, config)
    if err != nil {
        log.Fatalf("Ошибка при создании группы консьюмеров: %v", err)
    }
    defer client.Close()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            if err := client.Consume(ctx, topics, ConsumerGroupHandler{}); err != nil {
                log.Printf("Ошибка при потреблении: %v", err)
                if err == sarama.ErrClosedConsumerGroup { return } // Группа закрыта
            }
            // Если контекст отменен, выходим
            if ctx.Err() != nil { return }
        }
    }()

    log.Println("Консьюмер запущен. Нажмите Ctrl+C для выхода.")
    select { // Ждем сигнала завершения
    case <-ctx.Done():
    }

    wg.Wait()
    log.Println("Консьюмер завершил работу.")
}

Правильный выбор стратегии подтверждения зависит от требований к надежности и производительности вашего приложения.