Ответ
Prefetch (предварительная выборка) — это механизм контроля потока (flow control) на стороне потребителя (consumer) в RabbitMQ. Он определяет максимальное количество неподтверждённых сообщений (unacknowledged), которые брокер может отправить потребителю за раз, прежде чем остановить отправку и ждать подтверждений (acknowledgements).
Зачем это нужно? Без prefetch брокер будет "выталкивать" (push) все доступные сообщения потребителю сразу, что может привести к:
- Перегрузке потребителя: Потребитель не успевает обрабатывать сообщения, что ведёт к росту потребления памяти и возможным сбоям.
- Неравномерному распределению нагрузки: В системе с несколькими потребителями быстрый потребитель может остаться без работы, пока медленный разгребает свою огромную очередь неподтверждённых сообщений.
Как работает:
Устанавливается prefetchCount (например, 1). Брокер отправляет потребителю одно сообщение и ждет его подтверждения (BasicAck). Только после получения подтверждения отправляется следующее сообщение. Это создаёт модель "round-robin" (циклического перебора) между потребителями.
Настройка в .NET (библиотека RabbitMQ.Client):
using RabbitMQ.Client;
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// КРИТИЧНО ВАЖНЫЙ ПАРАМЕТР: autoAck: false
// Prefetch работает только с ручным подтверждением.
var consumer = new EventingBasicConsumer(channel);
// Настройка prefetch. BasicQos = Quality of Service.
// prefetchSize: 0 (не используется, лимит по размеру сообщений в байтах).
// prefetchCount: 1 (макс. количество неподтверждённых сообщений).
// global: false (лимит применяется к каждому новому потребителю на этом канале).
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
consumer.Received += (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received: {message}");
// Имитация обработки
Thread.Sleep(1000);
// Ручное подтверждение успешной обработки
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
// Сообщение не обработано. Отправляем в Dead Letter Exchange или отрицательно подтверждаем.
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
}
};
channel.BasicConsume(queue: "task_queue",
autoAck: false, // Автоподтверждение должно быть ВЫКЛЮЧЕНО
consumer: consumer);
Ключевые параметры BasicQos:
prefetchCount: Самый важный параметр. Значение1обеспечивает честное распределение, но может снизить общую пропускную способность, если обработка быстрая. Для увеличения throughput можно установить большее значение (например, 50-100), но это требует мониторинга, чтобы не перегрузить потребителя.global:false(по умолчанию): Настройка применяется к каждому потребителю на канале индивидуально. Если на канале 2 потребителя сprefetchCount=1, каждый может иметь по 1 неподтверждённому сообщению.true: Настройка применяется ко всему каналу в целом. Для 2 потребителей сprefetchCount=1они в сумме смогут получить только 1 неподтверждённое сообщение.
Выбор значения prefetchCount: Зависит от характера задачи. Для долгих задач (обработка изображений, PDF) лучше оставить 1. Для коротких задач (запись в базу, вызов другого API) можно увеличить значение, чтобы избежать простоев потребителя в ожидании новых сообщений.