Работали ли вы с брокерами сообщений (message brokers) в PHP?

Ответ

Да, интегрировал брокеры сообщений в PHP-приложения для асинхронной обработки задач и построения событийно-ориентированной архитектуры. Основной опыт — с RabbitMQ (для фоновых задач и межсервисного взаимодействия) и Kafka (для потоковой обработки логов и событий).

Пример: Отправка email через RabbitMQ с использованием библиотеки php-amqplib.

Producer (отправляет задачу):

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection('rabbitmq-host', 5672, 'user', 'pass');
$channel = $connection->channel();

// Объявляем durable очередь (переживет перезагрузку брокера)
$channel->queue_declare('email_queue', false, true, false, false);

$emailData = [
    'to' => 'user@example.com',
    'subject' => 'Welcome',
    'body' => '...'
];

$msg = new AMQPMessage(
    json_encode($emailData),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] // Сообщение persistent
);

$channel->basic_publish($msg, '', 'email_queue');

$channel->close();
$connection->close();

Consumer (обрабатывает задачу в фоне):

// consumer.php (запускается как демон)
$connection = new AMQPStreamConnection('rabbitmq-host', 5672, 'user', 'pass');
$channel = $connection->channel();
$channel->queue_declare('email_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+Cn";

$callback = function (AMQPMessage $msg) {
    echo ' [x] Received ', $msg->body, "n";
    $data = json_decode($msg->body, true);
    // Отправляем email...
    sleep(2); // Имитация работы
    echo " [x] Email sent to " . $data['to'] . "n";

    // Подтверждаем успешную обработку (ack)
    $msg->ack();
};

// Настраиваем QoS - не давать воркеру больше 1 сообщения за раз
$channel->basic_qos(null, 1, null);
$channel->basic_consume('email_queue', '', false, false, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

Опыт с Kafka (для потоков событий):

  • Использовал arnaud-lb/php-rdkafka для интеграции.
  • Основное применение — отправка событий пользовательских действий (клики, просмотры) в топик Kafka для последующей аналитики в Apache Spark или аналогичных системах.
  • В PHP важно правильно управлять памятью в long-running consumer'ах и использовать graceful shutdown для корректного коммита оффсетов.

Выбор в проектах:

  • RabbitMQ выбирал для задач, где важна гарантированная доставка, сложные маршрутизации (exchanges) и относительно невысокая нагрузка (например, обработка заказов, нотификации).
  • Kafka применял для высоконагруженных потоков данных, где важна пропускная способность и возможность повторно читать историю событий (например, логгирование, трекинг поведения).

Ответ 18+ 🔞

А, ну это про интеграцию брокеров в PHP? Да, конечно, делал такое, не раз. Это ж, бля, классика — отвязать тяжёлые задачи от основного потока, чтобы пользователь не ждал, пока там письмо нахуй отправится или отчёт сгенерируется. Овердохуища пользы, если правильно настроить.

В основном работал с двумя монстрами: RabbitMQ — для всяких фоновых задач и чтобы сервисы между собой болтались, и Kafka — когда нужно было события или логи потоком гнать, как будто из пожарного шланга пиздит.

Вот, смотри, как обычно письма через RabbitMQ гоняли. Использовал библиотеку php-amqplib.

Отправитель (Producer), который задачу в очередь пихает:

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection('rabbitmq-host', 5672, 'user', 'pass');
$channel = $connection->channel();

// Объявляем durable очередь (чтоб даже если кролик перезагрузится — очередь не накрылась медным тазом)
$channel->queue_declare('email_queue', false, true, false, false);

$emailData = [
    'to' => 'user@example.com',
    'subject' => 'Welcome',
    'body' => '...'
];

$msg = new AMQPMessage(
    json_encode($emailData),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] // Сообщение тоже persistent, чтоб не потерялось
);

$channel->basic_publish($msg, '', 'email_queue');

$channel->close();
$connection->close();

А вот потребитель (Consumer), который в фоне, как зек на лесоповале, эти задачи обрабатывает:

// consumer.php (этот скрипт запускается как демон и висит, слушает)
$connection = new AMQPStreamConnection('rabbitmq-host', 5672, 'user', 'pass');
$channel = $connection->channel();
$channel->queue_declare('email_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+Cn";

$callback = function (AMQPMessage $msg) {
    echo ' [x] Received ', $msg->body, "n";
    $data = json_decode($msg->body, true);
    // Тут реальная отправка письма...
    sleep(2); // Просто имитируем, что работаем, ебать колотить
    echo " [x] Email sent to " . $data['to'] . "n";

    // Говорим RabbitMQ: «Всё, чувак, обработал, можешь сообщение удалять» (ack)
    $msg->ack();
};

// Настраиваем QoS — не грузим воркера по уши, пусть берёт по одному сообщению за раз
$channel->basic_qos(null, 1, null);
$channel->basic_consume('email_queue', '', false, false, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

А с Kafka история отдельная, ёпта. Там уже для потоков событий, где объёмы — просто пиздец.

  • Использовал arnaud-lb/php-rdkafka, благо, биндинги к либе на C есть.
  • Чаще всего пихал в него события действий пользователей: кликнул тут, посмотрел там. Потом эта телега летела в Apache Spark или ещё куда, чтобы аналитики голову ломали.
  • Главное в PHP — следить за памятью в этих долгоживущих консьюмерах и делать graceful shutdown, чтобы оффсеты корректно закоммитить, а не потерять половину данных, хитрая жопа.

В общем, выбор простой, если разобраться:

  • RabbitMQ — когда нужны гарантии доставки, сложные маршруты через exchange'и и нагрузка не зашкаливает. Ну, там заказы, нотификации — классика.
  • Kafka — когда на тебя поток данных льётся, как из рога изобилия, и важно не потерять ни хуя, да ещё и иметь возможность назад отмотать и перечитать. Для логов и трекинга — идеально, ядрёна вошь.