Что такое Prefetch (предварительная выборка) в RabbitMQ?

Ответ

Prefetch (предварительная выборка) — это механизм контроля потока (flow control) на стороне потребителя (consumer) в RabbitMQ. Он определяет максимальное количество неподтверждённых сообщений (unacknowledged), которые брокер может отправить потребителю за раз, прежде чем остановить отправку и ждать подтверждений (acknowledgements).

Зачем это нужно? Без prefetch брокер будет "выталкивать" (push) все доступные сообщения потребителю сразу, что может привести к:

  1. Перегрузке потребителя: Потребитель не успевает обрабатывать сообщения, что ведёт к росту потребления памяти и возможным сбоям.
  2. Неравномерному распределению нагрузки: В системе с несколькими потребителями быстрый потребитель может остаться без работы, пока медленный разгребает свою огромную очередь неподтверждённых сообщений.

Как работает: Устанавливается prefetchCount (например, 1). Брокер отправляет потребителю одно сообщение и ждет его подтверждения (BasicAck). Только после получения подтверждения отправляется следующее сообщение. Это создаёт модель "round-robin" (циклического перебора) между потребителями.

Настройка в .NET (библиотека RabbitMQ.Client):

using RabbitMQ.Client;

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// КРИТИЧНО ВАЖНЫЙ ПАРАМЕТР: autoAck: false
// Prefetch работает только с ручным подтверждением.
var consumer = new EventingBasicConsumer(channel);

// Настройка prefetch. BasicQos = Quality of Service.
// prefetchSize: 0 (не используется, лимит по размеру сообщений в байтах).
// prefetchCount: 1 (макс. количество неподтверждённых сообщений).
// global: false (лимит применяется к каждому новому потребителю на этом канале).
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

consumer.Received += (model, ea) =>
{
    try
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine($" [x] Received: {message}");
        // Имитация обработки
        Thread.Sleep(1000); 
        // Ручное подтверждение успешной обработки
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    }
    catch (Exception ex)
    {
        // Сообщение не обработано. Отправляем в Dead Letter Exchange или отрицательно подтверждаем.
        channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
    }
};

channel.BasicConsume(queue: "task_queue",
                     autoAck: false, // Автоподтверждение должно быть ВЫКЛЮЧЕНО
                     consumer: consumer);

Ключевые параметры BasicQos:

  • prefetchCount: Самый важный параметр. Значение 1 обеспечивает честное распределение, но может снизить общую пропускную способность, если обработка быстрая. Для увеличения throughput можно установить большее значение (например, 50-100), но это требует мониторинга, чтобы не перегрузить потребителя.
  • global:
    • false (по умолчанию): Настройка применяется к каждому потребителю на канале индивидуально. Если на канале 2 потребителя с prefetchCount=1, каждый может иметь по 1 неподтверждённому сообщению.
    • true: Настройка применяется ко всему каналу в целом. Для 2 потребителей с prefetchCount=1 они в сумме смогут получить только 1 неподтверждённое сообщение.

Выбор значения prefetchCount: Зависит от характера задачи. Для долгих задач (обработка изображений, PDF) лучше оставить 1. Для коротких задач (запись в базу, вызов другого API) можно увеличить значение, чтобы избежать простоев потребителя в ожидании новых сообщений.

Ответ 18+ 🔞

А, слушай, про prefetch в RabbitMQ — это ж классика, блядь! Представь себе: ты в баре, а тебе официант сразу 50 кружек пива на стол ставит. И сидишь ты, обоссаный, смотришь на эту хуйню и думаешь: «Ну я ж одну выпью, остальные теплые будут, а он новые таскать уже не может, потому что стол забит!». Вот prefetch — это как сказать официанту: «Мужик, неси по одной, пока предыдущую не допью».

Зачем это вообще нужно, нахуй? Без этого твой потребитель превращается в того мудака на свадьбе, который накладывает себе полтора килограмма оливье. Потом сидит, давится, а все остальные голодные. Конкретнее:

  1. Потребитель захлебнётся. Брокер ему все сообщения из очереди в пасть вывалит, а обрабатывать он их не успевает. Память сожрёт, упадёт, и всем пиздец.
  2. Распределение будет пиздец как кривое. Один потребитель медленный, как черепаха в смоле, и у него 100500 непрочитанных сообщений висит, а другой — шустрый, но ему нихуя не прилетает, потому что очередь-то уже «занята» первым лодырем.

Как оно крутится-вертится: Ставишь prefetchCount, скажем, в единичку. Брокер такой: «На, братан, одно сообщение, разберись с ним». И ждёт, пока ты не скажешь «ок, готово» (это BasicAck). Только после этого шлёт следующее. Получается честная очередь, как в советском магазине — всем по одному, без давки.

Вот как это в .NET выглядит, на реальных примерах:

using RabbitMQ.Client;

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// ВАЖНО, БЛЯДЬ! autoAck: false — иначе вся эта магия не работает.
// С автоподтверждением prefetch — это как презерватив с дыркой.
var consumer = new EventingBasicConsumer(channel);

// Вот эта строчка — волшебная палочка.
// BasicQos — это типа «настрой качество обслуживания, а не гони пургу».
// prefetchSize: 0 — нам по размеру сообщений не ограничивать, похуй.
// prefetchCount: 1 — вот сердцевина, брат. Одно сообщение в работу.
// global: false — лимит на каждого потребителя отдельно, а не на всех скопом.
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

consumer.Received += (model, ea) =>
{
    try
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine($" [x] Получил: {message}");
        // Представь, что тут какая-то тяжёлая хуйня, типа генерации отчёта
        Thread.Sleep(1000);
        // Говорим брокеру: «Всё, брат, я справился, давай следующее!»
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    }
    catch (Exception ex)
    {
        // Если всё пошло по пизде — сообщаем, что не осилили.
        // requeue: true — кидаем сообщение обратно в очередь, пусть кто-то другой попробует.
        channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
    }
};

channel.BasicConsume(queue: "task_queue",
                     autoAck: false, // ЕЩЁ РАЗ, БЛЯДЬ, FALSE! АТЕНШН!
                     consumer: consumer);

Про параметры BasicQos — чтоб не облажаться:

  • prefetchCount: Главный рычаг. Единичка — это надёжно и честно, но если твои задачи обрабатываются за миллисекунды, то потребитель будет простаивать, как лох. Тогда можно поднять до 50 или 100, чтобы он всегда был при деле. Но смотри, не переборщи — а то опять захлебнётся, ёпта.
  • global: Тут важно не проебаться.
    • false (обычно так и ставят): Лимит на каждого потребителя персонально. Два потребителя на канале, у каждого prefetchCount=1 — значит, у каждого может висеть по одному неподтверждённому сообщению.
    • true: Лимит на весь канал общий. Те же два потребителя с prefetchCount=1 — и они ВМЕСТЕ могут получить только одно сообщение. Один будет работать, второй — в пролёте. Чаще всего это не то, что нужно.

Какой prefetchCount выбрать? Да по обстоятельствам, чувак! Если задачи долгие (типа видео конвертить или нейросеть тренировать) — оставляй 1. Если задачи быстрые (запись строчки в БД или вызов простого API) — смело поднимай, хоть до 100, чтобы канал не простаивал впустую. Главное — смотри по мониторингу, чтобы память не улетала в космос.