Ответ
Да, интегрировал брокеры сообщений в PHP-приложения для асинхронной обработки задач и построения событийно-ориентированной архитектуры. Основной опыт — с RabbitMQ (для фоновых задач и межсервисного взаимодействия) и Kafka (для потоковой обработки логов и событий).
Пример: Отправка email через RabbitMQ с использованием библиотеки php-amqplib.
Producer (отправляет задачу):
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection('rabbitmq-host', 5672, 'user', 'pass');
$channel = $connection->channel();
// Объявляем durable очередь (переживет перезагрузку брокера)
$channel->queue_declare('email_queue', false, true, false, false);
$emailData = [
'to' => 'user@example.com',
'subject' => 'Welcome',
'body' => '...'
];
$msg = new AMQPMessage(
json_encode($emailData),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] // Сообщение persistent
);
$channel->basic_publish($msg, '', 'email_queue');
$channel->close();
$connection->close();
Consumer (обрабатывает задачу в фоне):
// consumer.php (запускается как демон)
$connection = new AMQPStreamConnection('rabbitmq-host', 5672, 'user', 'pass');
$channel = $connection->channel();
$channel->queue_declare('email_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+Cn";
$callback = function (AMQPMessage $msg) {
echo ' [x] Received ', $msg->body, "n";
$data = json_decode($msg->body, true);
// Отправляем email...
sleep(2); // Имитация работы
echo " [x] Email sent to " . $data['to'] . "n";
// Подтверждаем успешную обработку (ack)
$msg->ack();
};
// Настраиваем QoS - не давать воркеру больше 1 сообщения за раз
$channel->basic_qos(null, 1, null);
$channel->basic_consume('email_queue', '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
Опыт с Kafka (для потоков событий):
- Использовал
arnaud-lb/php-rdkafkaдля интеграции. - Основное применение — отправка событий пользовательских действий (клики, просмотры) в топик Kafka для последующей аналитики в Apache Spark или аналогичных системах.
- В PHP важно правильно управлять памятью в long-running consumer'ах и использовать graceful shutdown для корректного коммита оффсетов.
Выбор в проектах:
- RabbitMQ выбирал для задач, где важна гарантированная доставка, сложные маршрутизации (exchanges) и относительно невысокая нагрузка (например, обработка заказов, нотификации).
- Kafka применял для высоконагруженных потоков данных, где важна пропускная способность и возможность повторно читать историю событий (например, логгирование, трекинг поведения).