Ответ
RabbitMQ — это открытый брокер сообщений, реализующий протокол AMQP 0-9-1 (Advanced Message Queuing Protocol). Он выступает в роли промежуточного слоя (middleware), обеспечивая асинхронную, надежную и гибкую передачу сообщений между компонентами распределенной системы (производителями и потребителями).
Ключевые компоненты архитектуры RabbitMQ:
- Producer (Publisher): Приложение, которое отправляет сообщения.
- Exchange: Принимает сообщения от производителя и решает, в какие очереди их направить. Тип обмена определяет правила маршрутизации.
- Queue: Буфер, в котором сообщения хранятся до тех пор, пока их не заберут потребители. Очередь привязана к обмену с помощью binding и ключа маршрутизации (routing key).
- Consumer: Приложение, которое получает и обрабатывает сообщения из очереди.
Типы обменов (Exchange Types):
| Тип | Описание | Аналогия |
|---|---|---|
| Direct | Сообщение отправляется в очереди, чей routing key точно совпадает с ключом сообщения. |
Точечная рассылка (unicast). |
| Fanout | Сообщение рассылается во все привязанные очереди, игнорируя routing key. |
Широковещательная рассылка (broadcast). |
| Topic | Сообщение отправляется в очереди, чей routing key совпадает с шаблоном (с использованием * и #). |
Селективная рассылка по шаблону. |
| Headers | Маршрутизация основана на заголовках сообщения, а не на routing key. Используется реже. |
— |
Пример на Java с использованием клиентской библиотеки (amqp-client):
// 1. ПРОИЗВОДИТЕЛЬ (Producer)
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Объявляем очередь (создастся, если не существует)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// Публикуем сообщение в очередь по умолчанию ("") с routingKey = имени очереди
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
// 2. ПОТРЕБИТЕЛЬ (Consumer)
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages...");
// Callback, который будет вызван при получении сообщения
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// Подписываемся на очередь
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
Основные сценарии использования:
- Разделение (Decoupling): Независимая разработка и масштабирование сервисов.
- Балансировка нагрузки (Load Balancing): Распределение задач между несколькими worker-потребителями ("Work Queues").
- Асинхронная обработка: Отложенные или фоновые задачи (отправка email, генерация отчетов).
- Интеграция микросервисов.
- Гарантированная доставка: Подтверждения (acknowledgements), устойчивые (durable) очереди.