Ответ
Prefetch (предварительная выборка) — это механизм контроля потока (flow control) на стороне потребителя (consumer) в RabbitMQ. Он определяет максимальное количество неподтверждённых сообщений (unacknowledged), которые брокер может отправить потребителю за раз, прежде чем остановить отправку и ждать подтверждений (acknowledgements).
Зачем это нужно? Без prefetch брокер будет "выталкивать" (push) все доступные сообщения потребителю сразу, что может привести к:
- Перегрузке потребителя: Потребитель не успевает обрабатывать сообщения, что ведёт к росту потребления памяти и возможным сбоям.
- Неравномерному распределению нагрузки: В системе с несколькими потребителями быстрый потребитель может остаться без работы, пока медленный разгребает свою огромную очередь неподтверждённых сообщений.
Как работает:
Устанавливается prefetchCount (например, 1). Брокер отправляет потребителю одно сообщение и ждет его подтверждения (BasicAck). Только после получения подтверждения отправляется следующее сообщение. Это создаёт модель "round-robin" (циклического перебора) между потребителями.
Настройка в .NET (библиотека RabbitMQ.Client):
using RabbitMQ.Client;
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// КРИТИЧНО ВАЖНЫЙ ПАРАМЕТР: autoAck: false
// Prefetch работает только с ручным подтверждением.
var consumer = new EventingBasicConsumer(channel);
// Настройка prefetch. BasicQos = Quality of Service.
// prefetchSize: 0 (не используется, лимит по размеру сообщений в байтах).
// prefetchCount: 1 (макс. количество неподтверждённых сообщений).
// global: false (лимит применяется к каждому новому потребителю на этом канале).
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
consumer.Received += (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received: {message}");
// Имитация обработки
Thread.Sleep(1000);
// Ручное подтверждение успешной обработки
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
// Сообщение не обработано. Отправляем в Dead Letter Exchange или отрицательно подтверждаем.
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
}
};
channel.BasicConsume(queue: "task_queue",
autoAck: false, // Автоподтверждение должно быть ВЫКЛЮЧЕНО
consumer: consumer);
Ключевые параметры BasicQos:
prefetchCount: Самый важный параметр. Значение1обеспечивает честное распределение, но может снизить общую пропускную способность, если обработка быстрая. Для увеличения throughput можно установить большее значение (например, 50-100), но это требует мониторинга, чтобы не перегрузить потребителя.global:false(по умолчанию): Настройка применяется к каждому потребителю на канале индивидуально. Если на канале 2 потребителя сprefetchCount=1, каждый может иметь по 1 неподтверждённому сообщению.true: Настройка применяется ко всему каналу в целом. Для 2 потребителей сprefetchCount=1они в сумме смогут получить только 1 неподтверждённое сообщение.
Выбор значения prefetchCount: Зависит от характера задачи. Для долгих задач (обработка изображений, PDF) лучше оставить 1. Для коротких задач (запись в базу, вызов другого API) можно увеличить значение, чтобы избежать простоев потребителя в ожидании новых сообщений.
Ответ 18+ 🔞
А, слушай, про prefetch в RabbitMQ — это ж классика, блядь! Представь себе: ты в баре, а тебе официант сразу 50 кружек пива на стол ставит. И сидишь ты, обоссаный, смотришь на эту хуйню и думаешь: «Ну я ж одну выпью, остальные теплые будут, а он новые таскать уже не может, потому что стол забит!». Вот prefetch — это как сказать официанту: «Мужик, неси по одной, пока предыдущую не допью».
Зачем это вообще нужно, нахуй? Без этого твой потребитель превращается в того мудака на свадьбе, который накладывает себе полтора килограмма оливье. Потом сидит, давится, а все остальные голодные. Конкретнее:
- Потребитель захлебнётся. Брокер ему все сообщения из очереди в пасть вывалит, а обрабатывать он их не успевает. Память сожрёт, упадёт, и всем пиздец.
- Распределение будет пиздец как кривое. Один потребитель медленный, как черепаха в смоле, и у него 100500 непрочитанных сообщений висит, а другой — шустрый, но ему нихуя не прилетает, потому что очередь-то уже «занята» первым лодырем.
Как оно крутится-вертится:
Ставишь prefetchCount, скажем, в единичку. Брокер такой: «На, братан, одно сообщение, разберись с ним». И ждёт, пока ты не скажешь «ок, готово» (это BasicAck). Только после этого шлёт следующее. Получается честная очередь, как в советском магазине — всем по одному, без давки.
Вот как это в .NET выглядит, на реальных примерах:
using RabbitMQ.Client;
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// ВАЖНО, БЛЯДЬ! autoAck: false — иначе вся эта магия не работает.
// С автоподтверждением prefetch — это как презерватив с дыркой.
var consumer = new EventingBasicConsumer(channel);
// Вот эта строчка — волшебная палочка.
// BasicQos — это типа «настрой качество обслуживания, а не гони пургу».
// prefetchSize: 0 — нам по размеру сообщений не ограничивать, похуй.
// prefetchCount: 1 — вот сердцевина, брат. Одно сообщение в работу.
// global: false — лимит на каждого потребителя отдельно, а не на всех скопом.
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
consumer.Received += (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Получил: {message}");
// Представь, что тут какая-то тяжёлая хуйня, типа генерации отчёта
Thread.Sleep(1000);
// Говорим брокеру: «Всё, брат, я справился, давай следующее!»
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
// Если всё пошло по пизде — сообщаем, что не осилили.
// requeue: true — кидаем сообщение обратно в очередь, пусть кто-то другой попробует.
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
}
};
channel.BasicConsume(queue: "task_queue",
autoAck: false, // ЕЩЁ РАЗ, БЛЯДЬ, FALSE! АТЕНШН!
consumer: consumer);
Про параметры BasicQos — чтоб не облажаться:
prefetchCount: Главный рычаг. Единичка — это надёжно и честно, но если твои задачи обрабатываются за миллисекунды, то потребитель будет простаивать, как лох. Тогда можно поднять до 50 или 100, чтобы он всегда был при деле. Но смотри, не переборщи — а то опять захлебнётся, ёпта.global: Тут важно не проебаться.false(обычно так и ставят): Лимит на каждого потребителя персонально. Два потребителя на канале, у каждогоprefetchCount=1— значит, у каждого может висеть по одному неподтверждённому сообщению.true: Лимит на весь канал общий. Те же два потребителя сprefetchCount=1— и они ВМЕСТЕ могут получить только одно сообщение. Один будет работать, второй — в пролёте. Чаще всего это не то, что нужно.
Какой prefetchCount выбрать? Да по обстоятельствам, чувак!
Если задачи долгие (типа видео конвертить или нейросеть тренировать) — оставляй 1.
Если задачи быстрые (запись строчки в БД или вызов простого API) — смело поднимай, хоть до 100, чтобы канал не простаивал впустую. Главное — смотри по мониторингу, чтобы память не улетала в космос.