Как обеспечить атомарность операции, которая включает в себя и запись в базу данных, и отправку сообщения в брокер (например, Kafka)?

Ответ

Это классическая проблема, так как невозможно создать единую распределенную транзакцию, охватывающую и базу данных, и внешний брокер сообщений. Если сервис упадет между коммитом в БД и отправкой сообщения, система окажется в неконсистентном состоянии.

Решением является паттерн Transactional Outbox (Транзакционный исходящий ящик).

Суть паттерна

Операция делится на два этапа:

  1. Атомарная запись в БД: В рамках одной локальной транзакции базы данных вы делаете две вещи:

    • Сохраняете основную бизнес-сущность (например, заказ).
    • Сохраняете событие, которое нужно отправить, в специальную таблицу outbox.

      Поскольку обе записи происходят в одной транзакции, они либо обе успешно завершатся, либо обе откатятся. Это гарантирует, что событие не потеряется, если бизнес-данные были сохранены.

  2. Асинхронная отправка сообщения: Отдельный процесс (называемый Relay или Poller) периодически опрашивает таблицу outbox, забирает необработанные события, отправляет их в брокер сообщений (Kafka, RabbitMQ) и помечает как отправленные (или удаляет).


Пример реализации на Go:

// 1. Сохранение заказа и события в одной транзакции
func CreateOrder(ctx context.Context, db *sql.DB, order Order) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    // defer tx.Rollback() не нужен при явном Commit/Rollback в конце

    // Шаг 1.1: Сохраняем заказ
    if err := saveOrderInTx(tx, order); err != nil {
        tx.Rollback()
        return err
    }

    // Шаг 1.2: Создаем событие для отправки
    eventPayload, _ := json.Marshal(order)
    event := OutboxEvent{
        AggregateType: "order",
        AggregateID:   order.ID,
        EventType:     "OrderCreated",
        Payload:       eventPayload,
    }

    // Шаг 1.3: Сохраняем событие в таблицу outbox
    if err := saveOutboxEventInTx(tx, event); err != nil {
        tx.Rollback()
        return err
    }

    // Шаг 1.4: Коммитим транзакцию. Теперь данные и событие сохранены атомарно.
    return tx.Commit()
}

// 2. Relay-процесс (работает в отдельной горутине или сервисе)
func processOutbox(ctx context.Context, db *sql.DB, producer kafka.Producer) {
    for {
        // Находит необработанные события в таблице outbox
        events := findUnprocessedEvents(db)
        for _, event := range events {
            // Отправляет событие в Kafka
            if err := producer.Produce(event.Payload); err == nil {
                // Помечает событие как обработанное в БД
                markEventAsProcessed(db, event.ID)
            } else {
                // Логируем ошибку, повторная попытка будет в следующей итерации
            }
        }
        time.Sleep(5 * time.Second) // Пауза между опросами
    }
}

Дополнительные важные аспекты:

  • Идемпотентность потребителей: Сервисы, которые слушают эти события, должны быть идемпотентными, так как Relay может отправить одно и то же сообщение более одного раза (например, если он упал после отправки, но до того, как пометил событие обработанным).
  • Dead Letter Queue (DLQ): Для "отравленных" сообщений, которые не удается обработать после нескольких попыток, следует предусмотреть механизм DLQ, чтобы они не блокировали очередь.