Приведите пример использования RabbitMQ для асинхронной обработки задач.

Ответ

RabbitMQ — это популярный брокер сообщений, реализующий протокол AMQP. Он идеально подходит для асинхронной обработки задач, распределения нагрузки между сервисами и обеспечения надежной доставки сообщений в системах, где важна гарантия обработки.

Сценарий использования: Асинхронная отправка email-уведомлений

Представьте, что после регистрации пользователя или совершения заказа необходимо отправить email-уведомление. Прямая отправка email в процессе обработки HTTP-запроса может замедлить ответ пользователю и привести к потере уведомления при сбое почтового сервиса. RabbitMQ позволяет вынести эту задачу в фоновый процесс.

Почему RabbitMQ подходит для этой задачи:

  • Надежная доставка: Сообщения могут быть персистентными и подтверждаться (ACK), что гарантирует их обработку даже при сбоях.
  • Распределение нагрузки: Несколько воркеров (consumer'ов) могут обрабатывать сообщения из одной очереди параллельно.
  • Отказоустойчивость: Сообщения сохраняются в очереди, пока не будут успешно обработаны.
  • Гибкость маршрутизации: Поддержка различных типов обменов (exchanges) для сложной маршрутизации сообщений.
  • Асинхронность: Основной процесс не блокируется ожиданием выполнения фоновой задачи.

Пример кода (Python с библиотекой pika):

Для демонстрации Producer и Consumer обычно запускаются как отдельные процессы.

1. Producer (отправитель задачи)

import pika
import json
import time

def send_email_task(user_email: str, subject: str, body: str):
    connection = None
    try:
        # Устанавливаем соединение с RabbitMQ брокером
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()

        # Объявляем очередь, если она еще не существует
        # durable=True делает очередь персистентной (сохраняется при перезапуске брокера)
        channel.queue_declare(queue='email_queue', durable=True)

        message = {
            'user_email': user_email,
            'subject': subject,
            'body': body
        }

        # Отправляем сообщение в очередь
        # delivery_mode=2 делает сообщение персистентным (сохраняется при перезапуске брокера)
        channel.basic_publish(
            exchange='', # Используем обмен по умолчанию (direct exchange)
            routing_key='email_queue', # Сообщение будет отправлено в очередь 'email_queue'
            body=json.dumps(message).encode('utf-8'),
            properties=pika.BasicProperties(
                delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
            )
        )
        print(f" [x] Отправлено сообщение для {user_email}")
    except pika.exceptions.AMQPConnectionError as e:
        print(f"Ошибка подключения к RabbitMQ: {e}")
    finally:
        if connection and not connection.is_closed:
            connection.close()

# Пример использования:
# send_email_task("user1@example.com", "Добро пожаловать!", "Спасибо за регистрацию!")
# send_email_task("user2@example.com", "Ваш заказ принят", "Мы обрабатываем ваш заказ.")

2. Consumer (обработчик задачи)

import pika
import json
import time

def send_email_mock(email: str, subject: str, body: str):
    """
    Мок-функция для имитации отправки email.
    В реальном приложении здесь будет вызов почтового клиента.
    """
    print(f" [x] Имитация отправки email на {email}:")
    print(f"     Тема: {subject}")
    print(f"     Тело: {body}")
    time.sleep(2) # Имитация задержки отправки email
    print(f" [x] Email на {email} отправлен.")

def callback(ch, method, properties, body):
    """
    Функция, вызываемая при получении сообщения из очереди.
    """
    try:
        message = json.loads(body.decode('utf-8'))
        user_email = message.get('user_email')
        subject = message.get('subject')
        email_body = message.get('body')

        print(f" [x] Получено сообщение: {user_email}")
        send_email_mock(user_email, subject, email_body)
        # Подтверждаем, что сообщение успешно обработано
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except json.JSONDecodeError:
        print(f" [!] Ошибка декодирования JSON: {body}")
        # Можно отправить сообщение в очередь для "битых" сообщений (dead-letter queue)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    except Exception as e:
        print(f" [!] Ошибка обработки сообщения: {e}")
        # Если произошла ошибка, можно вернуть сообщение в очередь (requeue=True)
        # или отправить в dead-letter queue
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

connection = None
try:
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Объявляем ту же очередь, что и продюсер
    channel.queue_declare(queue='email_queue', durable=True)

    # Указываем, что consumer будет брать не более 1 сообщения за раз
    # Это предотвращает отправку новых сообщений consumer'у, пока он не подтвердит предыдущее
    channel.basic_qos(prefetch_count=1)

    print(' [*] Ожидание сообщений. Для выхода нажмите CTRL+C')
    # Начинаем потребление сообщений
    channel.basic_consume(queue='email_queue', on_message_callback=callback)
    channel.start_consuming()
except pika.exceptions.AMQPConnectionError as e:
    print(f"Ошибка подключения к RabbitMQ: {e}")
except KeyboardInterrupt:
    print(" [*] Consumer остановлен.")
finally:
    if connection and not connection.is_closed:
        connection.close()

В этом примере:

  • Producer отправляет задачи на отправку email в очередь email_queue.
  • Consumer (или несколько consumer'ов) асинхронно извлекает эти задачи из очереди и выполняет отправку email.
  • Использование durable=True для очереди и delivery_mode=2 для сообщений обеспечивает их персистентность, что критически важно для надежности.
  • basic_ack подтверждает успешную обработку, а basic_qos помогает распределить нагрузку между несколькими consumer'ами.

Ответ 18+ 🔞

А, RabbitMQ, ёпта! Это ж как тот самый Герасим, только для сообщений, блядь. Немой, но здоровенный, и всё тащит, пока не упадёт. Представь: твоему сервису надо письма рассылать, а он как начнёт в почтовый сервис тыкаться — ну, как мудак с заиканием в колл-центр, блядь. Весь запнётся, пользователю ответить не может, а тут ещё почтовик лег — и всё, пиздец, письмо в никуда улетело, как Муму в озеро.

А с этим кроликом — совсем другая песня, в рот меня чих-пых! Ты ему просто кидаешь задачу: «На, блядь, отправь письмо этому чуваку». А сам сразу пользователю «окей, всё гуд» отвечаешь. А кролик этот, здоровый такой, стоит, жуёт свою морковку, и молча, блядь, тащит твоё сообщение в специальную ямку — в очередь, то есть. И стоит там, пока не приползёт какой-нибудь фоновый работяга (consumer, ёпта) и не заберёт его на отправку. И даже если работяга сдохнет на полпути — сообщение-то в ямке останется! Не потеряется, сука! Вот это надёжность, блядь, ядрёна вошь!

Смотри, как это выглядит в коде, только не засыпай, а то хуй потом разберёшь.

1. Отправитель (Producer) — тот, кто создаёт работу

Этот код просто выплёвывает задание в очередь и забывает. Как тот, кто заказал пиццу и пошёл смотреть сериал.

import pika
import json

def send_email_task(user_email: str, subject: str, body: str):
    connection = None
    try:
        # Подключаемся к кролику на локалхосте
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()

        # Объявляем очередь. durable=True — это чтобы она пережила перезагрузку кролика.
        # Типа не «Муму», а «Муму-навеки», блядь.
        channel.queue_declare(queue='email_queue', durable=True)

        message = {
            'user_email': user_email,
            'subject': subject,
            'body': body
        }

        # Выплёвываем сообщение в очередь
        channel.basic_publish(
            exchange='', # Используем стандартную дырку в заборе
            routing_key='email_queue', # Кидаем именно в эту очередь
            body=json.dumps(message).encode('utf-8'),
            properties=pika.BasicProperties(
                delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE # Чтобы сообщение тоже пережило перезагрузку
            )
        )
        print(f" [x] Запихнули в очередь письмо для {user_email}")
    except pika.exceptions.AMQPConnectionError as e:
        print(f"Ошибка подключения к RabbitMQ: {e}") # Кролик, сука, сдох
    finally:
        if connection and not connection.is_closed:
            connection.close()

2. Получатель (Consumer) — тот, кто реально пашет

А этот чувак сидит, уши развесил, и ждёт, когда в очереди появится работа. Как Герасим у проруби, только не топит, а отправляет.

import pika
import json
import time

def send_email_mock(email: str, subject: str, body: str):
    # Это заглушка, которая притворяется почтовым сервисом
    print(f" [x] Притворяемся, что шлём письмо на {email}:")
    print(f"     Тема: {subject}")
    time.sleep(2) # Имитируем, что всё идёт не быстро, блядь
    print(f" [x] Ну типа отправили на {email}.")

def callback(ch, method, properties, body):
    # Эта функция вызывается, когда в очереди что-то просыпалось
    try:
        message = json.loads(body.decode('utf-8'))
        user_email = message.get('user_email')
        subject = message.get('subject')
        email_body = message.get('body')

        print(f" [x] Вытащили из очереди работу для {user_email}")
        send_email_mock(user_email, subject, email_body)
        # Говорим кролику: «Всё, братан, я справился, сообщение можно нахуй удалять»
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except json.JSONDecodeError:
        print(f" [!] Что за хуйню мне прислали? JSON не читается!")
        # Говорим кролику: «Это говно, в очередь возвращать не надо, выкидывай»
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    except Exception as e:
        print(f" [!] Во время отправки случилась дичь: {e}")
        # Говорим кролику: «Чувак, я обосрался, давай это сообщение обратно в очередь, пусть другой попробует»
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

connection = None
try:
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Опять объявляем очередь, на всякий пожарный
    channel.queue_declare(queue='email_queue', durable=True)

    # Говорим: «Не пихай мне больше одного сообщения за раз, а то я, блядь, не справлюсь»
    channel.basic_qos(prefetch_count=1)

    print(' [*] Сижу, курю, жду писем для отправки. Вырубить — Ctrl+C')
    # Вешаем на себя ярлык «работяга для email_queue» и начинаем слушать
    channel.basic_consume(queue='email_queue', on_message_callback=callback)
    channel.start_consuming() # И уходим в вечное ожидание, как дурак
except pika.exceptions.AMQPConnectionError as e:
    print(f"Кролик куда-то пропал: {e}")
except KeyboardInterrupt:
    print(" [*] Всё, приехали, закругляемся.")
finally:
    if connection and not connection.is_closed:
        connection.close()

И вся магия, блядь, в чём? В том, что этих работяг (consumer'ов) можно запустить овердохуища! Один упал — другой подхватит. Очередь растёт — ты просто добавляешь новых работяг, и они всё разгребают. А основной сервис при этом не тупит, не ждёт, пользователям быстро отвечает. Красота, ёпта! Не то что синхронные вызовы, которые одним ебальником всё кладут.