Ответ
По умолчанию Kafka работает неблокирующе, но поведение можно настроить в зависимости от требований.
1. Producer (отправка сообщений):
// АСИНХРОННАЯ отправка (не блокирует поток)
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, (metadata, exception) -> {
// Callback выполнится при получении подтверждения от брокера
if (exception != null) {
logger.error("Ошибка отправки", exception);
} else {
logger.info("Отправлено в partition {} offset {}",
metadata.partition(), metadata.offset());
}
});
// Продолжение выполнения кода без ожидания
// СИНХРОННАЯ отправка (блокирует поток до подтверждения)
try {
RecordMetadata metadata = producer.send(record).get(); // БЛОКИРУЮЩИЙ ВЫЗОВ
System.out.println("Отправлено: " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
2. Consumer (получение сообщений):
// НЕБЛОКИРУЮЩИЙ poll() с таймаутом
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
// Нет сообщений - продолжаем работу
return;
}
for (ConsumerRecord<String, String> record : records) {
processMessage(record.value());
}
3. Современные реактивные подходы (Spring Kafka):
@Bean
public ReactiveKafkaConsumerTemplate<String, String> reactiveConsumer() {
return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
}
// Реактивная обработка без блокировки
reactiveConsumer.receive()
.doOnNext(record -> {
System.out.println("Получено: " + record.value());
record.receiverOffset().acknowledge();
})
.subscribe();
Рекомендации:
- Используйте асинхронную отправку с callback для максимальной производительности
- Настройте
acks,retries,max.in.flight.requests.per.connectionдля баланса между надежностью и скоростью - Для потребителей используйте максимально короткие таймауты в
poll() - Рассмотрите реактивные реализации (Kafka Streams, Spring Reactor Kafka) для сложных пайплайнов обработки
Ответ 18+ 🔞
А, ну вот, классика жанра, блядь! Смотри, как оно на самом деле работает, а то все эти умные слова про «неблокирующе» — нихуя не понятно.
1. Producer (тот, кто шлёт сообщения): Тут, сука, два пути, как в жизни: либо быстро и без гарантий, либо ждать, пока всё подтвердят, как идиот.
// Вариант «похуй, отправь и забей» (асинхронный, не блокирует)
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, (metadata, exception) -> {
// А вот эта штука дёрнется потом, когда брокер ответит. Может, через час, ёпта.
if (exception != null) {
logger.error("Ой, всё, не улетело!", exception); // Вот тут и узнаешь, что всё просрал
} else {
logger.info("Ура, засунули в partition {} на offset {}",
metadata.partition(), metadata.offset()); // А тут — что повезло
}
});
// А сам код уже дальше побежал, не дожидаясь. Лихая удаль, блядь!
// Вариант «я параноик и буду стоять тут, пока не скажут "ок"» (синхронный, блокирующий)
try {
RecordMetadata metadata = producer.send(record).get(); // ТУТ ПРОСТОЙ, СТОИМ КАК ВОРОНЫ
System.out.println("Фух, отправили, offset: " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); // И если что-то пошло не так — сиди и плачь в блоке try-catch
}
2. Consumer (тот, кто жрёт сообщения): Тут вроде как неблокирующе, но с подвохом, сука.
// Спрашиваем: «Ну что, есть чё?» — но долго не ждём.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 100 мс — и хватит
if (records.isEmpty()) {
// Пусто, блядь! Ну и похуй, пошли дальше.
return;
}
// А если не пусто — ну тогда работаем, пашем.
for (ConsumerRecord<String, String> record : records) {
processMessage(record.value()); // Перевариваем, что прислали
}
3. Вся эта ваша модная реактивщина (Spring Kafka): Тут уже вообще магия, ёпта. Всё течёт, всё реактивно, мозг вытекает.
@Bean
public ReactiveKafkaConsumerTemplate<String, String> reactiveConsumer() {
return new ReactiveKafkaConsumerTemplate<>(receiverOptions); // Бин, блядь, реактивный
}
// Подписываемся на поток сообщений, как на стрим котиков
reactiveConsumer.receive()
.doOnNext(record -> {
System.out.println("О, приплыло: " + record.value()); // Обработали
record.receiverOffset().acknowledge(); // Кивнули, что взяли
})
.subscribe(); // И пошло-поехало, без блокировок
Итог, блядь, рекомендации:
- Хочешь быстро — шли асинхронно и не парься, но будь готов, что где-то что-то потеряется, ёпта.
- Настрой эти свои
acks,retries— балансируй между «надёжно» и «летает как угорелое». - У потребителя не ставь огромные таймауты в
poll(), а то будешь как лох торчать в ожидании. - Если проект сложный, как чёртова машина Руба Голдберга — смотри в сторону реактивных штук (Kafka Streams, Spring Reactor Kafka). Там своя атмосфера, блядь, но зато масштабируется на овердохуища.