Что происходит, если брокер сообщений (например, RabbitMQ) недоступен в момент отправки сообщения для сохранения в БД?

«Что происходит, если брокер сообщений (например, RabbitMQ) недоступен в момент отправки сообщения для сохранения в БД?» — вопрос из категории Брокеры сообщений, который задают на 25% собеседований C# Разработчик. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

В production-системах прямая синхронная запись в БД при недоступности брокера сообщений недопустима, так как это приводит к потере данных и отказу сервиса. Вместо этого применяются стратегии обеспечения надежности (reliability patterns).

Правильный подход — использование паттерна "Outbox":

  1. Сообщение и связанные с ним изменения БД сохраняются в одной транзакции в локальную таблицу Outbox.
  2. Фоновый процесс (например, BackgroundService в .NET) периодически опрашивает эту таблицу и пытается отправить сообщения в брокер.
  3. После успешной отправки в брокер запись помечается как отправленная или удаляется.

Пример реализации Outbox на C#:

// 1. Сохранение в рамках транзакции
using var transaction = await dbContext.Database.BeginTransactionAsync();
try
{
    // Сохраняем основную бизнес-сущность
    dbContext.Orders.Add(newOrder);
    await dbContext.SaveChangesAsync();

    // Сохраняем сообщение в ту же БД (таблица Outbox)
    var outboxMessage = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        OccurredOn = DateTime.UtcNow,
        Type = "OrderCreated",
        Data = JsonSerializer.Serialize(new { OrderId = newOrder.Id })
    };
    dbContext.OutboxMessages.Add(outboxMessage);
    await dbContext.SaveChangesAsync();

    await transaction.CommitAsync(); // Все сохраняется атомарно
}
catch
{
    await transaction.RollbackAsync();
    throw;
}

// 2. Фоновый сервис для отправки сообщений из Outbox
public class OutboxProcessorService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var pendingMessages = await dbContext.OutboxMessages
                .Where(m => !m.Processed)
                .Take(20)
                .ToListAsync();

            foreach (var message in pendingMessages)
            {
                try
                {
                    await messageBus.PublishAsync(message.Type, message.Data); // Отправка в RabbitMQ/Kafka
                    message.Processed = true;
                }
                catch (Exception ex)
                {
                    // Логируем ошибку, сообщение останется для повторной попытки
                    _logger.LogError(ex, "Failed to publish outbox message {MessageId}", message.Id);
                }
            }
            await dbContext.SaveChangesAsync();
            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); // Интервал опроса
        }
    }
}

Дополнительные меры:

  • Повторные попытки (Retry) с экспоненциальной задержкой для отправки в брокер.
  • Circuit Breaker для избежания постоянных попыток при длительной недоступности брокера.
  • Мониторинг очереди непрочитанных сообщений в Outbox.