Ответ
Очередь (queue) в RabbitMQ — это именованный буфер, который хранит сообщения, отправленные производителями (producers), до момента их получения и обработки потребителями (consumers).
Основные функции очередей:
-
Асинхронность и расцепление (Decoupling): Производители и потребители работают независимо друг от друга. Производитель может отправить сообщение и продолжить работу, не дожидаясь его обработки. Потребитель может быть временно недоступен, но сообщения для него сохранятся в очереди.
-
Распределение нагрузки (Load Balancing): Несколько экземпляров потребителей могут читать сообщения из одной очереди. RabbitMQ будет распределять сообщения между ними по принципу round-robin, что позволяет горизонтально масштабировать обработку задач.
-
Гарантированная доставка и отказоустойчивость: Сообщения хранятся в очереди до тех пор, пока потребитель не подтвердит их успешную обработку (acknowledgement). Если потребитель падает, не обработав сообщение, оно возвращается в очередь и будет передано другому свободному потребителю.
-
Сглаживание пиковых нагрузок: Если производители генерируют сообщения быстрее, чем потребители могут их обработать, очередь выступает в роли буфера, предотвращая перегрузку системы-получателя.
Пример на Python с pika:
# producer.py - Отправляет сообщение в очередь 'task_queue'
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Убедимся, что очередь существует
channel.queue_declare(queue='task_queue')
channel.basic_publish(exchange='',
routing_key='task_queue',
body='Process this video!')
print(" [x] Sent 'Process this video!'")
connection.close()
# consumer.py - Получает сообщение из очереди 'task_queue'
import pika
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# Здесь происходит обработка задачи
ch.basic_ack(delivery_tag=method.delivery_tag) # Подтверждение обработки
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()