Как отправить сообщение в очередь RabbitMQ с помощью библиотеки pika в Python?

Ответ

Отправка (publishing) сообщений в RabbitMQ на Python чаще всего выполняется с помощью библиотеки pika. Процесс состоит из установления соединения, создания канала и публикации сообщения в точку обмена (exchange).

Ключевые шаги:

  1. Установить соединение с RabbitMQ.
  2. Создать канал — легковесное соединение внутри основного.
  3. Объявить очередь (queue_declare), чтобы убедиться в её существовании. Если очередь не существует, она будет создана. Указание durable=True сохранит очередь при перезапуске брокера.
  4. Опубликовать сообщение (basic_publish) в точку обмена. Пустая строка exchange='' означает использование default exchange, который направляет сообщение в очередь, чьё имя совпадает с routing_key.

Пример кода (Publisher)

Использование with гарантирует корректное закрытие соединения даже при ошибках.

import pika

# Параметры подключения
connection_params = pika.ConnectionParameters('localhost')

# Устанавливаем соединение (контекстный менеджер закроет его автоматически)
try:
    with pika.BlockingConnection(connection_params) as connection:
        # Создаем канал
        channel = connection.channel()

        # Объявляем очередь. idempotent-операция.
        # durable=True делает очередь устойчивой к перезапуску RabbitMQ
        channel.queue_declare(queue='hello_queue', durable=True)

        message_body = 'Hello, RabbitMQ!'

        # Публикуем сообщение
        channel.basic_publish(
            exchange='',  # Default exchange
            routing_key='hello_queue', # Имя очереди
            body=message_body.encode('utf-8'), # Сообщение должно быть в байтах
            properties=pika.BasicProperties(
                delivery_mode=2,  # Сделать сообщение персистентным
            )
        )

        print(f" [x] Sent '{message_body}'")

except pika.exceptions.AMQPConnectionError as e:
    print(f"Failed to connect to RabbitMQ: {e}")

Важно:

  • Персистентность: Чтобы сообщения не терялись при перезапуске брокера, и очередь (durable=True), и сами сообщения (delivery_mode=2) должны быть помечены как персистентные.

Ответ 18+ 🔞

Ага, смотри, вот как эта штука работает, если по-человечьи. Представь, что RabbitMQ — это такой ебанутый почтовый ящик, только для программ. А твоя программа-отправитель — это ты, который пытается запихнуть в его узкую щель конверт с криком «НА, ПОЧТАЛЬОН, ДЕРЖИ!».

Короче, что нужно сделать, чтобы отправить сообщение:

  1. Подключиться к этому самому ящику. Ты же не будешь с улицы кидать? Нужно подойти, ключик найти.
  2. Открыть канал внутри соединения. Это как взять ту самую узкую щель (соединение) и просунуть в неё трубочку (канал), чтобы удобнее было пихать.
  3. Проверить/создать очередь. Ты кричишь: «Эй, ящик с названием hello_queue есть?». Если нет — он волшебным образом появляется. Сказать durable=True — это как прикрутить ящик нахуй к стене, чтобы его хулиганы не унесли.
  4. Запихнуть сообщение в этот ящик через ту самую трубочку-канал. Важный момент — если пишешь exchange='', это значит «эй, почтальон-дефолтный, возьми это письмо и сунь его прямо в ящик с именем routing_key».

Вот как это выглядит в коде, если не выёбываться

Библиотека pika. Запоминай, пизда, не «пика», а «пайка». Хуй с ним.

import pika

# Говорим, где наш почтовый ящик-извращенец
connection_params = pika.ConnectionParameters('localhost')

# Пробуем подключиться. Оборачиваем в try/except, потому что связь — она, блядь, ненадёжная
try:
    # Используем `with` — это наш церемонный поклон. Закроет соединение даже если нас тут током шарахнет.
    with pika.BlockingConnection(connection_params) as connection:
        # Открываем ту самую трубочку-канал в щель
        channel = connection.channel()

        # Орущем шёпотом: "Ящик 'hello_queue' есть?!"
        # durable=True — это наш крик "ДА ПРИВИНТИ ЕГО НАХУЙ, ЧТОБ НЕ УПАЛ!"
        channel.queue_declare(queue='hello_queue', durable=True)

        message_body = 'Hello, RabbitMQ!'

        # НАКОНЕЦ-ТО ПИХАЕМ!
        channel.basic_publish(
            exchange='',  # Говорим дефолтному почтальону
            routing_key='hello_queue', # Суй, мол, в ящик с такой вот биркой
            body=message_body.encode('utf-8'), # Сообщение должно быть в байтах, иначе почтальон обосрётся
            properties=pika.BasicProperties(
                delivery_mode=2,  # Это магическая цифра, которая значит: «Письмо несмываемыми чернилами, ёпта!»
            )
        )

        print(f" [x] Отправил '{message_body}'")

except pika.exceptions.AMQPConnectionError as e:
    # А вот если не получилось подключиться — ну, бывает, почта сегодня не работает.
    print(f"Не удалось достучаться до RabbitMQ: {e}")

Важный момент, про который все забывают, а потом охуевают: Если хочешь, чтобы после перезагрузки почтового отделения твои письма не испарились в пизду, нужно сделать ДВЕ ВЕЩИ:

  1. Очередь (queue) должна быть durable=True (привинчена).
  2. Само сообщение (message) должно иметь свойство delivery_mode=2 (несмываемые чернила).

Сделал только одну? Поздравляю, ты еблан. Сообщение потеряется.