Ответ
Проблема в том, что нужно атомарно сохранить изменение состояния в базе данных И отправить соответствующее событие. Если что-то пойдет не так после сохранения в БД, но до отправки в очередь, система окажется в несогласованном состоянии. Я решал это с помощью паттерна Transactional Outbox.
Суть паттерна: Событие записывается как запись в таблицу БД (outbox) в рамках ТОЙ ЖЕ транзакции, что и основное бизнес-действие. Отправкой в очередь занимается отдельный фоновый процесс.
Пример реализации на PHP (Laravel-стиль):
-
Структура таблицы
outbox_messages:CREATE TABLE outbox_messages ( id BIGSERIAL PRIMARY KEY, aggregate_type VARCHAR(255), -- e.g., 'Order' aggregate_id VARCHAR(255), -- e.g., order UUID event_type VARCHAR(255), -- e.g., 'OrderCreated' payload JSONB NOT NULL, -- Данные события created_at TIMESTAMP NOT NULL DEFAULT NOW(), sent_at TIMESTAMP NULL -- Отметка об успешной отправке ); -
Код сервиса, создающего заказ:
// OrderService.php DB::transaction(function () use ($orderData) { // 1. Сохраняем основную сущность $order = Order::create([ 'user_id' => $orderData['user_id'], 'total' => $orderData['total'], 'status' => 'created' ]); // 2. В ТОЙ ЖЕ транзакции записываем событие в Outbox OutboxMessage::create([ 'aggregate_type' => 'Order', 'aggregate_id' => $order->id, 'event_type' => 'OrderCreated', 'payload' => json_encode([ 'orderId' => $order->id, 'userId' => $order->user_id, 'total' => $order->total, 'occurredAt' => now()->toIso8601String() ]) ]); // Транзакция коммитится атомарно: либо и Order, и OutboxMessage сохранены, либо ничего. }); -
Отдельный процесс-релевер (Publisher): Этот процесс периодически опрашивает таблицу
outbox_messages(гдеsent_at IS NULL), отправляет события в RabbitMQ/Kafka и помечает их как отправленные.// OutboxPublisher.php $messages = OutboxMessage::whereNull('sent_at')->orderBy('id')->limit(100)->get(); foreach ($messages as $message) { try { $this->rabbitMq->publish( 'order_events', $message->event_type, $message->payload ); $message->update(['sent_at' => now()]); // Помечаем как отправленное } catch (Exception $e) { Log::error('Failed to publish outbox message', ['id' => $message->id]); // Не помечаем как отправленное, попробуем снова позже } }
Преимущества:
- Гарантированная доставка: Событие никогда не потеряется, так как оно сохранено в БД.
- Согласованность: Состояние БД и факт генерации события всегда синхронизированы.
- Идемпотентность потребителя: Потребитель события должен быть идемпотентным, так как в редких случаях (сбой после отправки, но до отметки
sent_at) событие может быть отправлено повторно.