| name | streams |
| description | Master Node.js streams for memory-efficient processing of large datasets, real-time data handling, and building data pipelines |
| version | 2.1.0 |
| sasmp_version | 1.3.0 |
| bonded_agent | 03-async-programming |
| bond_type | PRIMARY_BOND |
Node.js Streams Skill
Master streams for memory-efficient processing of large files, real-time data, and building composable data pipelines.
Quick Start
Streams come in four types:
- Readable - Source of data (file, HTTP request)
- Writable - Destination (file, HTTP response)
- Transform - Modify data in transit
- Duplex - Both readable and writable
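Readable, Writable, and Transform each get a worked example below; Duplex does not, so here is a minimal sketch. A TCP socket is the canonical built-in Duplex, readable and writable at once (the port number is arbitrary):
const net = require('net');
// A TCP socket is a Duplex stream: its readable side carries incoming
// bytes, its writable side carries outgoing bytes. Piping a socket into
// itself yields a one-line echo server.
const server = net.createServer((socket) => {
  socket.pipe(socket);
});
server.listen(3002);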
Core Concepts
Readable Stream
const fs = require('fs');
// Create readable stream
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});
// Event-based consumption
readStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});
readStream.on('end', () => {
  console.log('Finished reading');
});
readStream.on('error', (err) => {
  console.error('Read error:', err);
});
Writable Stream
const writeStream = fs.createWriteStream('output.txt');
// Write data
writeStream.write('Hello, ');
writeStream.write('World!\n');
writeStream.end(); // Signal end
// Handle backpressure: write() returns false when the internal buffer is full
const ok = writeStream.write(data); // `data` stands for whatever chunk you are writing
if (!ok) {
  // Wait for the 'drain' event before writing more
  writeStream.once('drain', () => {
    continueWriting(); // placeholder for your resume-writing logic
  });
}
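The write-then-wait-for-drain pattern fits neatly into a small helper built on events.once, which returns a promise for a single event. A minimal sketch (writeMany is an illustrative name, not a library API):
const { once } = require('events');
// Write chunks sequentially, pausing whenever write() returns false
// until the 'drain' event says the internal buffer has emptied.
async function writeMany(writable, chunks) {
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      await once(writable, 'drain');
    }
  }
  writable.end();
  await once(writable, 'finish'); // resolves once all data is flushed
}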
Transform Stream
const { Transform } = require('stream');
// Custom transform: uppercase text
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});
// Usage
fs.createReadStream('input.txt')
  .pipe(upperCase)
  .pipe(fs.createWriteStream('output.txt'));
Learning Path
Beginner (1-2 weeks)
- ✅ Understand stream types
- ✅ Read/write file streams
- ✅ Basic pipe operations
- ✅ Handle stream events
Intermediate (3-4 weeks)
- ✅ Transform streams
- ✅ Backpressure handling
- ✅ Object mode streams
- ✅ Pipeline utility
Advanced (5-6 weeks)
- ✅ Custom stream implementation
- ✅ Async iterators
- ✅ Web Streams API (see the sketch after this list)
- ✅ Performance optimization
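The Web Streams API item above refers to the WHATWG streams Node ships alongside its native streams; since Node 17 the Readable class can convert between the two. A minimal sketch, assuming Node 17+ (both methods were experimental for several releases):
const { Readable } = require('stream');
// Convert a Node readable to a WHATWG ReadableStream and back
const nodeStream = Readable.from(['a', 'b', 'c']);
const webStream = Readable.toWeb(nodeStream);  // Node >= 17
const backAgain = Readable.fromWeb(webStream); // Node >= 17
backAgain.on('data', (chunk) => console.log(chunk.toString())); // a, b, c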
Pipeline (Recommended)
const { pipeline } = require('stream/promises');
const zlib = require('zlib');
// Compose streams with error handling
async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log('Compression complete');
}
// With transforms (csvParser, transformRow, and jsonStringify are placeholder transform streams, not built-ins)
await pipeline(
  fs.createReadStream('data.csv'),
  csvParser(),
  transformRow(),
  jsonStringify(),
  fs.createWriteStream('data.json')
);
Pipeline with Error Callback
const { pipeline } = require('stream');
pipeline(
  source,
  transform1,
  transform2,
  destination,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
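The promise form of pipeline also accepts an options object with an AbortSignal, which destroys every stream in the chain when the signal fires. A minimal sketch reusing the placeholder streams above:
const { pipeline } = require('stream/promises');
// Cancel a long-running pipeline from the outside
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000); // e.g. cancel after 5s
try {
  await pipeline(source, transform1, destination, {
    signal: controller.signal
  });
} catch (err) {
  if (err.name === 'AbortError') console.log('Pipeline aborted');
}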
HTTP Streaming
const http = require('http');
const fs = require('fs');
// Stream file as HTTP response
http.createServer((req, res) => {
  const filePath = './video.mp4';
  const stat = fs.statSync(filePath);
  res.writeHead(200, {
    'Content-Type': 'video/mp4',
    'Content-Length': stat.size
  });
  // Stream instead of loading the entire file into memory
  fs.createReadStream(filePath).pipe(res);
}).listen(3000);
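For seekable media, browsers send a Range header and expect a 206 Partial Content response; createReadStream's start and end options serve exactly the requested byte window. A sketch that skips validation of malformed Range headers:
http.createServer((req, res) => {
  const filePath = './video.mp4';
  const { size } = fs.statSync(filePath);
  const range = req.headers.range; // e.g. 'bytes=0-1023'
  if (!range) {
    res.writeHead(200, { 'Content-Type': 'video/mp4', 'Content-Length': size });
    return fs.createReadStream(filePath).pipe(res);
  }
  const [startStr, endStr] = range.replace('bytes=', '').split('-');
  const start = Number(startStr);
  const end = endStr ? Number(endStr) : size - 1;
  res.writeHead(206, {
    'Content-Type': 'video/mp4',
    'Content-Range': `bytes ${start}-${end}/${size}`,
    'Content-Length': end - start + 1,
    'Accept-Ranges': 'bytes'
  });
  fs.createReadStream(filePath, { start, end }).pipe(res); // stream only the requested window
}).listen(3003);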
// Stream HTTP request body
http.createServer((req, res) => {
  const writeStream = fs.createWriteStream('./upload.bin');
  req.pipe(writeStream);
  // Respond when the file is fully flushed, not merely when the request ends
  writeStream.on('finish', () => {
    res.end('Upload complete');
  });
  writeStream.on('error', () => {
    res.statusCode = 500;
    res.end('Upload failed');
  });
}).listen(3001);
Object Mode Streams
const { Transform } = require('stream');
// Parse JSON (assumes each incoming chunk is one complete JSON document)
const jsonParser = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk);
      this.push(obj);
      callback();
    } catch (err) {
      callback(err);
    }
  }
});
// Process objects instead of buffers
const processRecords = new Transform({
  objectMode: true,
  transform(record, encoding, callback) {
    record.processed = true;
    record.timestamp = Date.now();
    this.push(record);
    callback();
  }
});
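Wiring the two transforms together shows the data flow end to end. A minimal sketch; note it feeds one complete JSON document per chunk, which is the assumption jsonParser makes (newline-delimited input would need a splitting step first):
const { Readable } = require('stream');
const { pipeline } = require('stream/promises');
await pipeline(
  Readable.from(['{"id":1}', '{"id":2}']), // one JSON document per chunk
  jsonParser,
  processRecords,
  async function (records) {
    for await (const record of records) {
      console.log(record); // { id: 1, processed: true, timestamp: ... }
    }
  }
);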
Async Iterators
const { Readable } = require('stream');
// Create from async iterator
async function* generateData() {
  for (let i = 0; i < 100; i++) {
    yield { id: i, data: `item-${i}` };
  }
}
const stream = Readable.from(generateData(), { objectMode: true });
// Consume with for-await
async function processStream(readable) {
  for await (const chunk of readable) {
    console.log('Processing:', chunk);
  }
}
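For-await consumption also propagates cleanup: breaking out of the loop (or throwing) destroys the underlying stream, releasing file descriptors or sockets. A small sketch (firstN is an illustrative helper, not a library API):
// Take the first n chunks, then stop; the break destroys the stream
async function firstN(readable, n) {
  const out = [];
  for await (const chunk of readable) {
    out.push(chunk);
    if (out.length >= n) break; // triggers readable.destroy()
  }
  return out;
}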
Backpressure Handling
const readable = fs.createReadStream('huge-file.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('data', (chunk) => {
  // Check if the writable can accept more data
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    // Pause reading until the writable is ready
    readable.pause();
    writable.once('drain', () => {
      readable.resume();
    });
  }
});
// Or use pipeline, which handles backpressure automatically
pipeline(readable, writable, (err) => {
  if (err) console.error('Error:', err);
});
Custom Readable Stream
const { Readable } = require('stream');
class DatabaseStream extends Readable {
  constructor(query, options) {
    super({ ...options, objectMode: true });
    this.query = query;
    this.cursor = null;
  }

  // `db` is assumed to be an already-connected database client
  async _read() {
    try {
      if (!this.cursor) {
        this.cursor = await db.collection('items').find(this.query).cursor();
      }
      const doc = await this.cursor.next();
      this.push(doc || null); // push(null) signals end of stream
    } catch (err) {
      this.destroy(err); // surface async errors instead of swallowing them
    }
  }
}
// Usage
const dbStream = new DatabaseStream({ status: 'active' });
for await (const item of dbStream) {
  console.log(item);
}
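The mirror image of a custom Readable is a custom Writable: _write receives each chunk plus a callback that signals completion, and _final runs before 'finish'. A minimal sketch that batches objects before flushing (the console.log stands in for a real bulk insert):
const { Writable } = require('stream');
class BatchWriter extends Writable {
  constructor(batchSize = 10) {
    super({ objectMode: true });
    this.batchSize = batchSize;
    this.batch = [];
  }
  _write(record, encoding, callback) {
    this.batch.push(record);
    if (this.batch.length >= this.batchSize) {
      this.flush().then(() => callback(), callback);
    } else {
      callback();
    }
  }
  _final(callback) {
    // Flush whatever remains when end() is called
    this.flush().then(() => callback(), callback);
  }
  async flush() {
    if (this.batch.length === 0) return;
    console.log(`Flushing ${this.batch.length} records`); // stand-in for a bulk insert
    this.batch = [];
  }
}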
Unit Test Template
const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');
describe('Stream Processing', () => {
  it('should transform data correctly', async () => {
    const input = Readable.from(['hello', 'world']);
    const chunks = [];
    const upperCase = new Transform({
      transform(chunk, enc, cb) {
        this.push(chunk.toString().toUpperCase());
        cb();
      }
    });
    await pipeline(
      input,
      upperCase,
      // Plain async function as destination: pipeline awaits its promise
      async function (source) {
        for await (const chunk of source) {
          chunks.push(chunk.toString());
        }
      }
    );
    expect(chunks).toEqual(['HELLO', 'WORLD']);
  });
});
Troubleshooting
| Problem | Cause | Solution |
|---|---|---|
| Memory grows unbounded | No backpressure handling | Use pipeline or handle the 'drain' event |
| Data loss | Errors not caught | Use pipeline with error callback |
| Slow processing | Small chunk size | Increase highWaterMark |
| Stream hangs | Missing end() call | Call writable.end() |
When to Use
Use streams when:
- Processing large files (GB+)
- Real-time data processing
- Memory-constrained environments
- Building data pipelines
- HTTP request/response handling
Related Skills
- Async Programming (async patterns)
- Performance Optimization (memory efficiency)
- Express REST API (streaming responses)