Как обеспечить гарантию доставки сообщения в RabbitMQ

«Как обеспечить гарантию доставки сообщения в RabbitMQ» — вопрос из категории Архитектура, который задают на 24% собеседований PHP Разработчик. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Гарантированная доставка в RabbitMQ достигается комбинацией подтверждений на стороне продюсера и консьюмера, а также устойчивости сообщений. Вот как я настраиваю это в продакшене.

1. Подтверждение от брокера (Publisher Confirms). Включаю этот режим, чтобы быть уверенным, что сообщение достигло брокера и записано на диск (если persistent).

import "github.com/streadway/amqp"

ch, err := conn.Channel()
err = ch.Confirm(false) // Переводим канал в режим подтверждений

// Канал для подтверждений
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))

// Публикация сообщения
msg := amqp.Publishing{
    DeliveryMode: amqp.Persistent, // КРИТИЧНО: Сообщение должно быть persistent
    ContentType:  "application/json",
    Body:         []byte(`{"event": "order.created"}`),
}
err = ch.Publish("", "orders.queue", true, false, msg)

// Ожидание подтверждения с таймаутом
select {
case confirm := <-confirms:
    if confirm.Ack {
        // Сообщение доставлено в брокер и обработано (сохранено на диск при необходимости)
        log.Println("Confirmed delivery with tag:", confirm.DeliveryTag)
    } else {
        // Брокер не смог принять сообщение, нужна повторная отправка
        log.Println("Failed to deliver message tag:", confirm.DeliveryTag)
    }
case <-time.After(5 * time.Second):
    // Таймаут, брокер не ответил. Нужна логика повторной попытки.
    log.Println("Publish confirmation timeout")
}

2. Подтверждение обработки консьюмером (Consumer Acknowledgments). Консьюмер должен явно подтвердить успешную обработку, иначе сообщение будет переотправлено другому консьюмеру.

msgs, err := ch.Consume(
    "orders.queue",
    "",    // consumer tag
    false, // auto-ack = FALSE! Это самое важное
    false, // exclusive
    false, // no-local
    false, // no-wait
    nil,   // args
)

for d := range msgs {
    // Обработка сообщения
    log.Printf("Received a message: %s", d.Body)

    // Имитация работы
    err := processOrder(d.Body)

    if err != nil {
        // Если обработка не удалась, можно:
        // 1. Отказаться от сообщения с requeue=false (отправить в DLQ)
        // 2. Или отложить повторную попытку
        log.Printf("Processing failed: %v", err)
        ch.Nack(d.DeliveryTag, false, false) // Не возвращать в очередь
    } else {
        // Явно подтверждаем успешную обработку
        ch.Ack(d.DeliveryTag, false)
    }
}

3. Дополнительные меры для надежности:

  • Dead Letter Exchange (DLX): Настраиваю для очереди, чтобы неудачные сообщения (отвергнутые с requeue=false) попадали в отдельную очередь для анализа.
  • Высокая доступность (HA): Использую mirrored queues в кластере RabbitMQ, чтобы очередь реплицировалась на несколько нод.
  • Логика повторных попыток (Retry) на стороне продюсера: При отсутствии подтверждения от брокера (timeout или nack) реализую exponential backoff для повторной отправки.
  • Идемпотентность обработчика: Консьюмер должен быть готов к повторной доставке одного и того же сообщения (например, из-за таймаута подтверждения). Я добавляю проверку по ID сообщения в своей БД, чтобы не обрабатывать дубликаты.