Ответ
В production-системах прямая синхронная запись в БД при недоступности брокера сообщений недопустима, так как это приводит к потере данных и отказу сервиса. Вместо этого применяются стратегии обеспечения надежности (reliability patterns).
Правильный подход — использование паттерна "Outbox":
- Сообщение и связанные с ним изменения БД сохраняются в одной транзакции в локальную таблицу
Outbox. - Фоновый процесс (например,
BackgroundServiceв .NET) периодически опрашивает эту таблицу и пытается отправить сообщения в брокер. - После успешной отправки в брокер запись помечается как отправленная или удаляется.
Пример реализации Outbox на C#:
// 1. Сохранение в рамках транзакции
using var transaction = await dbContext.Database.BeginTransactionAsync();
try
{
// Сохраняем основную бизнес-сущность
dbContext.Orders.Add(newOrder);
await dbContext.SaveChangesAsync();
// Сохраняем сообщение в ту же БД (таблица Outbox)
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
OccurredOn = DateTime.UtcNow,
Type = "OrderCreated",
Data = JsonSerializer.Serialize(new { OrderId = newOrder.Id })
};
dbContext.OutboxMessages.Add(outboxMessage);
await dbContext.SaveChangesAsync();
await transaction.CommitAsync(); // Все сохраняется атомарно
}
catch
{
await transaction.RollbackAsync();
throw;
}
// 2. Фоновый сервис для отправки сообщений из Outbox
public class OutboxProcessorService : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var pendingMessages = await dbContext.OutboxMessages
.Where(m => !m.Processed)
.Take(20)
.ToListAsync();
foreach (var message in pendingMessages)
{
try
{
await messageBus.PublishAsync(message.Type, message.Data); // Отправка в RabbitMQ/Kafka
message.Processed = true;
}
catch (Exception ex)
{
// Логируем ошибку, сообщение останется для повторной попытки
_logger.LogError(ex, "Failed to publish outbox message {MessageId}", message.Id);
}
}
await dbContext.SaveChangesAsync();
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); // Интервал опроса
}
}
}
Дополнительные меры:
- Повторные попытки (Retry) с экспоненциальной задержкой для отправки в брокер.
- Circuit Breaker для избежания постоянных попыток при длительной недоступности брокера.
- Мониторинг очереди непрочитанных сообщений в Outbox.