Claude Code Plugins

Community-maintained marketplace

Feedback

Master Node.js streams for memory-efficient processing of large datasets, real-time data handling, and building data pipelines

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name streams
description Master Node.js streams for memory-efficient processing of large datasets, real-time data handling, and building data pipelines
version 2.1.0
sasmp_version 1.3.0
bonded_agent 03-async-programming
bond_type PRIMARY_BOND

Node.js Streams Skill

Master streams for memory-efficient processing of large files, real-time data, and building composable data pipelines.

Quick Start

Streams in 4 types:

  1. Readable - Source of data (file, HTTP request)
  2. Writable - Destination (file, HTTP response)
  3. Transform - Modify data in transit
  4. Duplex - Both readable and writable

Core Concepts

Readable Stream

const fs = require('fs');

// Create readable stream
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

// Event-based consumption
readStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

readStream.on('end', () => {
  console.log('Finished reading');
});

readStream.on('error', (err) => {
  console.error('Read error:', err);
});

Writable Stream

const writeStream = fs.createWriteStream('output.txt');

// Write data
writeStream.write('Hello, ');
writeStream.write('World!\n');
writeStream.end(); // Signal end

// Handle backpressure
const ok = writeStream.write(data);
if (!ok) {
  // Wait for drain event before writing more
  writeStream.once('drain', () => {
    continueWriting();
  });
}

Transform Stream

const { Transform } = require('stream');

// Custom transform: uppercase text
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Usage
fs.createReadStream('input.txt')
  .pipe(upperCase)
  .pipe(fs.createWriteStream('output.txt'));

Learning Path

Beginner (1-2 weeks)

  • ✅ Understand stream types
  • ✅ Read/write file streams
  • ✅ Basic pipe operations
  • ✅ Handle stream events

Intermediate (3-4 weeks)

  • ✅ Transform streams
  • ✅ Backpressure handling
  • ✅ Object mode streams
  • ✅ Pipeline utility

Advanced (5-6 weeks)

  • ✅ Custom stream implementation
  • ✅ Async iterators
  • ✅ Web Streams API
  • ✅ Performance optimization

Pipeline (Recommended)

const { pipeline } = require('stream/promises');
const zlib = require('zlib');

// Compose streams with error handling
async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log('Compression complete');
}

// With transform
await pipeline(
  fs.createReadStream('data.csv'),
  csvParser(),
  transformRow(),
  jsonStringify(),
  fs.createWriteStream('data.json')
);

Pipeline with Error Handling

const { pipeline } = require('stream');

pipeline(
  source,
  transform1,
  transform2,
  destination,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

HTTP Streaming

const http = require('http');
const fs = require('fs');

// Stream file as HTTP response
http.createServer((req, res) => {
  const filePath = './video.mp4';
  const stat = fs.statSync(filePath);

  res.writeHead(200, {
    'Content-Type': 'video/mp4',
    'Content-Length': stat.size
  });

  // Stream instead of loading entire file
  fs.createReadStream(filePath).pipe(res);
}).listen(3000);

// Stream HTTP request body
http.createServer((req, res) => {
  const writeStream = fs.createWriteStream('./upload.bin');
  req.pipe(writeStream);

  req.on('end', () => {
    res.end('Upload complete');
  });
}).listen(3001);

Object Mode Streams

const { Transform } = require('stream');

const jsonParser = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk);
      this.push(obj);
      callback();
    } catch (err) {
      callback(err);
    }
  }
});

// Process objects instead of buffers
const processRecords = new Transform({
  objectMode: true,
  transform(record, encoding, callback) {
    record.processed = true;
    record.timestamp = Date.now();
    this.push(record);
    callback();
  }
});

Async Iterators

const { Readable } = require('stream');

// Create from async iterator
async function* generateData() {
  for (let i = 0; i < 100; i++) {
    yield { id: i, data: `item-${i}` };
  }
}

const stream = Readable.from(generateData(), { objectMode: true });

// Consume with for-await
async function processStream(readable) {
  for await (const chunk of readable) {
    console.log('Processing:', chunk);
  }
}

Backpressure Handling

const readable = fs.createReadStream('huge-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  // Check if writable can accept more data
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    // Pause reading until writable is ready
    readable.pause();
    writable.once('drain', () => {
      readable.resume();
    });
  }
});

// Or use pipeline (handles automatically)
pipeline(readable, writable, (err) => {
  if (err) console.error('Error:', err);
});

Custom Readable Stream

const { Readable } = require('stream');

class DatabaseStream extends Readable {
  constructor(query, options) {
    super({ ...options, objectMode: true });
    this.query = query;
    this.cursor = null;
  }

  async _read() {
    if (!this.cursor) {
      this.cursor = await db.collection('items').find(this.query).cursor();
    }

    const doc = await this.cursor.next();
    if (doc) {
      this.push(doc);
    } else {
      this.push(null); // Signal end
    }
  }
}

// Usage
const dbStream = new DatabaseStream({ status: 'active' });
for await (const item of dbStream) {
  console.log(item);
}

Unit Test Template

const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

describe('Stream Processing', () => {
  it('should transform data correctly', async () => {
    const input = Readable.from(['hello', 'world']);
    const chunks = [];

    const upperCase = new Transform({
      transform(chunk, enc, cb) {
        this.push(chunk.toString().toUpperCase());
        cb();
      }
    });

    await pipeline(
      input,
      upperCase,
      async function* (source) {
        for await (const chunk of source) {
          chunks.push(chunk.toString());
        }
      }
    );

    expect(chunks).toEqual(['HELLO', 'WORLD']);
  });
});

Troubleshooting

Problem Cause Solution
Memory grows infinitely No backpressure Use pipeline or handle drain
Data loss Errors not caught Use pipeline with error callback
Slow processing Small chunk size Increase highWaterMark
Stream hangs Missing end() call Call writable.end()

When to Use

Use streams when:

  • Processing large files (GB+)
  • Real-time data processing
  • Memory-constrained environments
  • Building data pipelines
  • HTTP request/response handling

Related Skills

  • Async Programming (async patterns)
  • Performance Optimization (memory efficiency)
  • Express REST API (streaming responses)

Resources