Ответ
Распределение сообщений из одной очереди между несколькими потребителями (консьюмерами) в RabbitMQ происходит по алгоритму round-robin. Каждое новое сообщение отправляется следующему доступному потребителю в порядке их подключения.
Управление распределением и надежностью:
1. Prefetch Count (basic.qos): Ключевой параметр для контроля нагрузки.
- Без настройки RabbitMQ будет "заталкивать" (push) сообщения потребителям как можно быстрее, что может привести к перегрузке одного из них.
- Установка
prefetchCount=1означает: "не отправляй потребителю новое сообщение, пока он не подтвердит обработку предыдущего". Это обеспечивает честное распределение.
// Пример на C# с библиотекой RabbitMQ.Client
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// КРИТИЧНО: Устанавливаем prefetch count
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// prefetchCount: 1 - не больше одного неподтвержденного сообщения на потребителя
// global: false - настройка применяется к каждому новому потребителю отдельно
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
// Обработка сообщения...
// ...
// Ручное подтверждение обработки
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
autoAck: false, // Важно: отключаем авто-подтверждение!
consumer: consumer);
2. Подтверждение (Acknowledgement):
autoAck=false(рекомендуется) требует ручного подтверждения (BasicAck) после успешной обработки. Если потребитель отвалится, неподтвержденное сообщение будет переотправлено другому.autoAck=trueсообщение удаляется из очереди сразу после отправки потребителю, что рискованно при сбоях.
3. Альтернативные схемы распределения:
- Consistent Hash Exchange: Можно направить сообщения с одинаковым ключом (например,
userId) всегда к одному и тому же потребителю, используя плагинrabbitmq_consistent_hash_exchange. - Несколько очередей и Exchange: Создание сложных маршрутов через
direct,topicилиheadersexchange для управления потоком.
Ответ 18+ 🔞
Давай разберём эту тему, а то народ часто путается, как эта штука работает на самом деле. Смотри, представь себе конвейер на заводе и трёх работяг.
По умолчанию, RabbitMQ — это как тот самый прораб-распиздяй. Подходит к конвейеру, хватает ящик и суёт его первому свободному мужику. Тот взял — прораб уже второму суёт. Первый ещё палец о ящик ободрал, а ему уже следующий летит в руки. Это и есть round-robin, по кругу. Казалось бы, честно.
Но проблема в чём? А в том, что ящики-то бывают разные! Один — с пухом, а другой — с чугунными болванками. И вот одному работяге везёт, он пух разгружает, а другому — три болванки подряд прилетает, он еле живой, а прораб ему уже четвёртую пихает. Несправедливо, да? Перекос по нагрузке получается пиздецовый.
Вот чтобы такого не было, умные люди придумали Prefetch Count. Это как если бы каждый работяга сказал прорабу: «Мужик, не пихай мне новый ящик, пока я тебе от предыдущего чек не оторвал. Вот тебе бумажка (ack), значит я справился, готов к следующему». И прораб слушается.
В коде это выглядит так, главное не проебать настройки:
// Подключились, создали канал — это типа провели связь с прорабом
using var channel = connection.CreateModel();
// А вот это ВАЖНЕЙШАЯ КОМАНДА. Говорим прорабу:
// "Запомни нахуй: больше одного ящика в одни руки не давай, пока от меня чека не получишь".
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// prefetchCount=1 — это святое. global=false — правило для каждого нового работяги лично.
// Создаём потребителя (работягу)
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
// Тут работяга вскрывает ящик и делает свою работу...
// Допустим, отправляет письмо или чёт там считает.
// Долго может это делать, ебаться с ошибками.
// И ТОЛЬКО КОГДА ВСЁ ЗАЕБИСЬ, он рвёт чек (ack) и отдаёт прорабу.
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
// Всё, теперь можно кричать "Давай следующий!"
};
// Подписываем работягу на конвейер (очередь)
channel.BasicConsume(queue: "task_queue",
autoAck: false, // СМОТРИ СЮДА! Это ОТКЛЮЧЕНИЕ авто-чека.
consumer: consumer);
Вот эта связка prefetchCount=1 и autoAck=false — она и есть краеугольный камень нормальной, надёжной работы. Если autoAck поставить в true, то прораб будет считать, что ящик вручил — и сразу его нахуй выбрасывать. А если работяга в этот момент споткнётся и упадёт, ящик пропадёт навсегда. Сообщение потеряно, клиент нихуя не получил. Кошмар.
А если хочется не просто по кругу, а чтобы, например, все сообщения про одного и того же юзера шли к одному и тому же работяге (чтобы у того в кэше данные были), то тут round-robin не катит. Надо использовать Consistent Hash Exchange (это отдельный плагин). Или городить свои очереди через direct или topic exchange, чтобы сообщения с ключом user_15 всегда летели в очередь queue_for_worker_2. Но это уже для сложных сценариев, когда порядок или локальность данных критичны.
Короче, суть:
- Round-robin по умолчанию — просто, но может быть несправедливо.
- Prefetch=1 + ручные ack — чтобы не перегружать работяг и ничего не терять. Обязательная база.
- Другие схемы (hash, routing) — когда тебе важно не честно, а умно распределить, чтобы связанные сообщения попали к одному обработчику.
Вот и вся магия, без ебалы-переебалы. Главное — настройки не перепутать, а то будет тебе не распределённая система, а сплошной распиздяйствос.