Как устроена маршрутизация сообщений в RabbitMQ

Ответ

В RabbitMQ сообщения не отправляются напрямую в очереди. Вместо этого используется модель, основанная на обменниках (exchanges), которые отвечают за маршрутизацию сообщений в одну или несколько очередей.

Основные компоненты этой модели:

  • Producer (Издатель): Приложение, которое отправляет сообщения.
  • Exchange (Обменник): Получает сообщения от издателей и направляет их в очереди. Тип обменника определяет логику маршрутизации:
    • direct: Отправляет сообщение в очереди, чей ключ привязки (binding key) точно совпадает с ключом маршрутизации (routing key) сообщения.
    • topic: Маршрутизирует сообщения на основе шаблонов (* — одно слово, # — ноль или более слов).
    • fanout: Отправляет копию сообщения во все привязанные к нему очереди, игнорируя ключ маршрутизации.
    • headers: Использует заголовки сообщения для маршрутизации (вместо ключей).
  • Queue (Очередь): Буфер, который хранит сообщения до тех пор, пока они не будут обработаны потребителем.
  • Binding (Привязка): Правило, которое связывает обменник с очередью. Для direct и topic обменников привязка содержит ключ (binding key).
  • Consumer (Потребитель): Приложение, которое получает и обрабатывает сообщения из очереди.

Процесс выглядит так:

  1. Producer публикует сообщение в Exchange с указанием routing key.
  2. Exchange, в зависимости от своего типа и routing key, находит подходящие Bindings.
  3. Через эти Bindings сообщение попадает в связанные с ними Queues.
  4. Consumer забирает сообщение из Queue для обработки.

Пример: объявление очереди и отправка сообщения

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 1. Объявляем очередь. durable=True означает, что очередь переживет перезапуск брокера.
channel.queue_declare(queue='task_queue', durable=True)

# 2. Публикуем сообщение в обменник по умолчанию ('')
# routing_key указывает на имя очереди.
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='Hello World!',
    # delivery_mode=2 делает сообщение персистентным (сохранится на диск)
    properties=pika.BasicProperties(delivery_mode=2) 
)

print(" [x] Sent 'Hello World!'")
connection.close()