Ответ
Взаимодействие строится на принципе: одна партиция топика потребляется ровно одним консьюмером в рамках консьюмер-группы. Это гарантирует порядок обработки сообщений в пределах партиции.
Основные правила:
- Консьюмер-группа (
group.id) — логическая группа консьюмеров, совместно обрабатывающих один или несколько топиков. - Распределение партиций: Kafka автоматически распределяет все партиции топика между активными консьюмерами группы. Процесс называется ребаллансировкой.
- Максимальный параллелизм для группы ограничен количеством партиций в топике. Если консьюмеров больше, чем партиций, часть консьюмеров будет простаивать.
Практический пример на Java (Kafka Clients):
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "order-processors"); // Все консьюмеры с этим ID — одна группа
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders-topic")); // Подписка на топик
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Partition=%d, Offset=%d, Key=%s, Value=%s%n",
record.partition(), record.offset(), record.key(), record.value());
// Обработка сообщения. Все сообщения из partition 0 будут обработаны этим консьюмером.
}
}
Сценарий: Если топик orders-topic имеет 4 партиции, а в группе order-processors запущено 2 консьюмера, то каждому будет назначено по 2 партиции. Если запустить третьего консьюмера, произойдет ребаллансировка и партиции распределятся как 2-1-1. Если один консьюмер отключится, его партиции будут перераспределены между оставшимися.