Ответ
Распределение сообщений из одной очереди между несколькими потребителями (консьюмерами) в RabbitMQ происходит по алгоритму round-robin. Каждое новое сообщение отправляется следующему доступному потребителю в порядке их подключения.
Управление распределением и надежностью:
1. Prefetch Count (basic.qos): Ключевой параметр для контроля нагрузки.
- Без настройки RabbitMQ будет "заталкивать" (push) сообщения потребителям как можно быстрее, что может привести к перегрузке одного из них.
- Установка
prefetchCount=1означает: "не отправляй потребителю новое сообщение, пока он не подтвердит обработку предыдущего". Это обеспечивает честное распределение.
// Пример на C# с библиотекой RabbitMQ.Client
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// КРИТИЧНО: Устанавливаем prefetch count
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// prefetchCount: 1 - не больше одного неподтвержденного сообщения на потребителя
// global: false - настройка применяется к каждому новому потребителю отдельно
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
// Обработка сообщения...
// ...
// Ручное подтверждение обработки
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
autoAck: false, // Важно: отключаем авто-подтверждение!
consumer: consumer);
2. Подтверждение (Acknowledgement):
autoAck=false(рекомендуется) требует ручного подтверждения (BasicAck) после успешной обработки. Если потребитель отвалится, неподтвержденное сообщение будет переотправлено другому.autoAck=trueсообщение удаляется из очереди сразу после отправки потребителю, что рискованно при сбоях.
3. Альтернативные схемы распределения:
- Consistent Hash Exchange: Можно направить сообщения с одинаковым ключом (например,
userId) всегда к одному и тому же потребителю, используя плагинrabbitmq_consistent_hash_exchange. - Несколько очередей и Exchange: Создание сложных маршрутов через
direct,topicилиheadersexchange для управления потоком.