Как обеспечить дедупликацию сообщений в микросервисной архитектуре?

Ответ

В микросервисной архитектуре проблема дублирования сообщений (например, из-за повторной отправки брокером или сбоев сети) решается с помощью паттернов Idempotency Key и Deduplication.

  1. Idempotency Key (Идемпотентный ключ): Клиент (отправитель сообщения) генерирует уникальный ключ для каждой операции. Сервис-получатель сохраняет этот ключ и результат обработки. При получении сообщения с уже известным ключом, сервис возвращает ранее сохранённый результат, не выполняя операцию повторно.

    type Request struct {
        IdempotencyKey string
        Data           interface{}
    }
    
    // cache здесь представляет собой внешнее, персистентное хранилище (например, Redis или БД)
    func ProcessRequest(r Request) (Response, error) {
        if cached, exists := cache.Get(r.IdempotencyKey); exists {
            return cached.(Response), nil
        }
        // Обработка запроса (выполняется только если ключ не найден)
        result := doWork(r.Data)
        cache.Set(r.IdempotencyKey, result)
        return result, nil
    }
  2. Deduplication (Дедупликация на стороне потребителя): Сервис-потребитель сам отслеживает ID обработанных сообщений. Перед обработкой каждого нового сообщения он проверяет, был ли его ID уже обработан. Если да, сообщение игнорируется. Часто реализуется через хранилище (Redis, база данных) с установленным TTL для ID сообщений.

    // processedMessagesStore - внешнее хранилище (Redis, БД)
    func ConsumeMessage(message Message) error {
        if processedMessagesStore.Contains(message.ID) {
            log.Printf("Message %s already processed, skipping", message.ID)
            return nil
        }
    
        // Обработка сообщения
        err := processBusinessLogic(message.Data)
        if err != nil {
            return err
        }
    
        processedMessagesStore.Add(message.ID, TTL_FOR_MESSAGE_ID)
        return nil
    }

Для брокеров сообщений (Kafka, RabbitMQ и т.д.) также используют:

  • Оффсеты/подтверждения (Acknowledgements): Потребитель подтверждает брокеру успешную обработку сообщения, и брокер не отправляет его повторно (в случае успешного подтверждения).
  • Exactly-once семантика: Некоторые брокеры (например, Kafka Streams) предлагают механизмы для обеспечения обработки сообщения ровно один раз, что упрощает дедупликацию на уровне приложения.
  • Транзакционные потребители/продюсеры: Использование транзакций для атомарной записи результатов обработки и подтверждения сообщения.