Ответ
Для эффективного распараллеливания CPU-интенсивных задач или обработки больших объемов данных из очереди в Node.js я использую комбинацию библиотеки bull (или agenda) для управления очередями и Worker Threads для выгрузки тяжелых вычислений из основного потока event loop.
Архитектурное решение на примере обработки изображений:
- Установка и настройка очереди (Bull с Redis):
// queue.js const Queue = require('bull'); const imageProcessingQueue = new Queue('image-processing', { redis: { port: 6379, host: 'redis' }, defaultJobOptions: { attempts: 3, backoff: 5000 } // Повторные попытки });
module.exports = imageProcessingQueue;
2. **Добавление задач в очередь (например, из Express-роута):**
```javascript
app.post('/upload', async (req, res) => {
const imageUrls = req.body.urls; // Массив URL изображений
const jobs = imageUrls.map(url => ({
data: { imageUrl: url, options: req.body.options },
}));
await imageProcessingQueue.addBulk(jobs); // Пакетное добавление
res.json({ message: `${jobs.length} задач добавлено в очередь` });
});
- Создание обработчика с Worker Threads для CPU-интенсивных операций:
// worker-processor.js const { Worker, isMainThread, parentPort } = require('worker_threads');
if (!isMainThread) { // Код, выполняемый в воркере const { processImage } = require('./heavy-image-lib'); parentPort.on('message', async (jobData) => { try { const result = await processImage(jobData.imageUrl, jobData.options); parentPort.postMessage({ success: true, result }); } catch (error) { parentPort.postMessage({ success: false, error: error.message }); } }); }
module.exports = (jobData) => { return new Promise((resolve, reject) => { const worker = new Worker(__filename); worker.on('message', resolve); worker.on('error', reject); worker.postMessage(jobData); }); };
4. **Запуск параллельных обработчиков очереди:**
```javascript
// worker.js
const imageProcessingQueue = require('./queue');
const processWithWorker = require('./worker-processor');
// Запускаем 4 параллельных воркера (по числу ядер CPU)
imageProcessingQueue.process(4, async (job) => {
console.log(`Обработка задачи ${job.id}`);
// Делегируем тяжелую работу в отдельный поток
return await processWithWorker(job.data);
});
Ключевые преимущества такого подхода:
- Устойчивость: Задачи сохраняются в Redis и не теряются при перезапуске воркеров.
- Контроль нагрузки: Легко регулировать количество параллельных обработчиков (
queue.process(N)). - Мониторинг: Bull предоставляет UI или API для отслеживания выполнения задач.
- Защита Event Loop: CPU-интенсивный код выполняется в Worker Threads, не блокируя основной поток.
Ответ 18+ 🔞
Давай разберём эту задачку, как будто тебе нужно объяснить её соседу по гаражу, пока вы меняете масло. Ты знаешь, Node.js — он как тот спортивный мотор, который любит быстрые, резкие заезды, но если его нагрузить перевозкой кирпичей, он начнёт хрипеть и захлёбываться. Так вот, когда тебе нужно копать траншею, а не гонять по треку, нужен другой подход.
Представь, что у тебя есть куча фоток, которые надо обработать — наложить фильтры, сжать, чёрт знает что ещё. Если делать это прямо в основном потоке Node, это как пытаться разгрузить фуру в одиночку. Сервер просто встанет колом, и все пользователи будут материться, пока ждут ответа.
Вот как это делают умные люди, чтобы не прослыть пидарасами шерстяными.
Первое — ставим нормальную очередь. Берём библиотеку bull. Это не просто очередь, это целый диспетчер с талонами, как в советской поликлинике. И хранится всё это добро в Redis — надёжно, чтобы если что-то упало, задачи не потерялись, как носки в стиралке. Смотри, как это просто:
// queue.js
const Queue = require('bull');
const imageProcessingQueue = new Queue('image-processing', {
redis: { port: 6379, host: 'redis' },
defaultJobOptions: { attempts: 3, backoff: 5000 } // Если облажался — попробуй ещё три раза, но с перерывом.
});
module.exports = imageProcessingQueue;
Второе — кидаем задачи в эту очередь. Допустим, пришёл запрос с кучей ссылок на картинки. Вместо того чтобы впендюривать их обработку прямо тут, мы аккуратно складываем заявки в нашу систему.
app.post('/upload', async (req, res) => {
const imageUrls = req.body.urls;
const jobs = imageUrls.map(url => ({
data: { imageUrl: url, options: req.body.options },
}));
await imageProcessingQueue.addBulk(jobs); // Бахнул всё разом, пачкой.
res.json({ message: `${jobs.length} задач добавлено в очередь` });
});
Клиент сразу получает ответ: «Всё ок, работаем». А что там дальше — его уже не волнует. Да похуй, грубо говоря.
А вот теперь самое важное — как эту работу делать, чтобы не угробить сервер. Основной поток Node.js — это святое, его бздеть нельзя. Поэтому для тяжёлых вычислений, типа обработки изображений, используем Worker Threads. Это как нанять бригаду грузчиков (отдельные потоки), чтобы они таскали кирпичи, пока ты (основной поток) продолжаешь принимать заказы.
Создаём отдельного «работягу»:
// worker-processor.js
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (!isMainThread) {
// Этот код выполняется уже в отдельном потоке, в тихой комнатке.
const { processImage } = require('./heavy-image-lib'); // Тяжёлая библиотека
parentPort.on('message', async (jobData) => {
try {
const result = await processImage(jobData.imageUrl, jobData.options);
parentPort.postMessage({ success: true, result });
} catch (error) {
parentPort.postMessage({ success: false, error: error.message });
}
});
}
// А это функция, которую будем вызывать из основного кода, чтобы запустить работягу.
module.exports = (jobData) => {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename);
worker.on('message', resolve);
worker.on('error', reject);
worker.postMessage(jobData); // Дал задание и ждёшь результат.
});
};
И, наконец, запускаем наших обслужников. Создаём отдельный процесс-воркер, который будет слушать очередь и, как только появилась задача, делегировать её в отдельный поток.
// worker.js
const imageProcessingQueue = require('./queue');
const processWithWorker = require('./worker-processor');
// Вот магия! Запускаем, например, 4 параллельных обработчика.
// Сколько ядер у процессора — столько и можно. Больше — только навредишь.
imageProcessingQueue.process(4, async (job) => {
console.log(`Обработка задачи ${job.id}`);
// Не делаем работу сами, а отдаём её нашему работяге в отдельный поток.
return await processWithWorker(job.data);
});
Что мы получили в итоге, ёпта?
- Устойчивость, как у танка. Задачи в Redis. Упал воркер — поднялся, взял следующую задачу. Доверия ебать ноль к стабильности, но система держит удар.
- Контроль над бардаком. Хочешь — запусти два воркера, хочешь — десять. Регулируешь нагрузку на процессор.
- Event Loop жив и пьян. Основной поток не блокируется, он продолжает отвечать на запросы быстрее, чем кот сука собака несётся к миске.
- Всё под контролем. У
bullесть даже веб-морда, чтобы смотреть, что там в очереди творится, сколько задач висит, какие с ошибками.
Вот и вся мудя. Не пытайся впихнуть невпихуемое в один поток. Поставь очередь, вынеси тяжёлое в боковые потоки, и спи спокойно. А то ведь можно и накрыться медным тазом под нагрузкой.