Ответ
Для выноса CPU-intensive задач из Event Loop в Node.js я использую несколько стратегий в зависимости от конкретного сценария:
1. Worker Threads для параллельных вычислений:
// main.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const os = require('os');
if (isMainThread) {
// Создаём пул воркеров
class ComputePool {
constructor(poolSize = os.cpus().length) {
this.workers = [];
this.taskQueue = [];
this.activeWorkers = 0;
for (let i = 0; i < poolSize; i++) {
const worker = new Worker(__filename);
worker.on('message', (result) => {
this.activeWorkers--;
const nextTask = this.taskQueue.shift();
if (nextTask) {
this.executeTask(nextTask.task, nextTask.resolve, nextTask.reject, worker);
} else {
this.workers.push(worker);
}
});
worker.on('error', (error) => {
console.error('Worker error:', error);
});
this.workers.push(worker);
}
}
executeTask(task, resolve, reject, worker = null) {
if (!worker) {
worker = this.workers.pop();
}
if (worker) {
this.activeWorkers++;
worker.postMessage(task);
worker.once('message', resolve);
worker.once('error', reject);
} else {
this.taskQueue.push({ task, resolve, reject });
}
}
async processImage(imageData, transformations) {
return new Promise((resolve, reject) => {
this.executeTask(
{ type: 'imageProcessing', imageData, transformations },
resolve,
reject
);
});
}
async computeFibonacci(n) {
return new Promise((resolve, reject) => {
this.executeTask(
{ type: 'fibonacci', n },
resolve,
reject
);
});
}
}
// Использование
const pool = new ComputePool(4);
async function processBatch() {
const results = await Promise.all([
pool.computeFibonacci(40),
pool.computeFibonacci(41),
pool.computeFibonacci(42),
pool.computeFibonacci(43)
]);
console.log('Results:', results);
}
processBatch();
} else {
// Код воркера
parentPort.on('message', (task) => {
let result;
switch (task.type) {
case 'fibonacci':
result = fibonacci(task.n);
break;
case 'imageProcessing':
result = processImage(task.imageData, task.transformations);
break;
}
parentPort.postMessage(result);
});
function fibonacci(n) {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
function processImage(imageData, transformations) {
// Интенсивная обработка изображения
return { processed: true, size: imageData.length };
}
}
2. Разбиение задач на части с setImmediate:
function processLargeDatasetInChunks(dataset, chunkSize = 1000, processItem) {
let index = 0;
let results = [];
return new Promise((resolve) => {
function processChunk() {
const startTime = Date.now();
// Обрабатываем chunkSize элементов или пока не превысим лимит времени
while (index < dataset.length && (Date.now() - startTime) < 10) {
results.push(processItem(dataset[index]));
index++;
}
if (index < dataset.length) {
// Отдаём контроль обратно в Event Loop
setImmediate(processChunk);
} else {
resolve(results);
}
}
processChunk();
});
}
// Использование
const largeArray = Array.from({ length: 1000000 }, (_, i) => i);
processLargeDatasetInChunks(largeArray, 5000, (item) => {
// Сложные вычисления над каждым элементом
return Math.sqrt(item) * Math.sin(item);
}).then(results => {
console.log(`Processed ${results.length} items`);
});
3. Child Processes для изоляции:
// compute-service.js
const { fork } = require('child_process');
const path = require('path');
class ComputeService {
constructor() {
this.workers = new Map();
}
async runHeavyComputation(data, computationType) {
const worker = fork(path.join(__dirname, 'computation-worker.js'));
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
worker.kill();
reject(new Error('Computation timeout'));
}, 30000);
worker.on('message', (result) => {
clearTimeout(timeout);
worker.kill();
resolve(result);
});
worker.on('error', reject);
worker.send({ data, computationType });
});
}
}
// computation-worker.js
process.on('message', ({ data, computationType }) => {
let result;
switch (computationType) {
case 'matrixMultiplication':
result = multiplyMatrices(data.matrixA, data.matrixB);
break;
case 'dataAnalysis':
result = analyzeDataset(data.dataset);
break;
}
process.send({ result });
});
function multiplyMatrices(a, b) {
// Интенсивное умножение матриц
return { /* результат */ };
}
4. Практические рекомендации:
- Выбор подхода: Worker Threads для общих данных, Child Processes для полной изоляции
- Пул воркеров: Всегда использую пул для избежания накладных расходов на создание
- Обработка ошибок: Реализую механизмы повторных попыток и graceful degradation
- Мониторинг: Отслеживаю загрузку CPU и память воркеров
- Кэширование: Кэширую результаты тяжёлых вычислений, когда это возможно
Ответ 18+ 🔞
Слушай, я тут подумал — ну реально, какого хуя Node.js такой странный? С одной стороны, асинхронный, быстрый, а с другой — один поток на всё про всё, и если туда CPU-bound задачу сунешь, то всё, приехали, сервер встаёт колом, как будто его впендюрили по самое не балуй. Так что давай разберём, как от этого косяка уворачиваться, чтобы не было потом волнения ебать.
1. Worker Threads — это типа отдельные пацаны, которые за тебя пашут
Представь: у тебя есть один чувак (Event Loop), который пытается и пиво пить, и на гитаре играть, и ещё с девушкой болтать одновременно. Естественно, он нихуя не успевает. Так вот Worker Threads — это когда ты зовёшь ещё трёх таких же чуваков, и они за тебя гитару дербанить. Смотри, как это выглядит:
// main.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const os = require('os');
if (isMainThread) {
// Делаем пул воркеров — чтоб не создавать их каждый раз, а то накладные расходы будут овердохуища
class ComputePool {
constructor(poolSize = os.cpus().length) {
this.workers = [];
this.taskQueue = [];
this.activeWorkers = 0;
for (let i = 0; i < poolSize; i++) {
const worker = new Worker(__filename);
worker.on('message', (result) => {
this.activeWorkers--;
const nextTask = this.taskQueue.shift();
if (nextTask) {
this.executeTask(nextTask.task, nextTask.resolve, nextTask.reject, worker);
} else {
this.workers.push(worker);
}
});
worker.on('error', (error) => {
console.error('Worker error:', error);
});
this.workers.push(worker);
}
}
executeTask(task, resolve, reject, worker = null) {
if (!worker) {
worker = this.workers.pop();
}
if (worker) {
this.activeWorkers++;
worker.postMessage(task);
worker.once('message', resolve);
worker.once('error', reject);
} else {
this.taskQueue.push({ task, resolve, reject });
}
}
async processImage(imageData, transformations) {
return new Promise((resolve, reject) => {
this.executeTask(
{ type: 'imageProcessing', imageData, transformations },
resolve,
reject
);
});
}
async computeFibonacci(n) {
return new Promise((resolve, reject) => {
this.executeTask(
{ type: 'fibonacci', n },
resolve,
reject
);
});
}
}
// Использование
const pool = new ComputePool(4);
async function processBatch() {
const results = await Promise.all([
pool.computeFibonacci(40),
pool.computeFibonacci(41),
pool.computeFibonacci(42),
pool.computeFibonacci(43)
]);
console.log('Results:', results);
}
processBatch();
} else {
// А это код, который выполняется внутри воркера — типа отдельная вселенная
parentPort.on('message', (task) => {
let result;
switch (task.type) {
case 'fibonacci':
result = fibonacci(task.n);
break;
case 'imageProcessing':
result = processImage(task.imageData, task.transformations);
break;
}
parentPort.postMessage(result);
});
function fibonacci(n) {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
function processImage(imageData, transformations) {
// Тут какая-нибудь тяжёлая хуйня с картинками
return { processed: true, size: imageData.length };
}
}
Суть в чём: главный поток не ебётся с вычислениями, а кидает задачи этим воркерам. Они там сами разбираются, а когда готово — отдают результат. Красота!
2. Разбиваем задачу на куски, чтобы Event Loop не сдох
А бывает так, что Worker Threads — это как из пушки по воробьям, оверкилл. Тогда можно просто большую задачу нарезать на мелкие куски и между ними дышать давать. Типа так:
function processLargeDatasetInChunks(dataset, chunkSize = 1000, processItem) {
let index = 0;
let results = [];
return new Promise((resolve) => {
function processChunk() {
const startTime = Date.now();
// Пашем, но не больше 10 миллисекунд — потом отдыхать
while (index < dataset.length && (Date.now() - startTime) < 10) {
results.push(processItem(dataset[index]));
index++;
}
if (index < dataset.length) {
// Кричим Event Loop: "Эй, братан, отдышись чутка, потом продолжим!"
setImmediate(processChunk);
} else {
resolve(results);
}
}
processChunk();
});
}
// Использование
const largeArray = Array.from({ length: 1000000 }, (_, i) => i);
processLargeDatasetInChunks(largeArray, 5000, (item) => {
// Какие-то математические пляски с бубном
return Math.sqrt(item) * Math.sin(item);
}).then(results => {
console.log(`Processed ${results.length} items`);
});
Это как есть огромный бутерброд — ты его не целиком в рот пихаешь, а откусываешь по кусочку, жуёшь, глотаешь, потом следующий. И желудок не взрывается.
3. Child Processes — когда нужна полная изоляция, ёпта
А вот если твоя задача — такая манда с ушами, которая может всё похерить, то лучше её вообще в отдельный процесс загнать. Пусть там себе пашет, а если сдохнет — то только сам, главное приложение останется живое. Как будто у тебя сожитель, который вечно бухает: пусть себе бухает в своей комнате, а ты в своей.
// compute-service.js
const { fork } = require('child_process');
const path = require('path');
class ComputeService {
constructor() {
this.workers = new Map();
}
async runHeavyComputation(data, computationType) {
const worker = fork(path.join(__dirname, 'computation-worker.js'));
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
worker.kill();
reject(new Error('Computation timeout'));
}, 30000);
worker.on('message', (result) => {
clearTimeout(timeout);
worker.kill();
resolve(result);
});
worker.on('error', reject);
worker.send({ data, computationType });
});
}
}
// computation-worker.js
process.on('message', ({ data, computationType }) => {
let result;
switch (computationType) {
case 'matrixMultiplication':
result = multiplyMatrices(data.matrixA, data.matrixB);
break;
case 'dataAnalysis':
result = analyzeDataset(data.dataset);
break;
}
process.send({ result });
});
function multiplyMatrices(a, b) {
// Тяжеленная хуйня с матрицами
return { /* результат */ };
}
4. Ну и напоследок, несколько житейских советов
- Что выбрать: Worker Threads — когда данные общие и нужно быстро. Child Processes — когда задача опасная, как хитрая жопа, и нужно, чтобы она в случае чего не потопила весь корабль.
- Пул воркеров делай всегда. Создавать их на каждый чих — это пиздец какой неэффективный подход, они же не бесплатные.
- Ошибки лови. Представь, что твой воркер — это такой полупидор, который может в любой момент накрыться медным тазом. Надо быть готовым к этому и иметь план Б.
- Следи за ними. Мониторь, сколько они памяти жрут, как CPU грузят. А то будет тебе сюрприз, когда сервак ляжет от того, что десять воркеров решили посчитать число Пи до миллионного знака.
- Кэшируй, если можно. Если одна и та же тяжёлая хуйня считается часто — сохраняй результат, не заставляй бедные ядра процессора снова и снова ебаться с одним и тем же.
В общем, суть в том, чтобы не пытаться всё делать в одном потоке, а раздавать работу тем, кто с ней справится. Как в старом анекдоте: один ест мясо, другой капусту, а вместе они щи готовят. Вот и тут так же.