Ответ
RabbitMQ — это популярный брокер сообщений, реализующий протокол AMQP. Он идеально подходит для асинхронной обработки задач, распределения нагрузки между сервисами и обеспечения надежной доставки сообщений в системах, где важна гарантия обработки.
Сценарий использования: Асинхронная отправка email-уведомлений
Представьте, что после регистрации пользователя или совершения заказа необходимо отправить email-уведомление. Прямая отправка email в процессе обработки HTTP-запроса может замедлить ответ пользователю и привести к потере уведомления при сбое почтового сервиса. RabbitMQ позволяет вынести эту задачу в фоновый процесс.
Почему RabbitMQ подходит для этой задачи:
- Надежная доставка: Сообщения могут быть персистентными и подтверждаться (ACK), что гарантирует их обработку даже при сбоях.
- Распределение нагрузки: Несколько воркеров (consumer'ов) могут обрабатывать сообщения из одной очереди параллельно.
- Отказоустойчивость: Сообщения сохраняются в очереди, пока не будут успешно обработаны.
- Гибкость маршрутизации: Поддержка различных типов обменов (exchanges) для сложной маршрутизации сообщений.
- Асинхронность: Основной процесс не блокируется ожиданием выполнения фоновой задачи.
Пример кода (Python с библиотекой pika):
Для демонстрации Producer и Consumer обычно запускаются как отдельные процессы.
1. Producer (отправитель задачи)
import pika
import json
import time
def send_email_task(user_email: str, subject: str, body: str):
connection = None
try:
# Устанавливаем соединение с RabbitMQ брокером
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем очередь, если она еще не существует
# durable=True делает очередь персистентной (сохраняется при перезапуске брокера)
channel.queue_declare(queue='email_queue', durable=True)
message = {
'user_email': user_email,
'subject': subject,
'body': body
}
# Отправляем сообщение в очередь
# delivery_mode=2 делает сообщение персистентным (сохраняется при перезапуске брокера)
channel.basic_publish(
exchange='', # Используем обмен по умолчанию (direct exchange)
routing_key='email_queue', # Сообщение будет отправлено в очередь 'email_queue'
body=json.dumps(message).encode('utf-8'),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
print(f" [x] Отправлено сообщение для {user_email}")
except pika.exceptions.AMQPConnectionError as e:
print(f"Ошибка подключения к RabbitMQ: {e}")
finally:
if connection and not connection.is_closed:
connection.close()
# Пример использования:
# send_email_task("user1@example.com", "Добро пожаловать!", "Спасибо за регистрацию!")
# send_email_task("user2@example.com", "Ваш заказ принят", "Мы обрабатываем ваш заказ.")
2. Consumer (обработчик задачи)
import pika
import json
import time
def send_email_mock(email: str, subject: str, body: str):
"""
Мок-функция для имитации отправки email.
В реальном приложении здесь будет вызов почтового клиента.
"""
print(f" [x] Имитация отправки email на {email}:")
print(f" Тема: {subject}")
print(f" Тело: {body}")
time.sleep(2) # Имитация задержки отправки email
print(f" [x] Email на {email} отправлен.")
def callback(ch, method, properties, body):
"""
Функция, вызываемая при получении сообщения из очереди.
"""
try:
message = json.loads(body.decode('utf-8'))
user_email = message.get('user_email')
subject = message.get('subject')
email_body = message.get('body')
print(f" [x] Получено сообщение: {user_email}")
send_email_mock(user_email, subject, email_body)
# Подтверждаем, что сообщение успешно обработано
ch.basic_ack(delivery_tag=method.delivery_tag)
except json.JSONDecodeError:
print(f" [!] Ошибка декодирования JSON: {body}")
# Можно отправить сообщение в очередь для "битых" сообщений (dead-letter queue)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
print(f" [!] Ошибка обработки сообщения: {e}")
# Если произошла ошибка, можно вернуть сообщение в очередь (requeue=True)
# или отправить в dead-letter queue
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
connection = None
try:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем ту же очередь, что и продюсер
channel.queue_declare(queue='email_queue', durable=True)
# Указываем, что consumer будет брать не более 1 сообщения за раз
# Это предотвращает отправку новых сообщений consumer'у, пока он не подтвердит предыдущее
channel.basic_qos(prefetch_count=1)
print(' [*] Ожидание сообщений. Для выхода нажмите CTRL+C')
# Начинаем потребление сообщений
channel.basic_consume(queue='email_queue', on_message_callback=callback)
channel.start_consuming()
except pika.exceptions.AMQPConnectionError as e:
print(f"Ошибка подключения к RabbitMQ: {e}")
except KeyboardInterrupt:
print(" [*] Consumer остановлен.")
finally:
if connection and not connection.is_closed:
connection.close()
В этом примере:
- Producer отправляет задачи на отправку email в очередь
email_queue. - Consumer (или несколько consumer'ов) асинхронно извлекает эти задачи из очереди и выполняет отправку email.
- Использование
durable=Trueдля очереди иdelivery_mode=2для сообщений обеспечивает их персистентность, что критически важно для надежности. basic_ackподтверждает успешную обработку, аbasic_qosпомогает распределить нагрузку между несколькими consumer'ами.
Ответ 18+ 🔞
А, RabbitMQ, ёпта! Это ж как тот самый Герасим, только для сообщений, блядь. Немой, но здоровенный, и всё тащит, пока не упадёт. Представь: твоему сервису надо письма рассылать, а он как начнёт в почтовый сервис тыкаться — ну, как мудак с заиканием в колл-центр, блядь. Весь запнётся, пользователю ответить не может, а тут ещё почтовик лег — и всё, пиздец, письмо в никуда улетело, как Муму в озеро.
А с этим кроликом — совсем другая песня, в рот меня чих-пых! Ты ему просто кидаешь задачу: «На, блядь, отправь письмо этому чуваку». А сам сразу пользователю «окей, всё гуд» отвечаешь. А кролик этот, здоровый такой, стоит, жуёт свою морковку, и молча, блядь, тащит твоё сообщение в специальную ямку — в очередь, то есть. И стоит там, пока не приползёт какой-нибудь фоновый работяга (consumer, ёпта) и не заберёт его на отправку. И даже если работяга сдохнет на полпути — сообщение-то в ямке останется! Не потеряется, сука! Вот это надёжность, блядь, ядрёна вошь!
Смотри, как это выглядит в коде, только не засыпай, а то хуй потом разберёшь.
1. Отправитель (Producer) — тот, кто создаёт работу
Этот код просто выплёвывает задание в очередь и забывает. Как тот, кто заказал пиццу и пошёл смотреть сериал.
import pika
import json
def send_email_task(user_email: str, subject: str, body: str):
connection = None
try:
# Подключаемся к кролику на локалхосте
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем очередь. durable=True — это чтобы она пережила перезагрузку кролика.
# Типа не «Муму», а «Муму-навеки», блядь.
channel.queue_declare(queue='email_queue', durable=True)
message = {
'user_email': user_email,
'subject': subject,
'body': body
}
# Выплёвываем сообщение в очередь
channel.basic_publish(
exchange='', # Используем стандартную дырку в заборе
routing_key='email_queue', # Кидаем именно в эту очередь
body=json.dumps(message).encode('utf-8'),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE # Чтобы сообщение тоже пережило перезагрузку
)
)
print(f" [x] Запихнули в очередь письмо для {user_email}")
except pika.exceptions.AMQPConnectionError as e:
print(f"Ошибка подключения к RabbitMQ: {e}") # Кролик, сука, сдох
finally:
if connection and not connection.is_closed:
connection.close()
2. Получатель (Consumer) — тот, кто реально пашет
А этот чувак сидит, уши развесил, и ждёт, когда в очереди появится работа. Как Герасим у проруби, только не топит, а отправляет.
import pika
import json
import time
def send_email_mock(email: str, subject: str, body: str):
# Это заглушка, которая притворяется почтовым сервисом
print(f" [x] Притворяемся, что шлём письмо на {email}:")
print(f" Тема: {subject}")
time.sleep(2) # Имитируем, что всё идёт не быстро, блядь
print(f" [x] Ну типа отправили на {email}.")
def callback(ch, method, properties, body):
# Эта функция вызывается, когда в очереди что-то просыпалось
try:
message = json.loads(body.decode('utf-8'))
user_email = message.get('user_email')
subject = message.get('subject')
email_body = message.get('body')
print(f" [x] Вытащили из очереди работу для {user_email}")
send_email_mock(user_email, subject, email_body)
# Говорим кролику: «Всё, братан, я справился, сообщение можно нахуй удалять»
ch.basic_ack(delivery_tag=method.delivery_tag)
except json.JSONDecodeError:
print(f" [!] Что за хуйню мне прислали? JSON не читается!")
# Говорим кролику: «Это говно, в очередь возвращать не надо, выкидывай»
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
print(f" [!] Во время отправки случилась дичь: {e}")
# Говорим кролику: «Чувак, я обосрался, давай это сообщение обратно в очередь, пусть другой попробует»
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
connection = None
try:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Опять объявляем очередь, на всякий пожарный
channel.queue_declare(queue='email_queue', durable=True)
# Говорим: «Не пихай мне больше одного сообщения за раз, а то я, блядь, не справлюсь»
channel.basic_qos(prefetch_count=1)
print(' [*] Сижу, курю, жду писем для отправки. Вырубить — Ctrl+C')
# Вешаем на себя ярлык «работяга для email_queue» и начинаем слушать
channel.basic_consume(queue='email_queue', on_message_callback=callback)
channel.start_consuming() # И уходим в вечное ожидание, как дурак
except pika.exceptions.AMQPConnectionError as e:
print(f"Кролик куда-то пропал: {e}")
except KeyboardInterrupt:
print(" [*] Всё, приехали, закругляемся.")
finally:
if connection and not connection.is_closed:
connection.close()
И вся магия, блядь, в чём? В том, что этих работяг (consumer'ов) можно запустить овердохуища! Один упал — другой подхватит. Очередь растёт — ты просто добавляешь новых работяг, и они всё разгребают. А основной сервис при этом не тупит, не ждёт, пользователям быстро отвечает. Красота, ёпта! Не то что синхронные вызовы, которые одним ебальником всё кладут.