Как гарантировать, что в очередь RabbitMQ не будут попадать сообщения от других микросервисов?

Ответ

В Node.js-микросервисах я использую несколько подходов для изоляции очередей RabbitMQ:

1. Эксклюзивные очереди (exclusive queues)

const amqp = require('amqplib');

async function setupPrivateQueue() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  // Очередь будет доступна только этому соединению
  const { queue } = await channel.assertQueue('', {
    exclusive: true, // ключевой параметр
    durable: false
  });

  console.log('Private queue created:', queue);
  return { connection, channel, queue };
}

2. Использование отдельных виртуальных хостов (VHost) и прав доступа

// Подключение к выделенному VHost
const connection = await amqp.connect('amqp://service_user:password@localhost/my_service_vhost');

// В RabbitMQ настройки через CLI:
// rabbitmqctl add_vhost my_service_vhost
// rabbitmqctl set_permissions -p my_service_vhost service_user ".*" ".*" ".*"

3. Валидация сообщений и заголовков

const { queue } = await channel.assertQueue('service.tasks', { durable: true });

channel.consume(queue, (msg) => {
  const content = JSON.parse(msg.content.toString());

  // Проверяем источник сообщения
  if (msg.properties.headers?.source !== 'my_service') {
    console.warn('Message from unauthorized source, rejecting');
    channel.nack(msg); // отклоняем сообщение
    return;
  }

  // Валидация структуры
  if (!content.taskId || !content.type) {
    console.error('Invalid message format');
    channel.nack(msg);
    return;
  }

  // Обработка сообщения
  processTask(content);
  channel.ack(msg);
});

4. Использование exchange с routing keys

// Каждый сервис публикует в свой exchange
await channel.assertExchange('service_a.exchange', 'direct', { durable: true });
await channel.assertQueue('service_a.tasks', { durable: true });
await channel.bindQueue('service_a.tasks', 'service_a.exchange', 'tasks');

// Публикация с указанием source
channel.publish('service_a.exchange', 'tasks', Buffer.from(JSON.stringify(payload)), {
  persistent: true,
  headers: { source: 'service_a', version: '1.0' }
});

На практике я комбинирую эти подходы: использую отдельные VHost для production/staging, настраиваю права доступа через RabbitMQ management plugin и всегда добавляю валидацию сообщений в consumer.

Ответ 18+ 🔞

Да ты послушай, какой у нас тут разговор про изоляцию в микросервисах! Прямо как в коммуналке, где каждый сосед пытается свой чайник в общую розетку воткнуть, а ты ему — ёпта, нет, браток, это моя очередь на RabbitMQ, иди своей тропой.

1. Эксклюзивные очереди — это как личный сейф. Смотри, делаешь очередь, которая только для твоего соединения. Как будто говоришь системе: «Эй, эта штука — моя, в рот меня чих-пых, больше никому не светит». В коде это выглядит просто, но сила — в параметре exclusive: true.

const { queue } = await channel.assertQueue('', {
  exclusive: true, // Вот этот красавец всё решает
  durable: false
});

Создаётся очередь с уникальным именем (RabbitMQ сам придумает), и как только соединение закроется — очередь накрылась медным тазом. Идеально для временных задач, чтобы мусор не копился.

2. Отдельные виртуальные хосты (VHost) — это твоя личная квартира в общем доме. Не хочешь, чтобы соседский сервис лез в твои данные? Создаёшь ему свой VHost, как комнату с отдельным замком.

// Подключаешься уже к своему углу
const connection = await amqp.connect('amqp://service_user:password@localhost/my_service_vhost');

А в консоли RabbitMQ настраиваешь права, чтобы этот пользователь мог делать что хочет, но только в своей «квартире». Доверия ебать ноль к другим сервисам, поэтому каждый сидит в своей песочнице.

3. Валидация сообщений — это как проверка документов на границе. Получаешь сообщение, а первым делом — подозрение ебать чувствую. Кто прислал? Что внутри? Если что-то не так — сразу в отказ.

channel.consume(queue, (msg) => {
  // Сразу смотрим заголовки
  if (msg.properties.headers?.source !== 'my_service') {
    console.warn('Сообщение от левого источника, отвергаем!');
    channel.nack(msg); // Иди нахуй со своим спамом
    return;
  }

  // Проверяем, что внутри
  const content = JSON.parse(msg.content.toString());
  if (!content.taskId || !content.type) {
    console.error('Формат сообщения — пиздец!');
    channel.nack(msg);
    return;
  }

  // Только теперь обрабатываем
  processTask(content);
  channel.ack(msg);
});

4. Exchange с routing keys — это как почта с указанием отдела. Каждый сервис публикует в свой собственный обменник (exchange), а не кидает всё в одну кучу. Это чтобы не было хуй с горы, когда все сообщения валятся в одну очередь и начинается бардак.

// Сервис А работает только со своим обменником
await channel.assertExchange('service_a.exchange', 'direct', { durable: true });
await channel.assertQueue('service_a.tasks', { durable: true });
await channel.bindQueue('service_a.tasks', 'service_a.exchange', 'tasks');

// И когда публикует, то явно указывает: «Я от service_a, версия 1.0, не трогать!»
channel.publish('service_a.exchange', 'tasks', Buffer.from(JSON.stringify(payload)), {
  persistent: true,
  headers: { source: 'service_a', version: '1.0' }
});

На практике я всё это комбинирую, как заправский архитектор. Отдельные VHost для прода и тестов, права доступа настроены так, что гомосеки налетели — не пройдут, и валидация на каждом шагу. Потому что если один сервис накосячит, остальные не должны ебушки-воробушки разлететься.