Как реализовать batch-обработку сообщений в RabbitMQ?

Ответ

Batch-обработка в RabbitMQ позволяет эффективно подтверждать и обрабатывать группы сообщений, снижая нагрузку на сеть и брокер. Основной механизм — ручное подтверждение (manual acknowledgement) с накоплением delivery tags и подтверждением пачкой.

Ключевые шаги:

  1. Отключить авто-подтверждение (autoAck: false).
  2. Настроить BasicQos для контроля prefetch count.
  3. Накопить batch сообщений в памяти.
  4. Обработать batch и подтвердить все сообщения одним вызовом BasicAck с флагом multiple: true.

Пример на C# (RabbitMQ.Client):

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// 1. Настройка prefetch. Важно: prefetch >= размера batch.
channel.BasicQos(prefetchSize: 0, prefetchCount: 100, global: false);

// 2. Создание consumer с ручным подтверждением.
var consumer = new EventingBasicConsumer(channel);
var pendingDeliveryTags = new List<ulong>();
const int batchSize = 50;

consumer.Received += (model, ea) => {
    pendingDeliveryTags.Add(ea.DeliveryTag);
    // 3. Накопление batch.
    if (pendingDeliveryTags.Count >= batchSize) {
        ProcessMessageBatch(ea.Body.ToArray()); // Ваша логика обработки
        // 4. Подтверждение пачки.
        var lastTag = pendingDeliveryTags.Last();
        channel.BasicAck(deliveryTag: lastTag, multiple: true);
        pendingDeliveryTags.Clear();
    }
};

channel.BasicConsume(queue: "myQueue", autoAck: false, consumer: consumer);

Практические нюансы:

  • Надёжность: При падении потребителя все неподтверждённые сообщения (включая накопленные в batch) будут повторно доставлены другим потребителям.
  • Таймаут batch: Добавьте таймер для подтверждения неполного batch по истечении времени, чтобы избежать длительных задержек.
  • Память: Управляйте размером batch, чтобы не исчерпать память.
  • Качество обслуживания (QoS): BasicQos с global: false применяется к каждому новому потребителю отдельно.

Ответ 18+ 🔞

Давай разберем эту тему про батчи в RabbitMQ, а то народ часто нихуя не понимает, как это работает, а потом удивляется, что сообщения теряются или память кончается.

Смотри, суть в чём: RabbitMQ по умолчанию — это как мелкий назойливый чиновник. Каждое сообщение пришло — надо ему отдельную бумажку (ack) на подпись принести. Получил сообщение — тут же бежишь подтверждать. Это дохуя накладных расходов, сеть грузит, брокер мучается.

А батчевая обработка — это ты приходишь к этому чиновнику с пачкой бумажек раз в полчаса, ставишь ему одну подпись в конце стопки и свободен. Эффективность — овердохуища.

Как это делается, по шагам:

  1. Выключаем эту долбаную автомагию. autoAck: false — это святое. Иначе какой нахуй batch? Сообщение пришло — и сразу считается обработанным, даже если ты его в жопу выкинул.

  2. Настраиваем "лимит доверия". BasicQos — это как сказать брокеру: "Слушай, давай мне не всё сразу, а вот столько штук, окей?". prefetchCount должен быть хотя бы с размер твоего батча, а лучше больше, чтобы пока один батч обрабатывается, уже копился следующий. Иначе будет просто пауза, а это некрасиво.

  3. Копим, сука, копим! Заводишь в памяти список (лист, массив, хуй пойми что) этих самых deliveryTag. Каждое новое сообщение — добавляешь тег в копилку.

  4. Ждём момента Х. Либо накопилось нужное количество (размер батча), либо прошло N секунд (таймаут), чтобы не ждать вечно последнего сообщения для полного пака.

  5. Делаем дела и хороним пачку. Обработал все сообщения в батче (или откатил, если ошибка) — одним махом вызываешь BasicAck с флагом multiple: true и последним тегом в пачке. Брокер понимает: "А, окей, значит всё до этого тега включительно подтверждено, можно забыть." И чистишь свой список.

Вот смотри, как это выглядит в коде, если убрать всю эту сухую документацию:

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Говорим: "Не грузи меня, давай порциями по 100"
channel.BasicQos(prefetchSize: 0, prefetchCount: 100, global: false);

var consumer = new EventingBasicConsumer(channel);
var накопленныеТеги = new List<ulong>(); // Наш мешок для тегов
const int размерПачки = 50;
var таймер = new System.Timers.Timer(2000); // Таймаут 2 секунды на всякий пожарный
таймер.Elapsed += (s, e) => ПодтвердитьПачкуЕслиЧтоТо(); // Функцию свою сделаешь

consumer.Received += (model, ea) => {
    // Прилетело сообщение — кидаем его тег в мешок
    накопленныеТеги.Add(ea.DeliveryTag);

    // Твоя логика обработки самого сообщения (сохраняешь куда-то, преобразуешь)
    // ... 

    // Ага, набралось 50 штук! Пора.
    if (накопленныеТеги.Count >= размерПачки) {
        // 1. Обрабатываем ВСЮ пачку (например, пишем в БД одним запросом)
        ОбработатьВсюПачкуМощно();

        // 2. Подтверждаем ВСЁ, что накопили, одним движением
        var последнийТег = накопленныеТеги.Last();
        channel.BasicAck(deliveryTag: последнийТег, multiple: true);

        // 3. Выкидываем мешок, начинаем новый
        накопленныеТеги.Clear();
        таймер.Stop();
    }
    else if (!таймер.Enabled) {
        // Если это первое сообщение в новой пачке, запускаем таймер
        таймер.Start();
    }
};

channel.BasicConsume(queue: "myQueue", autoAck: false, consumer: consumer);

Важные моменты, про которые нельзя забывать, а то будет пиздец:

  • Надёжность: Если твой процесс сдохнет, не успев отправить ack, RabbitMQ честно перешлёт все неподтверждённые сообщения (всю эту накопленную пачку!) другому работяге. Это и хорошо, и хуёво. Хорошо — ничего не потеряется. Хуёво — если ты их уже частично обработал (например, 49 из 50 записались в БД), а потом они придут ещё раз... Нужна идемпотентность, чувак. Либо сохраняй результат по-мелочи, но подтверждай пачкой.
  • Таймаут — твой друг. Без него можно ждать последнее сообщение для полной пачки до второго пришествия. Сделай механизм, который пнет систему и скажет: "Всё, что есть, — обрабатывай и подтверждай".
  • Память. Не делай размер батча в миллион сообщений, если они по гигабайту каждое. Съешь всю оперативку и сдохнешь.
  • Prefetch — это не просто так. Если поставишь prefetchCount: 1, а батч на 50, то ты никогда не накопишь 50, потому что тебе больше одного сообщения за раз и не дадут. Выставь значение с запасом.

Вот и вся магия. Вместо тысячи мелких писем — одно уведомление на пачку. И все довольны. Главное — не проёбывайся с обработкой внутри батча и не забудь про таймауты.