Ответ
Маршрутизация сообщений в RabbitMQ определяется взаимодействием трех сущностей: издатель (Publisher), обменник (Exchange) и очереди (Queues). Издатель никогда не отправляет сообщения напрямую в очередь, а всегда через обменник, используя ключ маршрутизации (Routing Key).
Ключевые компоненты:
- Exchange (Обменник): Точка входа для сообщений. Получает сообщение от издателя и решает, в какие очереди его направить, основываясь на своем типе и ключе маршрутизации.
- Binding (Привязка): "Правило", которое связывает обменник с очередью. Для некоторых типов обменников включает ключ привязки (Binding Key), который сравнивается с Routing Key.
- Routing Key (Ключ маршрутизации): Строка, которую издатель прикрепляет к сообщению. Это "адрес" или "метка" для маршрутизации.
- Queue (Очередь): Хранилище сообщений, откуда их забирают потребители (Consumers).
Типы обменников и логика маршрутизации:
Тип Exchange (ExchangeType) |
Логика маршрутизации | Типичное использование |
|---|---|---|
Direct |
Сообщение идет в очереди, чей Binding Key ТОЧНО совпадает с Routing Key. | Точечная отправка, RPC, распределение задач по типам. |
Fanout |
Игнорирует Routing Key. Копия сообщения отправляется во все привязанные очереди. | Широковещательные уведомления, событийная рассылка (например, "пользователь зарегистрировался"). |
Topic |
Использует шаблоны в Binding Key (* — одно слово, # — ноль или несколько слов). Сообщение попадает в очереди, чей шаблон совпадает с Routing Key. |
Сложная маршрутизация событий по категориям (например, logs.app.error, orders.eu.paid). |
Headers |
Игнорирует Routing Key. Маршрутизация происходит на основе заголовков (headers) сообщения, где указываются пары ключ-значение. Привязка задает условия сопоставления (x-match: all или any). |
Редко используется, альтернатива Topic, когда ключ маршрутизации неудобно выразить строкой. |
Практические примеры кода (с использованием библиотеки RabbitMQ.Client):
1. Настройка издателя (Publisher) для Direct Exchange:
using var channel = connection.CreateModel();
// Объявляем Direct Exchange
channel.ExchangeDeclare(exchange: "order.direct", type: ExchangeType.Direct, durable: true);
// Сообщение для обработки заказов из ЕС
var message = "Order data...";
var body = Encoding.UTF8.GetBytes(message);
var routingKey = "orders.eu"; // Ключ маршрутизации
// Публикуем сообщение в обменник с указанным routingKey
channel.BasicPublish(exchange: "order.direct",
routingKey: routingKey, // Сообщение пойдет в очередь, привязанную с ключом "orders.eu"
basicProperties: null,
body: body);
2. Настройка потребителя (Consumer) и привязка очереди к Topic Exchange:
using var channel = connection.CreateModel();
// Объявляем Topic Exchange для системы логов
channel.ExchangeDeclare(exchange: "logs.topic", type: ExchangeType.Topic);
// Объявляем временную (exclusive) очередь для этого сервиса
var queueName = channel.QueueDeclare().QueueName;
// Привязываем очередь к обменнику с шаблоном Binding Key.
// Эта очередь получит все логи ошибок (`error`) от любого приложения (`*`).
channel.QueueBind(queue: queueName,
exchange: "logs.topic",
routingKey: "*.error"); // Шаблон Binding Key
// Начинаем потреблять сообщения
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// В ea.RoutingKey будет, например, "payment.error"
Console.WriteLine($"[ERROR from {ea.RoutingKey}] {message}");
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Рекомендации по проектированию:
- Именуйте обменники и ключи осмысленно:
notifications.fanout,commands.direct,events.topic. - Используйте
Topicдля гибкой маршрутизации событий. Это самый мощный и часто используемый тип. - Помните о
Fanoutдля широковещания. Он эффективен, когда нужно отправить одно событие многим независимым подписчикам. - Проверяйте существование обменников и очередей перед использованием или используйте флаг
durable: trueдля их сохранения между перезапусками брокера.
Ответ 18+ 🔞
Давай разберём эту вашу маршрутизацию в RabbitMQ, а то как её обычно объясняют — глаза на лоб лезут. Представь себе почтовое отделение, только для данных, и все там слегка ебнутые.
Вот смотри, есть три главных героя в этой пьесе: Издатель (Publisher), Обменник (Exchange) и Очередь (Queue). Так вот, издатель — он вообще палец о палец не ударит, чтобы напрямую в очередь что-то сунуть. Нет, блядь. Он как барский сынок: принёс посылку (сообщение), швырнул её в окошко обменника и сказал: «Вот тебе ключ маршрутизации (Routing Key), разбирайся, куда это, я пошёл». И всё. А обменник уже, бедолага, с этой хуйней разбирается.
Обменник (Exchange) — это такой диспетчер-шизофреник. У него несколько личностей, в зависимости от типа. Он получает сообщение, смотрит на ключ и решает, в какие очереди его пихнуть. А связывает его с очередью волшебная ниточка под названием Binding (Привязка), в которой иногда прописан ключ привязки (Binding Key). Вот их он и сравнивает с ключом маршрутизации.
Очередь (Queue) — это просто ящик. Тупо хранилище, где сообщения лежат и ждут, пока их заберёт потребитель (Consumer). Всё гениальное просто.
А теперь самое интересное — типы обменников. От этого зависит, как они эту всю хуйню маршрутизируют.
Типы обменников: кто во что горазд
| Тип Exchange | Как работает (логика) | Где применять (чтобы жизнь мёдом не казалась) |
|---|---|---|
Direct |
Сообщение идёт ТОЛЬКО в ту очередь, чей Binding Key ПОЛНОСТЬЮ совпал с Routing Key. Точно в цель. | Точечная отправка. Типа «заказ №666 — обработай его, сервис оплаты». Или задачи по типам: video.encode, image.resize. |
Fanout |
Вообще похуй на ключи. Берет сообщение и копирует его ВО ВСЕ привязанные очереди. Разосрался на всех. | Широковещание. «Всем! Всем! Всем! Пользователь Вася только что зарегистрировался!». Новости, уведомления, события, которые должны получить все подписчики. |
Topic |
Тут уже начинается магия. В Binding Key можно писать шаблоны с * (одно слово) и # (ноль или много слов). Сообщение попадёт во все очереди, чей шаблон подошёл к Routing Key. |
Сложная маршрутизация событий. Логи (logs.app.error, logs.payment.info), события в микросервисах (order.created, user.blocked.eu). Самый часто используемый и гибкий тип, ёпта. |
Headers |
Вообще игнорирует Routing Key. Смотрит на заголовки (headers) сообщения (пары ключ-значение). Привязка задаёт условия: «все заголовки должны совпасть» (x-match: all) или «хотя бы один» (x-match: any). |
Редкая зверюга. Используется, когда ключ строкой не выразить, а хочется маршрутизировать по каким-то атрибутам в заголовках. |
Примеры из жизни, то есть из кода
Вот смотри, как это выглядит, когда ты это пишешь (используем библиотеку RabbitMQ.Client).
1. Издатель шлёт заказ через Direct Exchange Представь, ты сервис заказов. Нужно отправить заказ из ЕС на обработку.
using var channel = connection.CreateModel();
// Объявляем Direct Exchange. Название "order.direct", тип — Direct, durable: true чтобы пережил перезагрузку.
channel.ExchangeDeclare(exchange: "order.direct", type: ExchangeType.Direct, durable: true);
// Само сообщение
var message = "Order data...";
var body = Encoding.UTF8.GetBytes(message);
// Ключ маршрутизации. Как адрес на конверте.
var routingKey = "orders.eu";
// Публикуем! Швыряем в обменник "order.direct" с ключом "orders.eu".
// Сообщение найдёт очередь, которая привязана к этому обменнику с ТОЧНО ТАКИМ ЖЕ Binding Key.
channel.BasicPublish(exchange: "order.direct",
routingKey: routingKey,
basicProperties: null,
body: body);
2. Потребитель слушает логи ошибок через Topic Exchange А вот сервис для сбора логов. Хочет получать только ошибки от любых приложений.
using var channel = connection.CreateModel();
// Объявляем Topic Exchange для логов.
channel.ExchangeDeclare(exchange: "logs.topic", type: ExchangeType.Topic);
// Создаём временную, уникальную очередь для этого экземпляра сервиса.
var queueName = channel.QueueDeclare().QueueName;
// ВЯЖЕМ! Это самое главное. Привязываем нашу очередь к обменнику "logs.topic"
// с Binding Key "*.error". Звёздочка (*) — любое одно слово, "error" — обязательно.
// Значит, очередь получит ВСЕ сообщения, где Routing Key: "payment.error", "auth.error", "gateway.error" и т.д.
channel.QueueBind(queue: queueName,
exchange: "logs.topic",
routingKey: "*.error");
// Вешаем потребителя
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// В ea.RoutingKey будет полный ключ, например "payment.error"
Console.WriteLine($"[ERROR from {ea.RoutingKey}] {message}");
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Краткие итоги, чтобы не облажаться
- Имена — всему голова. Называй обменники понятно:
notifications.fanout,commands.direct,events.user.topic. Неexchange1, блядь. Topic— твой лучший друг для событийной архитектуры. Гибко, мощно, можно делать сложные подписки.Fanout— это «на всех». Идеален, когда одно событие должно триггернуть кучу независимых действий у разных сервисов.- Не забывай про
durable. Если очередь или обменник должны пережить перезапуск брокера — ставьdurable: true. Иначе после ребута всё накроется медным тазом, и ты будешь искать, куда делись твои сообщения.