Что такое Prefetch (предварительная выборка) в RabbitMQ?

«Что такое Prefetch (предварительная выборка) в RabbitMQ?» — вопрос из категории Брокеры сообщений, который задают на 25% собеседований C# Разработчик. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Prefetch (предварительная выборка) — это механизм контроля потока (flow control) на стороне потребителя (consumer) в RabbitMQ. Он определяет максимальное количество неподтверждённых сообщений (unacknowledged), которые брокер может отправить потребителю за раз, прежде чем остановить отправку и ждать подтверждений (acknowledgements).

Зачем это нужно? Без prefetch брокер будет "выталкивать" (push) все доступные сообщения потребителю сразу, что может привести к:

  1. Перегрузке потребителя: Потребитель не успевает обрабатывать сообщения, что ведёт к росту потребления памяти и возможным сбоям.
  2. Неравномерному распределению нагрузки: В системе с несколькими потребителями быстрый потребитель может остаться без работы, пока медленный разгребает свою огромную очередь неподтверждённых сообщений.

Как работает: Устанавливается prefetchCount (например, 1). Брокер отправляет потребителю одно сообщение и ждет его подтверждения (BasicAck). Только после получения подтверждения отправляется следующее сообщение. Это создаёт модель "round-robin" (циклического перебора) между потребителями.

Настройка в .NET (библиотека RabbitMQ.Client):

using RabbitMQ.Client;

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// КРИТИЧНО ВАЖНЫЙ ПАРАМЕТР: autoAck: false
// Prefetch работает только с ручным подтверждением.
var consumer = new EventingBasicConsumer(channel);

// Настройка prefetch. BasicQos = Quality of Service.
// prefetchSize: 0 (не используется, лимит по размеру сообщений в байтах).
// prefetchCount: 1 (макс. количество неподтверждённых сообщений).
// global: false (лимит применяется к каждому новому потребителю на этом канале).
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

consumer.Received += (model, ea) =>
{
    try
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine($" [x] Received: {message}");
        // Имитация обработки
        Thread.Sleep(1000); 
        // Ручное подтверждение успешной обработки
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    }
    catch (Exception ex)
    {
        // Сообщение не обработано. Отправляем в Dead Letter Exchange или отрицательно подтверждаем.
        channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
    }
};

channel.BasicConsume(queue: "task_queue",
                     autoAck: false, // Автоподтверждение должно быть ВЫКЛЮЧЕНО
                     consumer: consumer);

Ключевые параметры BasicQos:

  • prefetchCount: Самый важный параметр. Значение 1 обеспечивает честное распределение, но может снизить общую пропускную способность, если обработка быстрая. Для увеличения throughput можно установить большее значение (например, 50-100), но это требует мониторинга, чтобы не перегрузить потребителя.
  • global:
    • false (по умолчанию): Настройка применяется к каждому потребителю на канале индивидуально. Если на канале 2 потребителя с prefetchCount=1, каждый может иметь по 1 неподтверждённому сообщению.
    • true: Настройка применяется ко всему каналу в целом. Для 2 потребителей с prefetchCount=1 они в сумме смогут получить только 1 неподтверждённое сообщение.

Выбор значения prefetchCount: Зависит от характера задачи. Для долгих задач (обработка изображений, PDF) лучше оставить 1. Для коротких задач (запись в базу, вызов другого API) можно увеличить значение, чтобы избежать простоев потребителя в ожидании новых сообщений.