Ответ
Это классическая проблема, так как невозможно создать единую распределенную транзакцию, охватывающую и базу данных, и внешний брокер сообщений. Если сервис упадет между коммитом в БД и отправкой сообщения, система окажется в неконсистентном состоянии.
Решением является паттерн Transactional Outbox (Транзакционный исходящий ящик).
Суть паттерна
Операция делится на два этапа:
Атомарная запись в БД: В рамках одной локальной транзакции базы данных вы делаете две вещи:
- Сохраняете основную бизнес-сущность (например, заказ).
- Сохраняете событие, которое нужно отправить, в специальную таблицу
outbox
.
Поскольку обе записи происходят в одной транзакции, они либо обе успешно завершатся, либо обе откатятся. Это гарантирует, что событие не потеряется, если бизнес-данные были сохранены.
Асинхронная отправка сообщения: Отдельный процесс (называемый Relay или Poller) периодически опрашивает таблицу
outbox
, забирает необработанные события, отправляет их в брокер сообщений (Kafka, RabbitMQ) и помечает как отправленные (или удаляет).
Пример реализации на Go:
// 1. Сохранение заказа и события в одной транзакции
func CreateOrder(ctx context.Context, db *sql.DB, order Order) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
// defer tx.Rollback() не нужен при явном Commit/Rollback в конце
// Шаг 1.1: Сохраняем заказ
if err := saveOrderInTx(tx, order); err != nil {
tx.Rollback()
return err
}
// Шаг 1.2: Создаем событие для отправки
eventPayload, _ := json.Marshal(order)
event := OutboxEvent{
AggregateType: "order",
AggregateID: order.ID,
EventType: "OrderCreated",
Payload: eventPayload,
}
// Шаг 1.3: Сохраняем событие в таблицу outbox
if err := saveOutboxEventInTx(tx, event); err != nil {
tx.Rollback()
return err
}
// Шаг 1.4: Коммитим транзакцию. Теперь данные и событие сохранены атомарно.
return tx.Commit()
}
// 2. Relay-процесс (работает в отдельной горутине или сервисе)
func processOutbox(ctx context.Context, db *sql.DB, producer kafka.Producer) {
for {
// Находит необработанные события в таблице outbox
events := findUnprocessedEvents(db)
for _, event := range events {
// Отправляет событие в Kafka
if err := producer.Produce(event.Payload); err == nil {
// Помечает событие как обработанное в БД
markEventAsProcessed(db, event.ID)
} else {
// Логируем ошибку, повторная попытка будет в следующей итерации
}
}
time.Sleep(5 * time.Second) // Пауза между опросами
}
}
Дополнительные важные аспекты:
- Идемпотентность потребителей: Сервисы, которые слушают эти события, должны быть идемпотентными, так как Relay может отправить одно и то же сообщение более одного раза (например, если он упал после отправки, но до того, как пометил событие обработанным).
- Dead Letter Queue (DLQ): Для "отравленных" сообщений, которые не удается обработать после нескольких попыток, следует предусмотреть механизм DLQ, чтобы они не блокировали очередь.