Как гарантировать целостность отправки событий в очередь сообщений?

«Как гарантировать целостность отправки событий в очередь сообщений?» — вопрос из категории Архитектура, который задают на 24% собеседований PHP Разработчик. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Проблема в том, что нужно атомарно сохранить изменение состояния в базе данных И отправить соответствующее событие. Если что-то пойдет не так после сохранения в БД, но до отправки в очередь, система окажется в несогласованном состоянии. Я решал это с помощью паттерна Transactional Outbox.

Суть паттерна: Событие записывается как запись в таблицу БД (outbox) в рамках ТОЙ ЖЕ транзакции, что и основное бизнес-действие. Отправкой в очередь занимается отдельный фоновый процесс.

Пример реализации на PHP (Laravel-стиль):

  1. Структура таблицы outbox_messages:

    CREATE TABLE outbox_messages (
        id BIGSERIAL PRIMARY KEY,
        aggregate_type VARCHAR(255), -- e.g., 'Order'
        aggregate_id VARCHAR(255),   -- e.g., order UUID
        event_type VARCHAR(255),     -- e.g., 'OrderCreated'
        payload JSONB NOT NULL,      -- Данные события
        created_at TIMESTAMP NOT NULL DEFAULT NOW(),
        sent_at TIMESTAMP NULL       -- Отметка об успешной отправке
    );
  2. Код сервиса, создающего заказ:

    // OrderService.php
    DB::transaction(function () use ($orderData) {
        // 1. Сохраняем основную сущность
        $order = Order::create([
            'user_id' => $orderData['user_id'],
            'total' => $orderData['total'],
            'status' => 'created'
        ]);
    
        // 2. В ТОЙ ЖЕ транзакции записываем событие в Outbox
        OutboxMessage::create([
            'aggregate_type' => 'Order',
            'aggregate_id' => $order->id,
            'event_type' => 'OrderCreated',
            'payload' => json_encode([
                'orderId' => $order->id,
                'userId' => $order->user_id,
                'total' => $order->total,
                'occurredAt' => now()->toIso8601String()
            ])
        ]);
        // Транзакция коммитится атомарно: либо и Order, и OutboxMessage сохранены, либо ничего.
    });
  3. Отдельный процесс-релевер (Publisher): Этот процесс периодически опрашивает таблицу outbox_messages (где sent_at IS NULL), отправляет события в RabbitMQ/Kafka и помечает их как отправленные.

    // OutboxPublisher.php
    $messages = OutboxMessage::whereNull('sent_at')->orderBy('id')->limit(100)->get();
    
    foreach ($messages as $message) {
        try {
            $this->rabbitMq->publish(
                'order_events',
                $message->event_type,
                $message->payload
            );
            $message->update(['sent_at' => now()]); // Помечаем как отправленное
        } catch (Exception $e) {
            Log::error('Failed to publish outbox message', ['id' => $message->id]);
            // Не помечаем как отправленное, попробуем снова позже
        }
    }

Преимущества:

  • Гарантированная доставка: Событие никогда не потеряется, так как оно сохранено в БД.
  • Согласованность: Состояние БД и факт генерации события всегда синхронизированы.
  • Идемпотентность потребителя: Потребитель события должен быть идемпотентным, так как в редких случаях (сбой после отправки, но до отметки sent_at) событие может быть отправлено повторно.