Ответ
Репликация в Kafka — это ключевой механизм для обеспечения отказоустойчивости и долговечности данных. Она работает на уровне партиций топика.
Модель Лидер-Последователь (Leader-Follower)
- Лидер (Leader): Для каждой партиции один из брокеров выбирается лидером. Только лидер обрабатывает все запросы на запись и чтение для этой партиции.
- Последователи (Followers): Остальные брокеры, на которых размещены копии партиции, становятся последователями. Их единственная задача — асинхронно копировать данные с лидера.
In-Sync Replicas (ISR)
ISR — это множество реплик (включая лидера), которые не сильно отстают от лидера. Реплика считается "in-sync", если она успевает запрашивать у лидера новые сообщения в пределах времени, заданного параметром replica.lag.time.max.ms
(по умолчанию 30 секунд).
Процесс записи и подтверждения (acks
)
Надежность записи напрямую зависит от настройки acks
у продюсера:
acks=0
: Продюсер не ждет подтверждения от брокера. Максимальная производительность, но возможна потеря данных.acks=1
(по умолчанию): Продюсер ждет подтверждения только от лидера. Запись считается успешной, как только лидер записал сообщение в свой лог. Данные могут быть потеряны, если лидер упадет до того, как последователи скопируют сообщение.acks=all
(или-1
): Продюсер ждет подтверждения от лидера после того, как все реплики из ISR скопировали сообщение. Это обеспечивает максимальную гарантию сохранности данных, но с более высокой задержкой.
Отказоустойчивость
Если брокер-лидер падает, Kafka Controller выбирает нового лидера из числа реплик, находящихся в ISR. Это гарантирует, что новый лидер обладает всеми подтвержденными сообщениями.
Если все реплики из ISR становятся недоступны, Kafka по умолчанию (с unclean.leader.election.enable=false
) не будет выбирать нового лидера, предпочитая недоступность партиции потере данных.
// Пример создания топика с фактором репликации 3
// Используется библиотека confluent-kafka-go
adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": brokers})
// ... обработка ошибки
_, err = adminClient.CreateTopics(ctx, []kafka.TopicSpecification{{
Topic: "my-topic",
NumPartitions: 6,
ReplicationFactor: 3, // 1 лидер и 2 реплики для каждой партиции
}})