Node.js 开发指南 / 第 8 章 · 流(Streams)
第 8 章 · 流(Streams)
8.1 为什么需要流
处理大文件时,一次性读入内存会导致内存溢出。流允许我们逐块处理数据,内存占用恒定。
// ❌ 一次性读取 — 大文件会导致内存溢出
const data = fs.readFileSync('huge-file.log'); // 可能占用数 GB 内存
// ✅ 使用流 — 内存占用恒定
const stream = fs.createReadStream('huge-file.log');
stream.on('data', (chunk) => {
// 每次处理一块(默认 64KB)
process.stdout.write(chunk);
});
流的类型
| 类型 | 说明 | 示例 |
|---|---|---|
| Readable | 可读流,数据源 | fs.createReadStream, http.IncomingMessage |
| Writable | 可写流,数据目的地 | fs.createWriteStream, http.ServerResponse |
| Duplex | 双工流,同时可读可写 | net.Socket, zlib.createGzip() |
| Transform | 转换流,边读边改 | crypto.createCipher(), zlib.createGzip() |
Readable → [数据源] ─────────→
Writable ← ───────── [数据目的地]
Duplex ←→ [既可读又可写] ←→
Transform → [读取] → [转换] → [输出]
8.2 可读流(Readable)
创建可读流
const fs = require('fs');
// 从文件创建
const readable = fs.createReadStream('data.txt', {
encoding: 'utf8',
highWaterMark: 1024, // 每次读取的字节数
});
// 事件驱动模式
readable.on('data', (chunk) => {
console.log(`收到 ${chunk.length} 字节`);
});
readable.on('end', () => {
console.log('读取完成');
});
readable.on('error', (err) => {
console.error('读取错误:', err);
});
流模式
// 流的两种模式
// 1. 流动模式(Flowing)— 数据自动读取
readable.on('data', (chunk) => { /* ... */ });
// 2. 暂停模式(Paused)— 手动调用 read()
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) {
console.log(`收到 ${chunk.length} 字节`);
}
});
// 切换模式
readable.pause(); // 暂停
readable.resume(); // 恢复
自定义可读流
const { Readable } = require('stream');
class Counter extends Readable {
constructor(options) {
super(options);
this.current = 0;
this.max = options.max || 10;
}
_read(size) {
if (this.current <= this.max) {
this.push(String(this.current++) + '\n');
} else {
this.push(null); // 表示流结束
}
}
}
const counter = new Counter({ max: 5, encoding: 'utf8' });
counter.pipe(process.stdout);
// 输出: 0\n1\n2\n3\n4\n5\n
异步迭代器方式读取
const fs = require('fs');
async function readFile() {
const stream = fs.createReadStream('data.txt', { encoding: 'utf8' });
for await (const chunk of stream) {
console.log(chunk);
}
console.log('读取完成');
}
8.3 可写流(Writable)
创建可写流
const fs = require('fs');
const writable = fs.createWriteStream('output.txt');
writable.write('第一行\n');
writable.write('第二行\n');
writable.end('最后一行\n');
writable.on('finish', () => {
console.log('写入完成');
});
writable.on('error', (err) => {
console.error('写入错误:', err);
});
背压(Backpressure)处理
const fs = require('fs');
function writeLargeFile() {
const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('data', (chunk) => {
// write() 返回 false 时表示内部缓冲区已满
const canContinue = writable.write(chunk);
if (!canContinue) {
// 暂停读取,等待排空
readable.pause();
}
});
// 缓冲区排空后恢复读取
writable.on('drain', () => {
readable.resume();
});
readable.on('end', () => {
writable.end();
});
}
// ✅ 更简单的方式:使用 pipe
fs.createReadStream('input.txt')
.pipe(fs.createWriteStream('output.txt'));
自定义可写流
const { Writable } = require('stream');
class Logger extends Writable {
_write(chunk, encoding, callback) {
const message = chunk.toString().trim();
console.log(`[${new Date().toISOString()}] ${message}`);
callback(); // 必须调用,通知写入完成
}
}
const logger = new Logger();
logger.write('用户登录\n');
logger.write('请求处理中\n');
logger.end('服务停止\n');
8.4 双工流(Duplex)
const { Duplex } = require('stream');
class Echo extends Duplex {
constructor(options) {
super(options);
this.buffer = [];
}
_write(chunk, encoding, callback) {
this.buffer.push(chunk);
callback();
}
_read(size) {
if (this.buffer.length > 0) {
this.push(this.buffer.shift());
} else {
this.push(null);
}
}
}
8.5 转换流(Transform)
const { Transform } = require('stream');
// 大写转换流
class UpperCase extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}
// JSON 解析流
class JSONParser extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
}
_transform(chunk, encoding, callback) {
try {
const obj = JSON.parse(chunk.toString());
this.push(obj);
callback();
} catch (err) {
callback(err);
}
}
}
// 使用示例
fs.createReadStream('data.txt')
.pipe(new UpperCase())
.pipe(process.stdout);
8.6 管道(Pipeline)
pipe 方法
const { pipeline } = require('stream');
const zlib = require('zlib');
const fs = require('fs');
// 链式 pipe
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'))
.on('finish', () => console.log('压缩完成'));
pipeline 函数(推荐)
const { pipeline } = require('stream/promises');
// ✅ 使用 pipeline 自动处理错误和清理
async function compressFile(input, output) {
await pipeline(
fs.createReadStream(input),
zlib.createGzip(),
fs.createWriteStream(output)
);
console.log('压缩完成');
}
// 使用示例
await compressFile('input.txt', 'input.txt.gz');
// 解压缩
async function decompressFile(input, output) {
await pipeline(
fs.createReadStream(input),
zlib.createGunzip(),
fs.createWriteStream(output)
);
console.log('解压完成');
}
流式 HTTP 响应
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream/promises');
const server = http.createServer(async (req, res) => {
if (req.url === '/download') {
const filePath = './large-file.zip';
res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Content-Disposition': 'attachment; filename="file.zip"',
});
await pipeline(fs.createReadStream(filePath), res);
// 如果使用 pipe,需要手动处理错误
}
});
server.listen(3000);
8.7 objectMode 流
const { Transform, pipeline } = require('stream');
const fs = require('fs');
// 对象模式流 — 处理 JavaScript 对象而非 Buffer
class FilterUsers extends Transform {
constructor(options = {}) {
super({ ...options, objectMode: true });
}
_transform(user, encoding, callback) {
if (user.age >= 18) {
this.push(user);
}
callback();
}
}
class FormatUser extends Transform {
constructor(options = {}) {
super({ ...options, objectMode: true });
}
_transform(user, encoding, callback) {
this.push(`${user.name} (${user.age}岁)\n`);
callback();
}
}
// 使用
const users = [
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 15 },
{ name: 'Charlie', age: 25 },
];
const { Readable } = require('stream');
Readable.from(users)
.pipe(new FilterUsers())
.pipe(new FormatUser())
.pipe(process.stdout);
// 输出:
// Alice (30岁)
// Charlie (25岁)
注意事项
⚠️ 始终处理错误事件:流如果没有
error事件监听器,错误会导致进程崩溃。
⚠️ 使用 pipeline 代替 pipe:
pipe不会自动处理错误和清理资源,推荐使用stream.pipeline。
⚠️ 注意背压:快速的可读流配合慢速的可写流会导致内存溢出,
pipe会自动处理背压。
⚠️ 不要在 Transform 中同步调用 callback:应该异步调用以避免栈溢出。
业务场景
- 大文件处理:使用流逐行处理 GB 级日志文件
- 文件压缩/解压:使用 zlib 转换流实现流式压缩
- 数据管道:CSV → 解析 → 转换 → 数据库的流式 ETL
- HTTP 大文件下载:使用流式响应避免内存溢出
扩展阅读
上一章:第 7 章 · 事件循环 下一章:第 9 章 · Buffer 与二进制数据 — Buffer 操作、编码和二进制数据处理。