Можно ли явно распределять сообщения по партициям в Kafka?

Ответ

Да, в Apache Kafka можно контролировать распределение сообщений по партициям несколькими способами. Это важно для обеспечения порядка обработки сообщений и балансировки нагрузки.

Основные механизмы:

1. Указание ключа сообщения (Message Key)

  • Сообщения с одинаковым ключом всегда попадают в одну и ту же партицию (на основе хэширования).
  • Гарантирует порядок обработки для записей в пределах одного ключа.
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<string, string>(config).Build();

// Сообщения с одинаковым 'userId' попадут в одну партицию
var message1 = new Message<string, string> 
{ 
    Key = "user123", 
    Value = "User logged in" 
};
var message2 = new Message<string, string> 
{ 
    Key = "user123", 
    Value = "User made a purchase" 
};

await producer.ProduceAsync("user-events", message1);
await producer.ProduceAsync("user-events", message2);

2. Прямое указание номера партиции

  • Можно явно отправить сообщение в конкретную партицию, минуя стандартный партиционер.
// Отправка строго в партицию 2
var tp = new TopicPartition("my-topic", new Partition(2));
await producer.ProduceAsync(tp, new Message<Null, string> { Value = "Direct message" });

3. Кастомный партиционер

  • Можно реализовать интерфейс ICustomPartitioner или IProducer для сложной логики распределения.
public class CustomPartitioner : Partitioner
{
    public override int Partition(string topic, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull)
    {
        // Ваша логика: например, по первому байту ключа
        return keyIsNull ? 0 : keyData[0] % partitionCount;
    }
}

Важные замечания:

  • Если ключ null, используется round-robin распределение между партициями.
  • Изменение количества партиций топика нарушит соответствие ключ→партиция.
  • Для абсолютного порядка в масштабе всего топика используйте топик с одной партицией.

Ответ 18+ 🔞

Слушай, вот это тема интересная, про распределение сообщений по партициям в Кафке. Ну, то есть как их раскидать, чтобы не получилось, что все сообщения в одну партицию прут, а остальные пустуют, как выселенные квартиры. Такого, конечно, не хочется.

Основные способы, которыми можно это дело контролировать:

1. Ключ сообщения (Message Key) — самый частый случай Вот смотри, если ты указываешь ключ, то все сообщения с одинаковым этим ключом всегда будут лететь в одну и ту же партицию. Это, блядь, основа порядка. Хочешь, чтобы все события по юзеру user123 обрабатывались по порядку — задавай его как ключ. Без ключа они бы разъехались кто куда, и порядок нахуй слетел.

using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<string, string>(config).Build();

// Вот эти два сообщения упрутся в одну партицию, потому что ключ одинаковый.
// Сначала залогинился, потом купил — и обработчик это увидит именно в такой последовательности.
var message1 = new Message<string, string> 
{ 
    Key = "user123", 
    Value = "User logged in" 
};
var message2 = new Message<string, string> 
{ 
    Key = "user123", 
    Value = "User made a purchase" 
};

await producer.ProduceAsync("user-events", message1);
await producer.ProduceAsync("user-events", message2);

2. Прямой указатель партиции — когда ты сам себе режиссер А вот это уже, понимаешь, ручное управление. Хочу в партицию номер 2 — пожалуйста, получай. Минуя все эти хэширования и стандартные партиционеры. Полезно, когда у тебя своя, ебанутая логика, которую в ключ не впихнёшь.

// Всё, приехали. Сообщение летит прямиком в партицию 2, как поезд в тупик.
var tp = new TopicPartition("my-topic", new Partition(2));
await producer.ProduceAsync(tp, new Message<Null, string> { Value = "Direct message" });

3. Свой, кастомный партиционер — для самых хитрых Когда стандартные механизмы — это как ложкой суп хлебать, а тебе надо вилкой, вот тут пишешь свою реализацию. Например, партицию по первому байту ключа определять. Главное — не накосячить с логикой, а то балансировки не будет никакой.

public class CustomPartitioner : Partitioner
{
    public override int Partition(string topic, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull)
    {
        // Ну вот, например, смотрим на первый байт ключа и от него пляшем.
        // Если ключа нет — в нулевую партицию его, на растерзание.
        return keyIsNull ? 0 : keyData[0] % partitionCount;
    }
}

Что важно помнить, а то обосрёшься:

  • Если ключ не указан (null), то Кафка будет раскидывать сообщения по партициям по кругу (round-robin). Иногда это то, что надо, но про порядок обработки можешь забыть, как про вчерашний день.
  • Количество партиций — штука фиксированная на момент отправки. Если ты потом, с горяча, добавишь партиций в топик, то соответствие «ключ → партиция» может ебнуться. Сообщения с одним ключом начнут попадать в другие партиции. Пиздец порядку.
  • Если тебе важен абсолютный порядок ВСЕХ сообщений в топике, то делай одну партицию. Но тогда ты упрёшься в её производительность, как в стену. Выбор, блядь, всегда есть, и он всегда неидеальный.