Как реализовать многопоточность используя userland в Node.js

Ответ

Хотя Node.js однопоточен, для CPU-интенсивных задач я использовал модуль worker_threads и библиотеки уровня userland. Вот практический пример из проекта по обработке изображений.

1. Создание пула воркеров с библиотекой piscina:

npm install piscina
// worker-script.js - файл, который будет выполняться в потоке
const { parentPort } = require('worker_threads');
const sharp = require('sharp'); // CPU-heavy библиотека для изображений

parentPort.on('message', async (task) => {
  const { imageBuffer, operations } = task;
  try {
    let processor = sharp(imageBuffer);
    // Применяем операции (ресайз, обрезка, фильтры)
    if (operations.resize) {
      processor = processor.resize(operations.resize.width, operations.resize.height);
    }
    const processedBuffer = await processor.toBuffer();
    parentPort.postMessage({ success: true, buffer: processedBuffer });
  } catch (error) {
    parentPort.postMessage({ success: false, error: error.message });
  }
});
// main.js - основной процесс
const Piscina = require('piscina');
const path = require('path');
const fs = require('fs').promises;

// Создаём пул из 4 воркеров (по числу ядер CPU)
const pool = new Piscina({
  filename: path.resolve(__dirname, 'worker-script.js'),
  minThreads: 2,
  maxThreads: 4
});

async processBatchOfImages(imagePaths) {
  const tasks = imagePaths.map(async (imagePath) => {
    const imageBuffer = await fs.readFile(imagePath);
    // Отправляем задачу в пул воркеров
    return pool.run({
      imageBuffer,
      operations: { resize: { width: 800, height: 600 } }
    });
  });

  // Обрабатываем результаты по мере готовности
  const results = await Promise.allSettled(tasks);
  results.forEach((result, index) => {
    if (result.status === 'fulfilled' && result.value.success) {
      console.log(`Изображение ${imagePaths[index]} обработано`);
      // Сохраняем result.value.buffer ...
    } else {
      console.error(`Ошибка обработки ${imagePaths[index]}:`, result.reason || result.value?.error);
    }
  });
}

2. Использование SharedArrayBuffer для обмена данными (осторожно!): Для задач, где нужно минимизировать копирование данных между потоками (например, обработка большого бинарного файла), использовал SharedArrayBuffer.

// В основном потоке
const sharedBuffer = new SharedArrayBuffer(1024 * 1024); // 1MB
const arr = new Uint8Array(sharedBuffer);
// Заполняем массив данными...

// Передаём sharedBuffer в воркер
pool.run({ buffer: sharedBuffer }, { transferList: [sharedBuffer] });

3. Мониторинг и управление пулом: Библиотеки уровня userland, такие как piscina, предоставляют статистику.

console.log(`Активных задач: ${pool.queueSize}`);
console.log(`Занято воркеров: ${pool.completed}`);

Ключевые выводы:

  • Worker Threads идеальны для длительных синхронных вычислений (математика, обработка изображений/видео, шифрование).
  • Не используйте их для I/O операций — для этого асинхронный Event Loop Node.js эффективнее.
  • Пул воркеров (piscina) предотвращает накладные расходы на создание потока для каждой задачи.
  • Обмен сообщениями между потоками имеет свою стоимость, поэтому лучше отправлять одну большую задачу, чем много мелких.

В своём проекте это позволило ускорить обработку пачки из 100 изображений с 30 секунд (последовательно) до примерно 8 секунд (4 потока).

Ответ 18+ 🔞

Да ты посмотри, какая история! Сидишь себе на Node.js, всё вроде красиво, асинхронно, а тут бац — надо сотню фоток прогнать через sharp. И понеслась, ёпта. Ждёшь-ждёшь, а Event Loop твой, такой весь из себя неблокирующий, просто встаёт колом, потому что процессор упёрся в потолок. Картинка-то обрабатывается в синхронном коде, вот и вся магия. Удивление пиздец, когда понимаешь, что твой быстрый сервер превратился в улитку.

Ну, я не дурак, пошёл гуглить. Оказалось, спасение есть — worker_threads. Но руками с ними возиться — тот ещё геморрой, честно. Создать поток, обменяться сообщениями, не накосячить с памятью... Ядрёна вошь! Поэтому взял готовую библиотечку piscina, она пул воркеров организует, как миленькая.

Смотри, как я сделал. Сначала создал отдельный скрипт для воркера, где вся тяжкая работа и происходит.

// worker-script.js
const { parentPort } = require('worker_threads');
const sharp = require('sharp');

parentPort.on('message', async (task) => {
  const { imageBuffer, operations } = task;
  try {
    let processor = sharp(imageBuffer);
    if (operations.resize) {
      processor = processor.resize(operations.resize.width, operations.resize.height);
    }
    const processedBuffer = await processor.toBuffer();
    parentPort.postMessage({ success: true, buffer: processedBuffer });
  } catch (error) {
    parentPort.postMessage({ success: false, error: error.message });
  }
});

А в основном процессе поднимаю пул на 4 потока (по ядрам процессора) и швыряю туда задачи.

// main.js
const Piscina = require('piscina');
const path = require('path');
const fs = require('fs').promises;

const pool = new Piscina({
  filename: path.resolve(__dirname, 'worker-script.js'),
  minThreads: 2,
  maxThreads: 4
});

async processBatchOfImages(imagePaths) {
  const tasks = imagePaths.map(async (imagePath) => {
    const imageBuffer = await fs.readFile(imagePath);
    return pool.run({
      imageBuffer,
      operations: { resize: { width: 800, height: 600 } }
    });
  });

  const results = await Promise.allSettled(tasks);
  results.forEach((result, index) => {
    if (result.status === 'fulfilled' && result.value.success) {
      console.log(`Изображение ${imagePaths[index]} обработано`);
    } else {
      console.error(`Ошибка обработки ${imagePaths[index]}:`, result.reason || result.value?.error);
    }
  });
}

Была ещё идея с SharedArrayBuffer поиграться, чтобы данные между потоками не копировать, а сразу в общей памяти работать. Выглядит страшновато и мощно, но там, бля, доверия ебать ноль — можно так накосячить, что мало не покажется. Осторожно надо.

const sharedBuffer = new SharedArrayBuffer(1024 * 1024);
const arr = new Uint8Array(sharedBuffer);
pool.run({ buffer: sharedBuffer }, { transferList: [sharedBuffer] });

И знаешь, что в итоге? Раньше сто картинок обрабатывались 30 секунд, а теперь — около восьми. Скорость выросла в разы, просто праздник какой-то! Главный вывод, чувак: воркеры — это для настоящей, тяжёлой математики, а не для чтения файлов. Для I/O твой асинхронный движок Node.js и так огонь. А если уж задача реально CPU-bound, то без пула потоков — никуда. Терпения ноль ебать ждать, когда один поток всё пережуёт.