Как реализовать batch-обработку сообщений в RabbitMQ?

«Как реализовать batch-обработку сообщений в RabbitMQ?» — вопрос из категории Брокеры сообщений, который задают на 25% собеседований C# Разработчик. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Batch-обработка в RabbitMQ позволяет эффективно подтверждать и обрабатывать группы сообщений, снижая нагрузку на сеть и брокер. Основной механизм — ручное подтверждение (manual acknowledgement) с накоплением delivery tags и подтверждением пачкой.

Ключевые шаги:

  1. Отключить авто-подтверждение (autoAck: false).
  2. Настроить BasicQos для контроля prefetch count.
  3. Накопить batch сообщений в памяти.
  4. Обработать batch и подтвердить все сообщения одним вызовом BasicAck с флагом multiple: true.

Пример на C# (RabbitMQ.Client):

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// 1. Настройка prefetch. Важно: prefetch >= размера batch.
channel.BasicQos(prefetchSize: 0, prefetchCount: 100, global: false);

// 2. Создание consumer с ручным подтверждением.
var consumer = new EventingBasicConsumer(channel);
var pendingDeliveryTags = new List<ulong>();
const int batchSize = 50;

consumer.Received += (model, ea) => {
    pendingDeliveryTags.Add(ea.DeliveryTag);
    // 3. Накопление batch.
    if (pendingDeliveryTags.Count >= batchSize) {
        ProcessMessageBatch(ea.Body.ToArray()); // Ваша логика обработки
        // 4. Подтверждение пачки.
        var lastTag = pendingDeliveryTags.Last();
        channel.BasicAck(deliveryTag: lastTag, multiple: true);
        pendingDeliveryTags.Clear();
    }
};

channel.BasicConsume(queue: "myQueue", autoAck: false, consumer: consumer);

Практические нюансы:

  • Надёжность: При падении потребителя все неподтверждённые сообщения (включая накопленные в batch) будут повторно доставлены другим потребителям.
  • Таймаут batch: Добавьте таймер для подтверждения неполного batch по истечении времени, чтобы избежать длительных задержек.
  • Память: Управляйте размером batch, чтобы не исчерпать память.
  • Качество обслуживания (QoS): BasicQos с global: false применяется к каждому новому потребителю отдельно.