Какой архитектурный паттерн обеспечивает атомарную запись в базу данных и отправку сообщения в очередь?

Ответ

Для решения этой задачи используется паттерн Transactional Outbox. Его суть — избежать несогласованного состояния, когда операция в базе данных прошла успешно, а отправка сообщения в очередь — нет (или наоборот).

Принцип работы

  1. Единая транзакция: В рамках одной транзакции с основной бизнес-логикой (например, создание заказа) запись о событии помещается в специальную таблицу в той же базе данных — outbox.
  2. Асинхронный отправитель: Отдельный независимый процесс (реле, poller) периодически опрашивает таблицу outbox на наличие необработанных записей.
  3. Доставка и подтверждение: Процесс-отправитель читает событие из таблицы, отправляет его в брокер сообщений (Kafka, RabbitMQ) и только после успешного подтверждения от брокера помечает запись в outbox как обработанную (или удаляет ее).

Пример реализации (псевдокод на Python)

# Шаг 1: Основная бизнес-логика
def create_order(order_data):
    # Начало транзакции
    with db.atomic_transaction():
        # 1. Сохраняем заказ в основной таблице
        new_order = Order.create(**order_data)

        # 2. Создаем запись о событии в таблице outbox
        OutboxMessage.create(
            topic='orders.created',
            payload={'order_id': new_order.id, 'customer_id': new_order.customer_id}
        )
    # Конец транзакции. Либо обе записи сохранятся, либо ни одна.

# Шаг 2: Отдельный процесс-отправитель
def process_outbox_messages():
    while True:
        messages = OutboxMessage.get_unsent(limit=100)
        for msg in messages:
            try:
                # 3. Отправляем сообщение в брокер
                message_broker.publish(topic=msg.topic, body=msg.payload)

                # 4. Помечаем как отправленное
                msg.mark_as_sent()
            except Exception as e:
                # Логируем ошибку, сообщение будет обработано в следующий раз
                log_error(f"Failed to send message {msg.id}: {e}")
        time.sleep(5) # Пауза между опросами

Преимущества

  • Надежность: Гарантирует, что ни одно событие не будет потеряно при сбое одной из систем.
  • Разделение ответственностей: Бизнес-логика не зависит от доступности брокера сообщений.
  • Согласованность данных: Обеспечивает целостность данных между локальной БД и внешними системами.

Ответ 18+ 🔞

Смотри, сейчас я тебе такую штуку объясню, что у тебя волосы завьются, а потом выпрямятся от осознания. Речь про Transactional Outbox. Это, блядь, как страховка от пиздеца, когда у тебя в базе всё окей, а сообщение в очередь улетело в сраку, или наоборот.

Как эта махина работает, ёпта

  1. Всё в одной куче: Пока ты делаешь своё дело (типа создаёшь заказ), ты в той же самой транзакции пишешь записку о событии в специальную табличку outbox — прямо в своей же базе. Либо всё сохранится, либо нихуя. Никаких промежуточных вариантов.
  2. Отдельный загонщик: Потом, отдельно, какой-нибудь фоновый демон или воркер, как маньяк, долбится в эту табличку и спрашивает: «Чё по новым?».
  3. Работа на вынос: Он хватает событие, швыряет его в Кафку или Раббита, и только когда тот говорит «О, клёво, принял!», загонщик ставит галочку «отправлено». Всё, чисто.

Смотри, как это выглядит в коде (условно)

# Шаг 1: Делаем бизнес и сразу же пишем записку
def create_order(order_data):
    # Стартуем транзакцию — всё или ничего
    with db.atomic_transaction():
        # 1. Пихаем заказ в основную таблицу
        new_order = Order.create(**order_data)

        # 2. АХТУНГ! Сразу же лепим запись в outbox. Всё в одной транзакции!
        OutboxMessage.create(
            topic='orders.created',
            payload={'order_id': new_order.id, 'customer_id': new_order.customer_id}
        )
    # Если тут вылетело, то заказа нет И записки в outbox нет. Красота.

# Шаг 2: Запускаем нашего почтальона Печкина
def process_outbox_messages():
    while True:
        # Он такой: "А не появилось ли тут писем неотправленных, а?"
        messages = OutboxMessage.get_unsent(limit=100)
        for msg in messages:
            try:
                # 3. Швыряем сообщение в брокер
                message_broker.publish(topic=msg.topic, body=msg.payload)

                # 4. Только после успеха ставим галочку "улетело"
                msg.mark_as_sent()
            except Exception as e:
                # Если брокер сдох — ну хуй с ним, попробуем в следующий раз
                log_error(f"Не отправилось сообщение {msg.id}: {e}")
        time.sleep(5) # Поспим чутка, чтоб не доебаться до базы

И в чём, сука, профит?

  • Ничего не потеряется: Даже если твой брокер сообщений накрылся медным тазом в самый ответственный момент — запись о событии лежит в базе и ждёт своего часа. Никаких «ой, а мы заказ создали, но никому не сказали».
  • Живём отдельно: Твоему основному сервису похуй, живёт Кафка или нет. Он свою работу сделал — записал в базу и пошёл дальше. А рассылкой пусть отдельный процесс страдает.
  • Порядок в данных: Гарантирует, что если заказ в базе есть, то и событие о нём рано или поздно выстрелит. Никакой ебли с eventual consistency там, где она не нужна. Всё чётко.

Вот такая, блядь, хитрая жопа. Простая, как три копейки, но от хуёвых ситуаций спасает на раз.