Ответ
Для организации синхронного взаимодействия в асинхронной среде (например, при работе с очередями сообщений) часто применяется паттерн Request-Reply (или Request-Response).
Как это работает:
- Отправитель (Requestor) публикует сообщение в очередь запросов, указывая уникальный идентификатор корреляции (
correlationId) и адрес очереди для ответа (replyTo). - Получатель (Replier) обрабатывает сообщение и отправляет результат в очередь, указанную в
replyTo, включая тот жеcorrelationId. - Отправитель слушает очередь ответов, фильтруя сообщения по
correlationId, и получает результат.
Практическая реализация (пример с RabbitMQ):
# Отправитель (Requestor)
import pika
import uuid
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем эксклюзивную очередь для ответов
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue
correlation_id = str(uuid.uuid4())
# Колбэк для обработки ответа
def on_response(ch, method, props, body):
if correlation_id == props.correlation_id:
print(f" [x] Received response: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
connection.close()
channel.basic_consume(queue=callback_queue, on_message_callback=on_response)
# Отправка запроса
channel.basic_publish(
exchange='',
routing_key='request_queue',
properties=pika.BasicProperties(
reply_to=callback_queue,
correlation_id=correlation_id,
),
body='Запрос на обработку'
)
print(" [x] Sent request")
channel.start_consuming()
Альтернативы и нюансы:
- RPC (Remote Procedure Call): Многие брокеры (RabbitMQ, Apache Kafka) имеют встроенные механизмы RPC, которые реализуют этот паттерн.
- Временные очереди: Очередь
replyToчасто создается как временная (exclusive), что упрощает управление. - Таймауты: Критически важно реализовать таймауты на стороне отправителя, чтобы избежать бесконечного ожидания в случае сбоя.
- Синхронность: Паттерн имитирует синхронное взаимодействие поверх асинхронной шины, что может быть полезно для API-шлюзов или микросервисов, требующих немедленного ответа.
Ответ 18+ 🔞
Да ты посмотри, какой паттерн придумали, чтобы в асинхронном бардаке синхронность навести! Request-Reply, ёпта, он же Request-Response. Ну, типа, чтобы не просто кинуть сообщение в пустоту и забыть, а ещё и ответ дождаться, как цивилизованный человек.
Как эта штука, блядь, работает, если по-простому:
- Ты, как отправитель (Requestor), вываливаешь свой запрос в очередь. Но не просто так, а с двумя важными бумажками в кармане: уникальный билетик (
correlationId) и адресочек своей личной почтовой ячейки (replyTo), куда тебе ответ слать. - Получатель (Replier) на том конце берёт твой запрос, чешет репу, делает что надо и кладёт результат в твою ячейку (
replyTo), не забыв пришпилить тот же самый билетик (correlationId). - Ты сидишь, ушами хлопаешь, слушаешь свою ячейку. Как только видишь сообщение с твоим билетиком — всё, пизда, ответ твой. Можно выдохнуть.
Вот, смотри, как это в коде выглядит (на примере RabbitMQ):
# Отправитель (Requestor) - тот, кто просит
import pika
import uuid
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Делаем себе временную, личную очередь для ответов. Эксклюзивную, блядь!
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue
# Генерим уникальный билетик, чтобы свой ответ не с чужим перепутать
correlation_id = str(uuid.uuid4())
# Это функция-ловушка, которая сработает, когда припрут ответ
def on_response(ch, method, props, body):
# Сверяем билетики! Если наш — значит, это наш ответ, а не левая хрень.
if correlation_id == props.correlation_id:
print(f" [x] Получил ответ, наконец-то: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
connection.close() # Всё, работу сделал, можно и честь знать.
# Начинаем караулить свою личную очередь ответов
channel.basic_consume(queue=callback_queue, on_message_callback=on_response)
# А вот теперь ПУСК! Шлём сам запрос.
channel.basic_publish(
exchange='',
routing_key='request_queue', # Очередь, куда стучатся все просители
properties=pika.BasicProperties(
reply_to=callback_queue, # "Отзовитесь сюда, плз"
correlation_id=correlation_id, # "А я вот этот чувак"
),
body='Запрос на обработку, не подведите'
)
print(" [x] Запрос улетел, сижу жду...")
channel.start_consuming() # И вот тут начинается самое интересное — ожидание, блядь.
А теперь про подводные, сука, камни и альтернативы:
- RPC (Удалённый вызов процедур): Зачем велосипед изобретать? У многих брокеров (RabbitMQ, Apache Kafka) эта фича уже из коробки идёт, они там всё за тебя оберут.
- Временные очереди: Очередь для ответов (
replyTo) — она обычно одноразовая, на один запрос. Создали, получили ответ, закрыли. Красота, мусора не остаётся. - Таймауты — это святое! А то представь: отправил запрос, а получатель сгорел. И ты сидишь, ждёшь, как лох, до второго пришествия. Надо ставить дедлайн, ебать, и если ответ не пришёл — ругаться матом и делать что-то ещё.
- Суть паттерна: Он создаёт иллюзию, что ты позвонил в синхронный API, хотя на самом деле под капотом там асинхронная почта гудит. Очень удобно, когда микросервису или API-шлюзу надо прям щас получить ответ, а не "как-нибудь потом, может быть".