Что происходит, если брокер сообщений (например, RabbitMQ) недоступен в момент отправки сообщения для сохранения в БД?

Ответ

В production-системах прямая синхронная запись в БД при недоступности брокера сообщений недопустима, так как это приводит к потере данных и отказу сервиса. Вместо этого применяются стратегии обеспечения надежности (reliability patterns).

Правильный подход — использование паттерна "Outbox":

  1. Сообщение и связанные с ним изменения БД сохраняются в одной транзакции в локальную таблицу Outbox.
  2. Фоновый процесс (например, BackgroundService в .NET) периодически опрашивает эту таблицу и пытается отправить сообщения в брокер.
  3. После успешной отправки в брокер запись помечается как отправленная или удаляется.

Пример реализации Outbox на C#:

// 1. Сохранение в рамках транзакции
using var transaction = await dbContext.Database.BeginTransactionAsync();
try
{
    // Сохраняем основную бизнес-сущность
    dbContext.Orders.Add(newOrder);
    await dbContext.SaveChangesAsync();

    // Сохраняем сообщение в ту же БД (таблица Outbox)
    var outboxMessage = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        OccurredOn = DateTime.UtcNow,
        Type = "OrderCreated",
        Data = JsonSerializer.Serialize(new { OrderId = newOrder.Id })
    };
    dbContext.OutboxMessages.Add(outboxMessage);
    await dbContext.SaveChangesAsync();

    await transaction.CommitAsync(); // Все сохраняется атомарно
}
catch
{
    await transaction.RollbackAsync();
    throw;
}

// 2. Фоновый сервис для отправки сообщений из Outbox
public class OutboxProcessorService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var pendingMessages = await dbContext.OutboxMessages
                .Where(m => !m.Processed)
                .Take(20)
                .ToListAsync();

            foreach (var message in pendingMessages)
            {
                try
                {
                    await messageBus.PublishAsync(message.Type, message.Data); // Отправка в RabbitMQ/Kafka
                    message.Processed = true;
                }
                catch (Exception ex)
                {
                    // Логируем ошибку, сообщение останется для повторной попытки
                    _logger.LogError(ex, "Failed to publish outbox message {MessageId}", message.Id);
                }
            }
            await dbContext.SaveChangesAsync();
            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); // Интервал опроса
        }
    }
}

Дополнительные меры:

  • Повторные попытки (Retry) с экспоненциальной задержкой для отправки в брокер.
  • Circuit Breaker для избежания постоянных попыток при длительной недоступности брокера.
  • Мониторинг очереди непрочитанных сообщений в Outbox.

Ответ 18+ 🔞

А, слушай, вот это реальная проблема, с которой народ постоянно наступает на одни и те же грабли! Прям как об стенку головой — бьются, бьются, а потом удивляются, почему в продакшене всё ебнулось.

Представь ситуацию: твой сервис пытается сохранить заказ в базу И ОДНОВРЕМЕННО отправить событие в RabbitMQ или Kafka. А брокер — он, сука, лежит. И что происходит? Вся транзакция откатывается, заказ не создаётся, пользователь получает ошибку, хотя с базой-то всё было в порядке! Это пиздец, а не архитектура. Данные теряются, сервис не работает. Полный ахтунг.

Так делать — это как пытаться завести машину, вытащив аккумулятор. Ни хуя не выйдет.

Правильный путь — паттерн "Outbox". Суть проще пареной репы, но гениальная.

Вместо того чтобы пытаться отправить сообщение куда-то наружу прямо в транзакции, мы делаем так:

  1. Всё в одну кучу. Сохраняем и сам заказ в Orders, и сообщение о его создании в локальную таблицу OutboxMessages. Всё в рамках ОДНОЙ и той же транзакции в нашей родной базе. Либо всё сохранилось, либо ничего. Никакой потери консистентности.
  2. Отправляет отдельный паровоз. Запускаем фоновую службу (типа BackgroundService), которая тупо, как танк, раз в N секунд смотрит в эту таблицу OutboxMessages, берёт неотправленные сообщения и пытается их затолкать в настоящий брокер.
  3. Пометил и забыл. Если отправил успешно — помечаем запись в аутбоксе как отправленную или вообще удаляем. Если брокер снова недоступен — ну и хуй с ним, сообщение так и лежит, ждёт следующего захода фонового работяги. Сервис продолжает работать, пользователи создают заказы, система не ломается.

Вот смотри, как это выглядит в коде, примерно:

// 1. Это делаем в обработчике запроса. Всё атомарно.
using var transaction = await dbContext.Database.BeginTransactionAsync();
try
{
    // Сохраняем сущность бизнес-логики
    dbContext.Orders.Add(newOrder);
    await dbContext.SaveChangesAsync();

    // И тут же, в ЭТОЙ ЖЕ транзакции, пишем сообщение в свою же таблицу Outbox
    var outboxMessage = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        OccurredOn = DateTime.UtcNow,
        Type = "OrderCreated",
        Data = JsonSerializer.Serialize(new { OrderId = newOrder.Id })
    };
    dbContext.OutboxMessages.Add(outboxMessage);
    await dbContext.SaveChangesAsync();

    await transaction.CommitAsync(); // Фиксируем ВСЁ разом
}
catch
{
    await transaction.RollbackAsync();
    throw;
}

// 2. А это уже наш фоновый трудяга, который работает отдельно
public class OutboxProcessorService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Смотрим, что там у нас накопилось неотправленного
            var pendingMessages = await dbContext.OutboxMessages
                .Where(m => !m.Processed)
                .Take(20)
                .ToListAsync();

            foreach (var message in pendingMessages)
            {
                try
                {
                    // Пытаемся отправить в настоящий RabbitMQ/Kafka
                    await messageBus.PublishAsync(message.Type, message.Data);
                    message.Processed = true; // Ура, отправили!
                }
                catch (Exception ex)
                {
                    // Не получилось? Ну и ладно. Запишем в лог, попробуем в следующий раз.
                    _logger.LogError(ex, "Failed to publish outbox message {MessageId}", message.Id);
                }
            }
            await dbContext.SaveChangesAsync();
            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); // Поспим немного и снова проверим
        }
    }
}

И конечно, без дополнительных плюшек никуда:

  • Retry с умом. Когда фоновый сервис пытается отправить, нужно делать повторные попытки не тупо подряд, а с экспоненциальной задержкой. Чтобы не заспамить логами и дать брокеру время опомниться.
  • Circuit Breaker. Если брокер умер окончательно и надолго, не надо долбить его каждые 5 секунд. Включил "предохранитель", поспал подольше, потом попробовал снова. Экономия ресурсов и нервов.
  • Мониторинг. Обязательно смотри на график, сколько непрочитанных сообщений висит в таблице Outbox. Если их число растёт как сумасшедшее — это прямой сигнал, что с брокером беда. Беги смотреть.

Вот так, без лишней суеты и матерных срывов в продакшене, система становится устойчивой. Как танк, блядь.