Как разделять очереди задач в Node.js приложении?

Ответ

В Node.js-приложениях я разделяю очереди задач по нескольким критериям, используя библиотеки вроде bull или bee-queue.

1. Разделение по типу задач

const Queue = require('bull');

// Создаем отдельные очереди для разных типов задач
const emailQueue = new Queue('email', 'redis://127.0.0.1:6379');
const imageProcessingQueue = new Queue('image-processing', 'redis://127.0.0.1:6379');
const reportGenerationQueue = new Queue('reports', 'redis://127.0.0.1:6379');

// Добавление задач
emailQueue.add('welcome-email', { userId: 123, email: 'user@example.com' });
imageProcessingQueue.add('resize', { imageId: 456, sizes: ['thumb', 'medium'] });

2. Разделение по приоритету

const priorityQueue = new Queue('tasks', {
  redis: { port: 6379, host: '127.0.0.1' },
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 }
  }
});

// Задачи с разным приоритетом
priorityQueue.add('high-priority', { task: 'urgent' }, { priority: 1 }); // Высокий
priorityQueue.add('low-priority', { task: 'background' }, { priority: 100 }); // Низкий

3. Разделение по воркерам/процессам

// worker-процесс для CPU-intensive задач
const { Worker } = require('worker_threads');
const cpuIntensiveQueue = new Queue('cpu-tasks');

cpuIntensiveQueue.process(2, (job) => { // 2 параллельных воркера
  return new Promise((resolve, reject) => {
    const worker = new Worker('./image-processor.js', {
      workerData: job.data
    });

    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
});

// I/O задачи можно обрабатывать больше воркеров
const ioQueue = new Queue('io-tasks');
ioQueue.process(10); // 10 параллельных воркеров для I/O

4. Разделение по задержке выполнения

const scheduledQueue = new Queue('scheduled');

// Задача выполнится через 1 час
scheduledQueue.add('cleanup', { type: 'temp-files' }, {
  delay: 60 * 60 * 1000, // 1 час в миллисекундах
  removeOnComplete: true
});

// Периодические задачи (cron)
scheduledQueue.add('daily-report', {}, {
  repeat: { cron: '0 9 * * *' } // Каждый день в 9:00
});

5. Разделение по обработчикам ошибок

const mainQueue = new Queue('main');
const deadLetterQueue = new Queue('dead-letters');

mainQueue.process(async (job) => {
  try {
    await processJob(job);
  } catch (error) {
    // После 3 неудачных попыток перемещаем в dead letter queue
    if (job.attemptsMade >= 3) {
      await deadLetterQueue.add('failed-job', {
        originalJob: job.data,
        error: error.message,
        failedAt: new Date()
      });
    }
    throw error;
  }
});

В production-окружении я также разделяю очереди по окружениям (dev/staging/prod) через префиксы в Redis и настраиваю мониторинг через Bull Board или аналоги.

Ответ 18+ 🔞

А, ну ты про очереди в Node.js спрашиваешь? Ёпта, тема-то жирная, овердохуища вариантов. Смотри, я тебе по полочкам разложу, как я это обычно делаю, чтобы потом не орать «какого хуя всё упало», когда одна тяжёлая задача всю систему в говно превратит.

1. По типу задач — самое простое и логичное Тут всё понятно, как божий день. Нельзя же всё в одну кучу сваливать, получится пиздопроебина. Отправка писем, обработка картинок и генерация отчётов — это как минимум три разных зверя. Вот смотри, как их по разным загонам раскидать:

const Queue = require('bull');

// Создаем отдельные очереди для разных типов задач
const emailQueue = new Queue('email', 'redis://127.0.0.1:6379');
const imageProcessingQueue = new Queue('image-processing', 'redis://127.0.0.1:6379');
const reportGenerationQueue = new Queue('reports', 'redis://127.0.0.1:6379');

// Добавление задач
emailQueue.add('welcome-email', { userId: 123, email: 'user@example.com' });
imageProcessingQueue.add('resize', { imageId: 456, sizes: ['thumb', 'medium'] });

Зачем? Да чтобы если генерация отчёта, эта мартышлюшка, на час зависнет, то письма приветственные всё равно улетят, а юзеры не будут орать «где моё письмо, пидарас шерстяной».

2. По приоритету — когда нужно решить, кто важнее Бывает же, одна задача — срочная, «всё горит, ядрёна вошь», а другая может и подождать. Вот для этого в bull есть приоритеты. Чем число меньше — тем важнее задача.

const priorityQueue = new Queue('tasks', {
  redis: { port: 6379, host: '127.0.0.1' },
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 }
  }
});

// Задачи с разным приоритетом
priorityQueue.add('high-priority', { task: 'urgent' }, { priority: 1 }); // Высокий
priorityQueue.add('low-priority', { task: 'background' }, { priority: 100 }); // Низкий

Представь, ты добавляешь задачу «отправить смс с кодом подтверждения» и задачу «почистить логи за прошлый год». Первую, ясное дело, ставишь в приоритет, чтобы юзер не ждал, а вторая — пусть себе ползёт, когда ресурсы свободны.

3. По воркерам — тут уже хитрая жопа начинается Вот это ключевой момент, чувак. Задачи, которые процессор жрут как не в себя (типа обработки изображений или видео), и задачи, которые просто ждут ответа от базы или API, — обрабатывать надо по-разному. Для CPU-bound задач много параллельных воркеров не наделаешь — сервер сдохнет. А для I/O — можно хоть двадцать штук запустить, они же большую часть времени спят.

// worker-процесс для CPU-intensive задач
const { Worker } = require('worker_threads');
const cpuIntensiveQueue = new Queue('cpu-tasks');

cpuIntensiveQueue.process(2, (job) => { // 2 параллельных воркера
  return new Promise((resolve, reject) => {
    const worker = new Worker('./image-processor.js', {
      workerData: job.data
    });

    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
});

// I/O задачи можно обрабатывать больше воркеров
const ioQueue = new Queue('io-tasks');
ioQueue.process(10); // 10 параллельных воркеров для I/O

Видишь разницу? Для тяжёлых вычислений — всего два воркера, чтобы не угробить ядра. А для задач, связанных с диском или сетью, — десять, потому что они не грузят CPU. Иначе получится, что один хуй с винтом считает, а остальные девять простаивают.

4. По времени выполнения — отложенные и периодические Иногда нужно не сейчас, а «через час» или «каждый день в пять утра». Для этого в bull есть delay и repeat.

const scheduledQueue = new Queue('scheduled');

// Задача выполнится через 1 час
scheduledQueue.add('cleanup', { type: 'temp-files' }, {
  delay: 60 * 60 * 1000, // 1 час в миллисекундах
  removeOnComplete: true
});

// Периодические задачи (cron)
scheduledQueue.add('daily-report', {}, {
  repeat: { cron: '0 9 * * *' } // Каждый день в 9:00
});

Удобно, ёпта. Поставил задачу на ночную чистку мусора — и спи спокойно. Главное, cron-выражение правильно написать, а то вместо ночи в обед запустится, и будет тебе хиросима.

5. По обработке ошибок — чтобы не потерять неудачников А это, бля, святое. Любая задача может накрыться медным тазом. После нескольких неудачных попыток её нужно не просто выкинуть, а отправить в специальную «мёртвую очередь» (dead letter queue) на разбор полётов. Потом посмотришь, что там пошло не так.

const mainQueue = new Queue('main');
const deadLetterQueue = new Queue('dead-letters');

mainQueue.process(async (job) => {
  try {
    await processJob(job);
  } catch (error) {
    // После 3 неудачных попыток перемещаем в dead letter queue
    if (job.attemptsMade >= 3) {
      await deadLetterQueue.add('failed-job', {
        originalJob: job.data,
        error: error.message,
        failedAt: new Date()
      });
    }
    throw error;
  }
});

Вот так. Задача три раза побыла сосалкой, не получилось — отправляй её в архив неудачников и ищи причину. А то без этого как слепой щенок — нихуя не понятно, почему что-то не работает.

Ну и в продакшене, само собой, не забудь очереди для разных окружений через префиксы в Redis разделить, чтобы dev-задачи в prod не полезли. И мониторинг какой-нибудь, типа Bull Board, поставь — чтобы видеть, что где висит и не пригорает ли. А то без этого — чих-пых тебя в сраку, и прощай, спокойная жизнь.