Ответ
Да, интегрировал брокеры сообщений в PHP-приложения для асинхронной обработки задач и построения событийно-ориентированной архитектуры. Основной опыт — с RabbitMQ (для фоновых задач и межсервисного взаимодействия) и Kafka (для потоковой обработки логов и событий).
Пример: Отправка email через RabbitMQ с использованием библиотеки php-amqplib.
Producer (отправляет задачу):
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection('rabbitmq-host', 5672, 'user', 'pass');
$channel = $connection->channel();
// Объявляем durable очередь (переживет перезагрузку брокера)
$channel->queue_declare('email_queue', false, true, false, false);
$emailData = [
'to' => 'user@example.com',
'subject' => 'Welcome',
'body' => '...'
];
$msg = new AMQPMessage(
json_encode($emailData),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] // Сообщение persistent
);
$channel->basic_publish($msg, '', 'email_queue');
$channel->close();
$connection->close();
Consumer (обрабатывает задачу в фоне):
// consumer.php (запускается как демон)
$connection = new AMQPStreamConnection('rabbitmq-host', 5672, 'user', 'pass');
$channel = $connection->channel();
$channel->queue_declare('email_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+Cn";
$callback = function (AMQPMessage $msg) {
echo ' [x] Received ', $msg->body, "n";
$data = json_decode($msg->body, true);
// Отправляем email...
sleep(2); // Имитация работы
echo " [x] Email sent to " . $data['to'] . "n";
// Подтверждаем успешную обработку (ack)
$msg->ack();
};
// Настраиваем QoS - не давать воркеру больше 1 сообщения за раз
$channel->basic_qos(null, 1, null);
$channel->basic_consume('email_queue', '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
Опыт с Kafka (для потоков событий):
- Использовал
arnaud-lb/php-rdkafkaдля интеграции. - Основное применение — отправка событий пользовательских действий (клики, просмотры) в топик Kafka для последующей аналитики в Apache Spark или аналогичных системах.
- В PHP важно правильно управлять памятью в long-running consumer'ах и использовать graceful shutdown для корректного коммита оффсетов.
Выбор в проектах:
- RabbitMQ выбирал для задач, где важна гарантированная доставка, сложные маршрутизации (exchanges) и относительно невысокая нагрузка (например, обработка заказов, нотификации).
- Kafka применял для высоконагруженных потоков данных, где важна пропускная способность и возможность повторно читать историю событий (например, логгирование, трекинг поведения).
Ответ 18+ 🔞
А, ну это про интеграцию брокеров в PHP? Да, конечно, делал такое, не раз. Это ж, бля, классика — отвязать тяжёлые задачи от основного потока, чтобы пользователь не ждал, пока там письмо нахуй отправится или отчёт сгенерируется. Овердохуища пользы, если правильно настроить.
В основном работал с двумя монстрами: RabbitMQ — для всяких фоновых задач и чтобы сервисы между собой болтались, и Kafka — когда нужно было события или логи потоком гнать, как будто из пожарного шланга пиздит.
Вот, смотри, как обычно письма через RabbitMQ гоняли. Использовал библиотеку php-amqplib.
Отправитель (Producer), который задачу в очередь пихает:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection('rabbitmq-host', 5672, 'user', 'pass');
$channel = $connection->channel();
// Объявляем durable очередь (чтоб даже если кролик перезагрузится — очередь не накрылась медным тазом)
$channel->queue_declare('email_queue', false, true, false, false);
$emailData = [
'to' => 'user@example.com',
'subject' => 'Welcome',
'body' => '...'
];
$msg = new AMQPMessage(
json_encode($emailData),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] // Сообщение тоже persistent, чтоб не потерялось
);
$channel->basic_publish($msg, '', 'email_queue');
$channel->close();
$connection->close();
А вот потребитель (Consumer), который в фоне, как зек на лесоповале, эти задачи обрабатывает:
// consumer.php (этот скрипт запускается как демон и висит, слушает)
$connection = new AMQPStreamConnection('rabbitmq-host', 5672, 'user', 'pass');
$channel = $connection->channel();
$channel->queue_declare('email_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+Cn";
$callback = function (AMQPMessage $msg) {
echo ' [x] Received ', $msg->body, "n";
$data = json_decode($msg->body, true);
// Тут реальная отправка письма...
sleep(2); // Просто имитируем, что работаем, ебать колотить
echo " [x] Email sent to " . $data['to'] . "n";
// Говорим RabbitMQ: «Всё, чувак, обработал, можешь сообщение удалять» (ack)
$msg->ack();
};
// Настраиваем QoS — не грузим воркера по уши, пусть берёт по одному сообщению за раз
$channel->basic_qos(null, 1, null);
$channel->basic_consume('email_queue', '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
А с Kafka история отдельная, ёпта. Там уже для потоков событий, где объёмы — просто пиздец.
- Использовал
arnaud-lb/php-rdkafka, благо, биндинги к либе на C есть. - Чаще всего пихал в него события действий пользователей: кликнул тут, посмотрел там. Потом эта телега летела в Apache Spark или ещё куда, чтобы аналитики голову ломали.
- Главное в PHP — следить за памятью в этих долгоживущих консьюмерах и делать graceful shutdown, чтобы оффсеты корректно закоммитить, а не потерять половину данных, хитрая жопа.
В общем, выбор простой, если разобраться:
- RabbitMQ — когда нужны гарантии доставки, сложные маршруты через exchange'и и нагрузка не зашкаливает. Ну, там заказы, нотификации — классика.
- Kafka — когда на тебя поток данных льётся, как из рога изобилия, и важно не потерять ни хуя, да ещё и иметь возможность назад отмотать и перечитать. Для логов и трекинга — идеально, ядрёна вошь.