Ответ
Гарантированная доставка в RabbitMQ достигается комбинацией подтверждений на стороне продюсера и консьюмера, а также устойчивости сообщений. Вот как я настраиваю это в продакшене.
1. Подтверждение от брокера (Publisher Confirms). Включаю этот режим, чтобы быть уверенным, что сообщение достигло брокера и записано на диск (если persistent).
import "github.com/streadway/amqp"
ch, err := conn.Channel()
err = ch.Confirm(false) // Переводим канал в режим подтверждений
// Канал для подтверждений
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
// Публикация сообщения
msg := amqp.Publishing{
DeliveryMode: amqp.Persistent, // КРИТИЧНО: Сообщение должно быть persistent
ContentType: "application/json",
Body: []byte(`{"event": "order.created"}`),
}
err = ch.Publish("", "orders.queue", true, false, msg)
// Ожидание подтверждения с таймаутом
select {
case confirm := <-confirms:
if confirm.Ack {
// Сообщение доставлено в брокер и обработано (сохранено на диск при необходимости)
log.Println("Confirmed delivery with tag:", confirm.DeliveryTag)
} else {
// Брокер не смог принять сообщение, нужна повторная отправка
log.Println("Failed to deliver message tag:", confirm.DeliveryTag)
}
case <-time.After(5 * time.Second):
// Таймаут, брокер не ответил. Нужна логика повторной попытки.
log.Println("Publish confirmation timeout")
}
2. Подтверждение обработки консьюмером (Consumer Acknowledgments). Консьюмер должен явно подтвердить успешную обработку, иначе сообщение будет переотправлено другому консьюмеру.
msgs, err := ch.Consume(
"orders.queue",
"", // consumer tag
false, // auto-ack = FALSE! Это самое важное
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
for d := range msgs {
// Обработка сообщения
log.Printf("Received a message: %s", d.Body)
// Имитация работы
err := processOrder(d.Body)
if err != nil {
// Если обработка не удалась, можно:
// 1. Отказаться от сообщения с requeue=false (отправить в DLQ)
// 2. Или отложить повторную попытку
log.Printf("Processing failed: %v", err)
ch.Nack(d.DeliveryTag, false, false) // Не возвращать в очередь
} else {
// Явно подтверждаем успешную обработку
ch.Ack(d.DeliveryTag, false)
}
}
3. Дополнительные меры для надежности:
- Dead Letter Exchange (DLX): Настраиваю для очереди, чтобы неудачные сообщения (отвергнутые с
requeue=false) попадали в отдельную очередь для анализа. - Высокая доступность (HA): Использую mirrored queues в кластере RabbitMQ, чтобы очередь реплицировалась на несколько нод.
- Логика повторных попыток (Retry) на стороне продюсера: При отсутствии подтверждения от брокера (timeout или nack) реализую exponential backoff для повторной отправки.
- Идемпотентность обработчика: Консьюмер должен быть готов к повторной доставке одного и того же сообщения (например, из-за таймаута подтверждения). Я добавляю проверку по ID сообщения в своей БД, чтобы не обрабатывать дубликаты.