Блокируется ли поток при отправке или получении сообщений из Kafka?

Ответ

По умолчанию Kafka работает неблокирующе, но поведение можно настроить в зависимости от требований.

1. Producer (отправка сообщений):

// АСИНХРОННАЯ отправка (не блокирует поток)
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, (metadata, exception) -> {
    // Callback выполнится при получении подтверждения от брокера
    if (exception != null) {
        logger.error("Ошибка отправки", exception);
    } else {
        logger.info("Отправлено в partition {} offset {}", 
                   metadata.partition(), metadata.offset());
    }
});
// Продолжение выполнения кода без ожидания

// СИНХРОННАЯ отправка (блокирует поток до подтверждения)
try {
    RecordMetadata metadata = producer.send(record).get(); // БЛОКИРУЮЩИЙ ВЫЗОВ
    System.out.println("Отправлено: " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

2. Consumer (получение сообщений):

// НЕБЛОКИРУЮЩИЙ poll() с таймаутом
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
    // Нет сообщений - продолжаем работу
    return;
}
for (ConsumerRecord<String, String> record : records) {
    processMessage(record.value());
}

3. Современные реактивные подходы (Spring Kafka):

@Bean
public ReactiveKafkaConsumerTemplate<String, String> reactiveConsumer() {
    return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
}

// Реактивная обработка без блокировки
reactiveConsumer.receive()
    .doOnNext(record -> {
        System.out.println("Получено: " + record.value());
        record.receiverOffset().acknowledge();
    })
    .subscribe();

Рекомендации:

  • Используйте асинхронную отправку с callback для максимальной производительности
  • Настройте acks, retries, max.in.flight.requests.per.connection для баланса между надежностью и скоростью
  • Для потребителей используйте максимально короткие таймауты в poll()
  • Рассмотрите реактивные реализации (Kafka Streams, Spring Reactor Kafka) для сложных пайплайнов обработки

Ответ 18+ 🔞

А, ну вот, классика жанра, блядь! Смотри, как оно на самом деле работает, а то все эти умные слова про «неблокирующе» — нихуя не понятно.

1. Producer (тот, кто шлёт сообщения): Тут, сука, два пути, как в жизни: либо быстро и без гарантий, либо ждать, пока всё подтвердят, как идиот.

// Вариант «похуй, отправь и забей» (асинхронный, не блокирует)
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, (metadata, exception) -> {
    // А вот эта штука дёрнется потом, когда брокер ответит. Может, через час, ёпта.
    if (exception != null) {
        logger.error("Ой, всё, не улетело!", exception); // Вот тут и узнаешь, что всё просрал
    } else {
        logger.info("Ура, засунули в partition {} на offset {}", 
                   metadata.partition(), metadata.offset()); // А тут — что повезло
    }
});
// А сам код уже дальше побежал, не дожидаясь. Лихая удаль, блядь!

// Вариант «я параноик и буду стоять тут, пока не скажут "ок"» (синхронный, блокирующий)
try {
    RecordMetadata metadata = producer.send(record).get(); // ТУТ ПРОСТОЙ, СТОИМ КАК ВОРОНЫ
    System.out.println("Фух, отправили, offset: " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace(); // И если что-то пошло не так — сиди и плачь в блоке try-catch
}

2. Consumer (тот, кто жрёт сообщения): Тут вроде как неблокирующе, но с подвохом, сука.

// Спрашиваем: «Ну что, есть чё?» — но долго не ждём.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 100 мс — и хватит
if (records.isEmpty()) {
    // Пусто, блядь! Ну и похуй, пошли дальше.
    return;
}
// А если не пусто — ну тогда работаем, пашем.
for (ConsumerRecord<String, String> record : records) {
    processMessage(record.value()); // Перевариваем, что прислали
}

3. Вся эта ваша модная реактивщина (Spring Kafka): Тут уже вообще магия, ёпта. Всё течёт, всё реактивно, мозг вытекает.

@Bean
public ReactiveKafkaConsumerTemplate<String, String> reactiveConsumer() {
    return new ReactiveKafkaConsumerTemplate<>(receiverOptions); // Бин, блядь, реактивный
}

// Подписываемся на поток сообщений, как на стрим котиков
reactiveConsumer.receive()
    .doOnNext(record -> {
        System.out.println("О, приплыло: " + record.value()); // Обработали
        record.receiverOffset().acknowledge(); // Кивнули, что взяли
    })
    .subscribe(); // И пошло-поехало, без блокировок

Итог, блядь, рекомендации:

  • Хочешь быстро — шли асинхронно и не парься, но будь готов, что где-то что-то потеряется, ёпта.
  • Настрой эти свои acks, retries — балансируй между «надёжно» и «летает как угорелое».
  • У потребителя не ставь огромные таймауты в poll(), а то будешь как лох торчать в ожидании.
  • Если проект сложный, как чёртова машина Руба Голдберга — смотри в сторону реактивных штук (Kafka Streams, Spring Reactor Kafka). Там своя атмосфера, блядь, но зато масштабируется на овердохуища.