Ответ
Да, работал с механизмом ручных подтверждений (Manual Acknowledgements), часто называемым BAS (Basic Acknowledge System). Это ключевой механизм для гарантированной доставки сообщений в RabbitMQ.
Принцип работы:
- Потребитель (consumer) подписывается на очередь с параметром
autoAck: false. - Брокер передает сообщение, но не удаляет его из очереди, пока не получит явное подтверждение (
basicAck). - Если соединение с потребителем разрывается без подтверждения, брокер повторно ставит сообщение в очередь (по умолчанию).
Основные методы канала (Channel):
BasicAck(deliveryTag, multiple: false): Подтверждает успешную обработку одного сообщения.BasicNack(deliveryTag, multiple: false, requeue: true): Отклоняет сообщение. Сrequeue: trueсообщение возвращается в очередь; сfalse— отправляется в Dead Letter Exchange (DLX).BasicReject(deliveryTag, requeue: true): Устаревший аналогBasicNackдля одного сообщения.
Пример надежного потребителя на C# (RabbitMQ.Client):
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare("orders", durable: true, exclusive: false, autoDelete: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// Обработка сообщения (например, сохранение в БД)
ProcessOrder(message);
// Явное подтверждение после успешной обработки
channel.BasicAck(ea.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
// В случае ошибки отклоняем сообщение с возвратом в очередь
// Важно: иметь обработку повторяющихся сбоев, чтобы избежать бесконечного цикла
channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: true);
_logger.LogError(ex, "Ошибка обработки сообщения");
}
};
// Критически важный параметр autoAck: false
channel.BasicConsume(queue: "orders", autoAck: false, consumer: consumer);
Важные нюансы:
- Идемпотентность: Обработчик должен корректно обрабатывать повторно доставленные сообщения.
- Prefetch Count: Для балансировки нагрузки стоит настраивать
channel.BasicQos(0, prefetchCount, false), ограничивая количество неподтвержденных сообщений у одного потребителя. - Dead Letter Exchange: Используется для перемещения сообщений, которые не удалось обработать после нескольких попыток, в отдельную очередь для анализа.