Как происходит репликация данных в Apache Kafka?

Ответ

Репликация в Kafka — это ключевой механизм для обеспечения отказоустойчивости и долговечности данных. Она работает на уровне партиций топика.

Модель Лидер-Последователь (Leader-Follower)

  1. Лидер (Leader): Для каждой партиции один из брокеров выбирается лидером. Только лидер обрабатывает все запросы на запись и чтение для этой партиции.
  2. Последователи (Followers): Остальные брокеры, на которых размещены копии партиции, становятся последователями. Их единственная задача — асинхронно копировать данные с лидера.

In-Sync Replicas (ISR)

ISR — это множество реплик (включая лидера), которые не сильно отстают от лидера. Реплика считается "in-sync", если она успевает запрашивать у лидера новые сообщения в пределах времени, заданного параметром replica.lag.time.max.ms (по умолчанию 30 секунд).

Процесс записи и подтверждения (acks)

Надежность записи напрямую зависит от настройки acks у продюсера:

  • acks=0: Продюсер не ждет подтверждения от брокера. Максимальная производительность, но возможна потеря данных.
  • acks=1 (по умолчанию): Продюсер ждет подтверждения только от лидера. Запись считается успешной, как только лидер записал сообщение в свой лог. Данные могут быть потеряны, если лидер упадет до того, как последователи скопируют сообщение.
  • acks=all (или -1): Продюсер ждет подтверждения от лидера после того, как все реплики из ISR скопировали сообщение. Это обеспечивает максимальную гарантию сохранности данных, но с более высокой задержкой.

Отказоустойчивость

Если брокер-лидер падает, Kafka Controller выбирает нового лидера из числа реплик, находящихся в ISR. Это гарантирует, что новый лидер обладает всеми подтвержденными сообщениями.

Если все реплики из ISR становятся недоступны, Kafka по умолчанию (с unclean.leader.election.enable=false) не будет выбирать нового лидера, предпочитая недоступность партиции потере данных.

// Пример создания топика с фактором репликации 3
// Используется библиотека confluent-kafka-go

adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": brokers})
// ... обработка ошибки

_, err = adminClient.CreateTopics(ctx, []kafka.TopicSpecification{{
    Topic:             "my-topic",
    NumPartitions:     6,
    ReplicationFactor: 3, // 1 лидер и 2 реплики для каждой партиции
}})