Ответ
В Apache Kafka продюсер определяет, в какую партицию топика отправить сообщение, с помощью компонента под названием Partitioner (партиционировщик).
Стратегия выбора партиции выглядит следующим образом:
Партиция указана явно: Если в сообщении, которое отправляет продюсер, явно указан номер партиции, сообщение будет отправлено именно туда. Это дает полный контроль, но используется редко.
Ключ сообщения (
key
) указан: Если партиция не указана, но у сообщения есть ключ, то партиционировщик вычисляет хэш от ключа и делит его по модулю на количество партиций в топике:hash(key) % num_partitions
.- Гарантия: Все сообщения с одинаковым ключом всегда попадут в одну и ту же партицию. Это критически важно для сохранения порядка обработки сообщений (например, все события для одного
user_id
должны обрабатываться последовательно).
- Гарантия: Все сообщения с одинаковым ключом всегда попадут в одну и ту же партицию. Это критически важно для сохранения порядка обработки сообщений (например, все события для одного
Ключ и партиция не указаны: Если нет ни ключа, ни партиции, продюсер распределяет сообщения по доступным партициям для равномерной балансировки нагрузки.
- Старая стратегия (до Kafka 2.4):
Round-robin
. Сообщения поочередно отправлялись в каждую партицию (p0, p1, p2, p0, p1, ...). - Новая стратегия (Sticky Partitioner): Продюсер "залипает" на одной партиции и отправляет в нее накопленный батч сообщений. Затем выбирает следующую партицию и "залипает" на ней. Это снижает задержку и нагрузку на брокеры, так как создается меньше запросов с меньшим количеством батчей.
- Старая стратегия (до Kafka 2.4):
Кастомный Partitioner
Можно реализовать собственную логику партиционирования. Например, в Go с использованием библиотеки sarama
это делается путем реализации интерфейса Partitioner
и его регистрации в конфигурации продюсера.
// Пример интерфейса в sarama
type Partitioner interface {
// Принимает сообщение и общее число партиций,
// возвращает номер партиции для записи.
Partition(message *ProducerMessage, numPartitions int32) (int32, error)
RequiresConsistency() bool
}