Ответ
Брокер сообщений — это программный посредник, который управляет обменом данными между различными сервисами (приложениями), обеспечивая их асинхронное взаимодействие.
Он решает несколько ключевых проблем в распределенных системах:
-
Слабая связанность (Decoupling): Сервисы-отправители (producers) не знают о существовании и местоположении сервисов-получателей (consumers). Они просто отправляют сообщение в очередь, а брокер доставляет его.
-
Надежность и отказоустойчивость: Если сервис-получатель временно недоступен, сообщение не теряется. Брокер сохраняет его в очереди и доставит, когда получатель снова будет онлайн.
-
Масштабируемость: Для обработки возросшей нагрузки можно легко добавить больше экземпляров сервиса-получателя, которые будут читать сообщения из одной и той же очереди.
-
Буферизация нагрузки: Брокер сглаживает пиковые нагрузки. Если отправитель генерирует сообщения быстрее, чем получатель может их обработать, они накапливаются в очереди, а не перегружают получателя.
Популярные брокеры: RabbitMQ, Apache Kafka, ActiveMQ.
Пример на Python с RabbitMQ:
- Отправитель (
producer.py)
import pika
# Установка соединения с RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание очереди 'task_queue', если она не существует
channel.queue_declare(queue='task_queue', durable=True)
# Публикация сообщения
message = 'Hello, world!'
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # сделать сообщение персистентным
))
print(f" [x] Sent '{message}'")
connection.close()
- Получатель (
consumer.py)
import pika
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# Подтверждение обработки сообщения
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
# Подписка на очередь
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()