Как устроена маршрутизация сообщений в RabbitMQ

Ответ

В RabbitMQ сообщения не отправляются напрямую в очереди. Вместо этого используется модель, основанная на обменниках (exchanges), которые отвечают за маршрутизацию сообщений в одну или несколько очередей.

Основные компоненты этой модели:

  • Producer (Издатель): Приложение, которое отправляет сообщения.
  • Exchange (Обменник): Получает сообщения от издателей и направляет их в очереди. Тип обменника определяет логику маршрутизации:
    • direct: Отправляет сообщение в очереди, чей ключ привязки (binding key) точно совпадает с ключом маршрутизации (routing key) сообщения.
    • topic: Маршрутизирует сообщения на основе шаблонов (* — одно слово, # — ноль или более слов).
    • fanout: Отправляет копию сообщения во все привязанные к нему очереди, игнорируя ключ маршрутизации.
    • headers: Использует заголовки сообщения для маршрутизации (вместо ключей).
  • Queue (Очередь): Буфер, который хранит сообщения до тех пор, пока они не будут обработаны потребителем.
  • Binding (Привязка): Правило, которое связывает обменник с очередью. Для direct и topic обменников привязка содержит ключ (binding key).
  • Consumer (Потребитель): Приложение, которое получает и обрабатывает сообщения из очереди.

Процесс выглядит так:

  1. Producer публикует сообщение в Exchange с указанием routing key.
  2. Exchange, в зависимости от своего типа и routing key, находит подходящие Bindings.
  3. Через эти Bindings сообщение попадает в связанные с ними Queues.
  4. Consumer забирает сообщение из Queue для обработки.

Пример: объявление очереди и отправка сообщения

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 1. Объявляем очередь. durable=True означает, что очередь переживет перезапуск брокера.
channel.queue_declare(queue='task_queue', durable=True)

# 2. Публикуем сообщение в обменник по умолчанию ('')
# routing_key указывает на имя очереди.
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='Hello World!',
    # delivery_mode=2 делает сообщение персистентным (сохранится на диск)
    properties=pika.BasicProperties(delivery_mode=2) 
)

print(" [x] Sent 'Hello World!'")
connection.close()

Ответ 18+ 🔞

Ага, слушай, вот тут у нас, блядь, классическая путаница начинается! Все думают, что в RabbitMQ ты пишешь сообщение прямо в очередь, а там, сука, нихуя подобного! Там целая, блядь, система обменников, как в каком-нибудь ёбаном банке, только вместо денег — сообщения.

Короче, смотри, кто тут у нас главные действующие лица:

  • Producer (Издатель) — это ты, такой важный, который решил что-то всем сообщить. Нашёл, блядь, чем гордиться.
  • Exchange (Обменник) — это, типа, главный диспетчер на сортировочной станции. Он получает твоё послание и решает, куда его дальше пихать. А вот КАК он решает — это зависит от его типа, тут целая ёперная классификация:
    • direct: Самый тупой и честный. Сравнивает ключ в сообщении (routing key) с ключом привязки очереди (binding key). Совпало — пошёл нахуй в эту очередь. Не совпало — иди нахуй в другую. Всё просто, как три копейки.
    • topic: Уже похитрее, любит шаблоны. Там звёздочки (*) и решётки (#). Типа, отправь всё, что начинается на «заказ.» или «логи.ошибка.#». Хуй пойми с первого раза, но мощно.
    • fanout: Полный распиздяй. Получил сообщение — и разослал его копии ВО ВСЕ привязанные к нему очереди. Без разбора, налево и направо. Всем сестрам по серьгам, блядь.
    • headers: Самый заумный уёбок. Смотрит не на ключи, а на заголовки сообщения. Настроил правила — и пускай там само разбирается.
  • Queue (Очередь) — это уже, собственно, ящик, куда всё падает и лежит, пока какой-нибудь работяга не придёт и не заберёт.
  • Binding (Привязка) — это, типа, проводочек, который соединяет обменник с очередью. Для умных обменников (direct, topic) на этом проводочке ещё и бирочка висит — тот самый binding key.
  • Consumer (Потребитель) — это тот самый работяга, который приходит, забирает сообщение из ящика-очереди и делает с ним что надо. Или не делает. Похуй, главное — забрал.

А теперь, блядь, как это всё вместе двигается, в рот меня чих-пых:

  1. Ты, Издатель, такой красавчик, пишешь сообщение и суёшь его в Обменник. Говоришь: «На, мудила, разберись, вот тебе routing_key».
  2. Обменник, ебаный сортировщик, смотрит на свой тип, на ключ, на привязки и думает своей деревянной головой: «Ага... Так-так... Ага... Нахуй!». И пихает сообщение в одну или несколько очередей.
  3. Сообщение прилетает в Очередь и тупо там лежит, как говно в проруби.
  4. Приходит Потребитель, засовывает руку в очередь, хватает сообщение и несёт его обрабатывать. Всё, цикл завершён.

Ну и примерчик, чтобы вообще всё стало ясно, как божий день

Смотри, вот тебе кусок кода, как это обычно делают. Блок кода не трогаю, как велели, он и так норм.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 1. Объявляем очередь. durable=True означает, что очередь переживет перезапуск брокера.
channel.queue_declare(queue='task_queue', durable=True)

# 2. Публикуем сообщение в обменник по умолчанию ('')
# routing_key указывает на имя очереди.
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='Hello World!',
    # delivery_mode=2 делает сообщение персистентным (сохранится на диск)
    properties=pika.BasicProperties(delivery_mode=2) 
)

print(" [x] Sent 'Hello World!'")
connection.close()

Видишь? Мы объявляем очередь task_queue и говорим, чтобы она была durable — то есть живучая, переживёт перезагрузку брокера. А потом шлём сообщение в обменник по умолчанию (это пустая строка ''), который на самом деле является обменником типа direct с именем, совпадающим с routing_key. По сути, мы говорим: «Эй, обменник с именем task_queue, прими это сообщение с ключом task_queue». А он такой: «О, ключ task_queue... Так, а у меня есть привязка на очередь task_queue с ключом task_queue! Опа, совпало! Лети, дружок, в эту очередь!». И всё, приехали.

Вот и вся, блядь, магия. Не так страшен чёрт, как его малюют. Главное — не путать, куда что шлёшь, а то получишь сообщение не там, где ждал.