Что такое потоки (Streams) в Node.js и зачем они нужны?

Ответ

Потоки (Streams) в Node.js — это абстракции для работы с последовательными данными, которые позволяют обрабатывать информацию по частям (чанкам), не загружая весь объём в память. Это фундаментальная концепция для эффективной работы с I/O операциями.

Типы потоков:

  • Readable: Поток для чтения (например, fs.createReadStream, HTTP-запрос).
  • Writable: Поток для записи (например, fs.createWriteStream, HTTP-ответ).
  • Duplex: Читаемый и записываемый одновременно (например, TCP-сокет).
  • Transform: Разновидность Duplex, которая может модифицировать данные (например, zlib.createGzip).

Практические примеры:

  1. Копирование больших файлов с контролем памяти:

    const fs = require('fs');
    const path = require('path');
    
    const source = path.join(__dirname, 'large-video.mp4');
    const destination = path.join(__dirname, 'copy-video.mp4');
    
    const readStream = fs.createReadStream(source);
    const writeStream = fs.createWriteStream(destination);
    
    // Pipe автоматически управляет backpressure
    readStream.pipe(writeStream);
    
    writeStream.on('finish', () => {
      console.log('Файл успешно скопирован потоком.');
    });
    
    writeStream.on('error', (err) => {
      console.error('Ошибка записи:', err);
    });

    Без потоков пришлось бы использовать fs.readFile, который загрузил бы весь файл в память.

  2. Обработка данных на лету (Transform Stream):

    const { Transform } = require('stream');
    const fs = require('fs');
    
    // Создаём кастомный трансформационный поток для шифрования
    class EncryptStream extends Transform {
      constructor(shift) {
        super();
        this.shift = shift;
      }
    
      _transform(chunk, encoding, callback) {
        // Простой шифр Цезаря для примера
        const encrypted = chunk.toString().split('').map(char => {
          const code = char.charCodeAt(0);
          return String.fromCharCode(code + this.shift);
        }).join('');
        this.push(encrypted);
        callback();
      }
    }
    
    const encryptor = new EncryptStream(3);
    const reader = fs.createReadStream('secret.txt');
    const writer = fs.createWriteStream('secret.encrypted.txt');
    
    reader.pipe(encryptor).pipe(writer);
  3. Обработка HTTP-запросов/ответов:

    const http = require('http');
    const fs = require('fs');
    
    const server = http.createServer((req, res) => {
      // Отправка большого файла клиенту потоком
      const fileStream = fs.createReadStream('./large-data.json');
      res.writeHead(200, { 'Content-Type': 'application/json' });
      fileStream.pipe(res); // Данные отправляются чанками по мере чтения
    
      fileStream.on('error', (err) => {
        res.statusCode = 500;
        res.end('Internal Server Error');
      });
    });

Ключевые преимущества потоков в Node.js:

  • Эффективное использование памяти: Обработка данных чанками (по умолчанию 64KB) вместо загрузки гигабайтных файлов целиком.
  • Улучшенная производительность и отзывчивость: Данные можно начинать обрабатывать или отправлять сразу после получения первого чанка, не дожидаясь окончания передачи.
  • Контроль backpressure: Механизм pipe() автоматически приостанавливает чтение, если запись не успевает, предотвращая переполнение памяти.
  • Композиция: Потоки легко соединяются в конвейеры (pipeline) для сложной обработки (чтение -> распаковка -> парсинг -> запись).

Ответ 18+ 🔞

Да ты посмотри, какая хитрая жопа эти потоки в Node.js! Это ж просто магия какая-то, а не абстракция. Представь: тебе не нужно загружать в память овердохуища данных целиком, типа этот твой гигабайтный лог-файл или видео с котиками. Всё обрабатывается по кусочкам, по чанкам, прямо на лету. Ёпта, удобно же!

Какие они бывают, эти потоки:

  • Readable: Тот, из которого можно читать. Как кран с водой открыл — полилось. Файл читаешь или HTTP-запрос слушаешь.
  • Writable: Тот, в который можно писать. Куда данные сливаешь. В файл, в ответ сервера.
  • Duplex: Универсальный солдат, и читает, и пишет одновременно. Как телефонная трубка, и слушаешь, и орешь.
  • Transform: Ну это вообще красавец, подвид Duplex. Он данные не просто гоняет, а по дороге меняет их, как хочет. Сжал, зашифровал, строку перевернул — хуй с горы!

Смотри, как это в жизни выглядит:

  1. Копируем файл размером с твою совесть, но память цела:

    const fs = require('fs');
    const path = require('path');
    
    const source = path.join(__dirname, 'large-video.mp4');
    const destination = path.join(__dirname, 'copy-video.mp4');
    
    const readStream = fs.createReadStream(source); // Открыл кран
    const writeStream = fs.createWriteStream(destination); // Подставил ведро
    
    // Магия pipe! Соединил кран с ведром, и пошло-поехало.
    readStream.pipe(writeStream);
    
    writeStream.on('finish', () => {
      console.log('Файл скопирован, память не вздулась, все живы.');
    });
    
    writeStream.on('error', (err) => {
      console.error('Ошибка записи, ёпта:', err); // Ведро-то дырявое оказалось
    });

    А если бы через fs.readFile, то этот файл целиком в оперативку влез бы и сервак накрылся медным тазом. Доверия к такому подходу — ноль ебать.

  2. Делаем свой фильтр для данных, прямо как в Instagram, но для текста:

    const { Transform } = require('stream');
    const fs = require('fs');
    
    // Вот наш самопальный шифратор, типа Цезарь, но наш, родной.
    class EncryptStream extends Transform {
      constructor(shift) {
        super();
        this.shift = shift; // На сколько букв сдвигаем
      }
    
      _transform(chunk, encoding, callback) {
        // Берём кусок данных и колдуем над ним
        const encrypted = chunk.toString().split('').map(char => {
          const code = char.charCodeAt(0);
          return String.fromCharCode(code + this.shift); // Сдвигаем букву
        }).join('');
        this.push(encrypted); // Выплёвываем зашифрованное
        callback(); // Говорим, что готовы к следующему куску
      }
    }
    
    const encryptor = new EncryptStream(3); // Шифратор готов
    const reader = fs.createReadStream('secret.txt'); // Читаем тайное послание
    const writer = fs.createWriteStream('secret.encrypted.txt'); // Пишем тайное тайное послание
    
    // Собираем конвейер: прочитал -> зашифровал -> записал. Красота!
    reader.pipe(encryptor).pipe(writer);
  3. Отдаём клиенту файл, который больше, чем его терпение:

    const http = require('http');
    const fs = require('fs');
    
    const server = http.createServer((req, res) => {
      // Клиент просит огромный JSON? Без проблем!
      const fileStream = fs.createReadStream('./large-data.json');
      res.writeHead(200, { 'Content-Type': 'application/json' });
      fileStream.pipe(res); // Начинаем струить данные в ответ сразу, не ждём конца файла!
    
      fileStream.on('error', (err) => {
        res.statusCode = 500;
        res.end('Всё пропало, шеф!'); // Если файл не найден или ещё какая беда
      });
    });

Так в чём же, блядь, соль-то? А соль вот в чём:

  • Память не ебёт: Данные идут чанками (по 64КБ обычно), а не одним здоровенным куском. Можно хоть терабайтный дамп гонять.
  • Скорость и отзывчивость: Не ждём, пока всё прочитается. Получил первый кусок — уже начинай работать или отдавать клиенту. Волнение ебать, как быстро!
  • Контроль давления (backpressure): Умный механизм. Если запись (ведро) не успевает за чтением (краном), кран прикручивается сам, чтобы не затопить всё. Всё автоматически, pipe() за этим следит.
  • Конвейеры (pipeline): Это же ебушки-воробушки! Можно строить цепочки: прочитал файл -> распаковал из gzip -> распарсил CSV -> отфильтровал строки -> записал в базу. Всё течёт, как по маслу.