Ответ
Для организации синхронного взаимодействия в асинхронной среде (например, при работе с очередями сообщений) часто применяется паттерн Request-Reply (или Request-Response).
Как это работает:
- Отправитель (Requestor) публикует сообщение в очередь запросов, указывая уникальный идентификатор корреляции (
correlationId) и адрес очереди для ответа (replyTo). - Получатель (Replier) обрабатывает сообщение и отправляет результат в очередь, указанную в
replyTo, включая тот жеcorrelationId. - Отправитель слушает очередь ответов, фильтруя сообщения по
correlationId, и получает результат.
Практическая реализация (пример с RabbitMQ):
# Отправитель (Requestor)
import pika
import uuid
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем эксклюзивную очередь для ответов
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue
correlation_id = str(uuid.uuid4())
# Колбэк для обработки ответа
def on_response(ch, method, props, body):
if correlation_id == props.correlation_id:
print(f" [x] Received response: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
connection.close()
channel.basic_consume(queue=callback_queue, on_message_callback=on_response)
# Отправка запроса
channel.basic_publish(
exchange='',
routing_key='request_queue',
properties=pika.BasicProperties(
reply_to=callback_queue,
correlation_id=correlation_id,
),
body='Запрос на обработку'
)
print(" [x] Sent request")
channel.start_consuming()
Альтернативы и нюансы:
- RPC (Remote Procedure Call): Многие брокеры (RabbitMQ, Apache Kafka) имеют встроенные механизмы RPC, которые реализуют этот паттерн.
- Временные очереди: Очередь
replyToчасто создается как временная (exclusive), что упрощает управление. - Таймауты: Критически важно реализовать таймауты на стороне отправителя, чтобы избежать бесконечного ожидания в случае сбоя.
- Синхронность: Паттерн имитирует синхронное взаимодействие поверх асинхронной шины, что может быть полезно для API-шлюзов или микросервисов, требующих немедленного ответа.