Modern Async Pipelines & Error Handling
You know how individual streams work now - Readable producing data, Writable consuming it, Transform processing it in between. Each stream type has its own buffering, its own backpressure mechanism, its own event lifecycle. In real applications, you rarely work with streams in isolation. You're connecting them together, creating pipelines where data flows from source through multiple transformation stages to a final destination.
And this is where things get tricky. The concept's fine - piping data from one stream to another is straightforward enough - but actually doing it correctly? That's where the headaches start. Because when you connect streams, you're dealing with multiple error sources, multiple backpressure signals, and multiple resource cleanup scenarios. If any stream in your pipeline fails, what happens to the others? If backpressure occurs midway through, does it propagate correctly? When the pipeline completes, are all resources cleaned up properly?
This chapter is about answering those questions. We're going to examine the original pipe() method and understand both why it exists and why it's insufficient for production code. Then we'll dive deep into stream.pipeline(), which is the modern, recommended approach for composing stream pipelines with proper error handling and cleanup. After that, we'll explore error handling patterns specific to streaming - because errors in pipelines behave differently from errors in synchronous code. We'll look at using async iteration as an alternative pipeline approach, and finally we'll cover advanced composition patterns for building reusable, flexible pipeline segments.
This chapter covers how to connect streams correctly - with proper error propagation, resource cleanup, and backpressure handling across all stages.
The pipe() Method
You've already learned how pipe() works from the Readable and Writable chapters. As a quick recap: pipe() connects a Readable stream to a Writable, automatically handling backpressure by calling pause() when write() returns false and resume() when the Writable emits drain. We covered this pattern extensively in the Writable Streams chapter when discussing backpressure.
The method returns the destination stream, allowing you to chain pipes:
readable.pipe(transform1).pipe(transform2).pipe(writable);This creates a four-stage pipeline: readable -> transform1 -> transform2 -> writable. Data flows through each stage sequentially, with backpressure propagating backward from the writable destination all the way to the readable source—a pattern we explored in detail in the Writable Streams chapter. If writable signals backpressure, the entire chain pauses; when writable emits drain, the resume signal propagates forward.
A concrete example—compressing a log file:
import { createReadStream, createWriteStream } from "fs";
import { createGzip } from "zlib";
createReadStream("app.log")
.pipe(createGzip())
.pipe(createWriteStream("app.log.gz"));Three streams, two pipe() calls. The file reader produces chunks, the gzip transform compresses them, and the file writer saves the compressed data. Memory usage stays bounded because each stage respects backpressure signals.
But pipe() has a problem: error handling.
Here's what happens when an error occurs in a piped stream. The stream that encounters the error emits an error event. But that error does not propagate to other streams in the pipeline. You already know from the Readable and Writable chapters that each stream needs its own error handler, or the error will crash your process. But in pipelines, this becomes especially painful.
const reader = createReadStream("input.txt");
const transform = createGzip();
const writer = createWriteStream("output.gz");
reader.pipe(transform).pipe(writer);
reader.on("error", (err) => {
console.error("Reader error:", err);
});
transform.on("error", (err) => {
console.error("Transform error:", err);
});
writer.on("error", (err) => {
console.error("Writer error:", err);
});You need three separate error handlers. Miss one and your process crashes. It's tedious, error-prone, and honestly kind of ridiculous for something that should be straightforward.
But it gets worse. When an error occurs in the middle of a pipeline, the other streams don't automatically stop. Suppose the transform throws an error while processing a chunk. The transform emits error and stops processing. But the reader keeps reading and trying to write to the transform, which is now in an errored state. The writer is waiting for data that will never come, and it might never emit finish because the pipeline never completes cleanly.
You end up with dangling resources. File handles that aren't closed. Network connections that aren't cleaned up. Memory buffers that aren't released. The pipeline is in a partially-failed state, and cleaning it up requires manually calling destroy() on each stream:
reader.on("error", (err) => {
reader.destroy();
transform.destroy();
writer.destroy();
console.error("Pipeline failed:", err);
});
transform.on("error", (err) => {
reader.destroy();
transform.destroy();
writer.destroy();
console.error("Pipeline failed:", err);
});
writer.on("error", (err) => {
reader.destroy();
transform.destroy();
writer.destroy();
console.error("Pipeline failed:", err);
});This is verbose, repetitive, and fragile. If you add a new stream to the pipeline, you have to update all the error handlers to destroy the new stream too.
There's another limitation of pipe(): you can't easily tell when the entire pipeline has completed. The readable emits end when it's done reading. The writable emits finish when it's done writing. But which one do you listen to? And what if you have multiple transforms in the middle? Each transform emits its own end event. The final destination emits finish. You have to track the right event on the right stream, which depends on the pipeline's structure.
For simple two-stream scenarios, pipe() works fine. But for real production pipelines with multiple stages and proper error handling requirements, pipe() is insufficient. This is why stream.pipeline() was introduced.
The unpipe() Method
Worth covering unpipe() before pipeline() - though you'll rarely use it. This method disconnects a piped stream:
const writer = writable();
readable.pipe(writer);
// Later, disconnect
readable.unpipe(writer);When you call unpipe(), the readable stops sending data to the specified writable. If you call unpipe() without arguments, it disconnects from all destinations:
readable.unpipe();Why would you use this? Mainly for dynamic routing scenarios where you want to redirect a stream's output based on runtime conditions. For example, you might be reading from a socket and initially piping to a file, but then decide to pipe to a different destination based on incoming data:
socket.on("data", (chunk) => {
if (shouldRedirect(chunk)) {
socket.unpipe(fileWriter);
socket.pipe(differentWriter);
}
});But in practice? I've almost never needed unpipe(). Most pipelines are static - you define the flow at setup time and let it run to completion. Dynamic routing is better handled with higher-level abstractions, like routing streams or conditional transforms.
The main thing to know about unpipe() is that it exists, and that when you unpipe a stream, the destination does not automatically end. When unpipe() is called, it removes the destination's listeners from the source stream. The source stream's flowing mode state depends on whether any consumers remain attached - if all pipe destinations are removed and there are no data event listeners, the stream switches back to paused mode. If other consumers exist (other piped destinations or data listeners), the source continues emitting data events. If you want the destination to close, you need to call end() on it manually.
stream.pipeline()
stream.pipeline() is the modern approach to composing streams. This function was added to Node.js specifically to address the error handling and cleanup problems with pipe(). Here's the basic usage:
import { pipeline } from "stream";
pipeline(readable, transform, writable, (err) => {
if (err) {
console.error("Pipeline failed:", err);
} else {
console.log("Pipeline succeeded");
}
});Instead of chaining pipe() calls, you pass all your streams as arguments to pipeline(), followed by a callback that's invoked when the pipeline completes or errors. That's the signature:
pipeline(stream1, stream2, ..., streamN, callback)pipeline() does three things that pipe() doesn't:
-
Automatic error propagation: If any stream in the pipeline emits an error,
pipeline()stops the pipeline and invokes the callback with that error. You don't need separate error handlers on each stream. -
Automatic cleanup: When an error occurs (or when the pipeline completes),
pipeline()callsdestroy()on all streams in the pipeline. File handles get closed, buffers freed, connections torn down. -
Single completion callback: One callback for everything.
Let's see what this looks like in practice:
import { pipeline } from "stream";
import { createReadStream, createWriteStream } from "fs";
import { createGzip } from "zlib";
pipeline(
createReadStream("input.txt"),
createGzip(),
createWriteStream("output.gz"),
(err) => {
if (err) {
console.error("Compression failed:", err);
} else {
console.log("Compression succeeded");
}
}
);If any of these streams errors - the file read fails, the gzip encounters corrupt data, the file write hits a disk-full error - the callback is invoked with the error, and all three streams are destroyed. If everything succeeds, the callback is invoked with err as undefined.
This is simpler than the equivalent pipe() code with manual error handling. No separate error listeners, no manual destroy calls, no tracking which stream to listen to for completion.
There's also a promise-based version of pipeline() in the stream/promises module:
import { pipeline } from "stream/promises";
import { createReadStream, createWriteStream } from "fs";
import { createGzip } from "zlib";
try {
await pipeline(
createReadStream("input.txt"),
createGzip(),
createWriteStream("output.gz")
);
console.log("Compression succeeded");
} catch (err) {
console.error("Compression failed:", err);
}The promise-based version returns a promise that resolves when the pipeline completes or rejects when any stream errors. This fits naturally with async/await code. You wrap the pipeline() call in a try/catch, and errors are handled like any other promise rejection.
This is the recommended pattern for modern Node.js code. Use stream/promises and async/await for clean, readable pipeline composition.
How pipeline() Works Internally
Understanding the internals helps you reason about behavior and debug issues. When you call pipeline(s1, s2, s3, callback), the function essentially:
-
Connects streams using the same
pipe()mechanics you learned in earlier chapters—the same automatic backpressure handling we covered in the Writable Streams chapter -
Attaches error listeners to all streams for coordinated error handling
-
Calls
destroy()on all streams when any error occurs or when completion happens -
Invokes the callback once with either an error or undefined
The key difference from manual pipe() is the error coordination and automatic cleanup. You get the same backpressure handling (which we covered extensively in the Writable Streams chapter), but with production-grade error management.
Here's a highly simplified conceptual model showing the basic behavior. Important: This is a pedagogical simplification to illustrate the concept, not the actual implementation. The real Node.js pipeline() implementation (based on the pump library) is significantly more sophisticated and handles many edge cases not shown here - including async iterables, generators, complex error scenarios, once-only callback guarantees, and proper stream type detection:
function simplifiedPipeline(...args) {
const callback = args.pop();
const streams = args;
// Connect streams with pipe()
for (let i = 0; i < streams.length - 1; i++) {
streams[i].pipe(streams[i + 1]);
}
// Track completion
const lastStream = streams[streams.length - 1];
lastStream.on("finish", () => {
destroyAll(streams);
callback();
});
// Handle errors
for (const stream of streams) {
stream.on("error", (err) => {
destroyAll(streams);
callback(err);
});
}
}
function destroyAll(streams) {
for (const stream of streams) {
stream.destroy();
}
}This is a conceptual model to help you understand the behavior - the real pipeline() implementation in Node.js is more sophisticated with its own stream connection logic and comprehensive edge case handling (once-only callback invocation, destroyed stream handling, complex error scenarios, etc.). Think of this as "what it does" rather than "how it does it."
pipeline() handles an edge case you should know about: a stream emitting an error after it's already been destroyed. This can happen with async operations in custom streams where an error occurs after the stream has nominally completed. The real pipeline() implementation ensures the callback is only invoked once, even if multiple streams error simultaneously.
Using pipeline() with Transform Functions
You don't even need Transform stream instances - you can pass async generator functions, and pipeline() will treat them as transforms.
import { pipeline } from "stream/promises";
import { createReadStream, createWriteStream } from "fs";
await pipeline(
createReadStream("input.txt"),
async function* (source) {
for await (const chunk of source) {
yield chunk.toString().toUpperCase();
}
},
createWriteStream("output.txt")
);The async generator in the middle is automatically converted to a Transform stream. For each chunk from the source, the generator transforms it (in this case, uppercases it) and yields the result. The yielded values become chunks in the output stream.
This works well for simple transformations. Instead of creating a custom Transform class, you write a generator function inline. It reads like a loop: for each input chunk, produce an output chunk.
You can also use regular async functions that return async iterables:
async function uppercase(source) {
for await (const chunk of source) {
yield chunk.toString().toUpperCase();
}
}
await pipeline(
createReadStream("input.txt"),
uppercase,
createWriteStream("output.txt")
);This works because pipeline() recognizes async iterables and automatically wraps them in Transform streams internally.
You can even chain multiple generator transforms:
await pipeline(
createReadStream("log.txt"),
async function* (source) {
let buffer = "";
for await (const chunk of source) {
buffer += chunk.toString();
const lines = buffer.split("\n");
buffer = lines.pop();
for (const line of lines) {
yield line + "\n";
}
}
if (buffer) yield buffer;
},
async function* (source) {
for await (const line of source) {
if (!line.startsWith("#")) {
yield line;
}
}
},
createWriteStream("filtered.txt")
);The first generator converts buffers to strings and splits them into lines, handling chunk boundaries with a buffer. The second filters out lines starting with "#". Each generator is a pipeline stage, and pipeline() handles the plumbing.
This is the recommended way to build stream pipelines in modern Node.js. For simple transformations, use inline generators. For complex stateful transforms, create a Transform class. Mix and match as needed.
Important note about generator functions: Only values produced with yield are sent to the output stream. If a generator uses return to produce a final value, that value is not yielded to the pipeline - it's only accessible to code directly consuming the generator. In pipeline transforms, always use yield for output chunks.
Quick aside: if you're coming from Python, this is like itertools but actually built into the language. End digression.
Error Handling in Stream Pipelines
Error handling gets tricky with pipelines because they introduce error scenarios that don't exist in single-stream code. You already know from the Readable and Writable chapters that streams emit error events for various failures (file not found, disk full, network dropped, etc.).
In pipelines, these errors can come from multiple sources simultaneously:
- The source stream might fail to read (file doesn't exist, permission denied, network dropped)
- A transform stream might encounter invalid data (parse error, validation failure)
- The destination stream might fail to write (disk full, broken pipe, remote endpoint closed)
Each of these errors manifests as an error event on the stream that encountered it. With pipe(), you'd have to handle each separately. With pipeline(), all errors are caught and passed to your callback or promise rejection.
When an error occurs midway through a pipeline, what happens to partial data? Suppose you're reading 100MB file, transforming it, and writing the result. At 50MB, the transform encounters corrupt data and errors. What happened to the first 50MB?
The answer depends on the destination stream's behavior. If it's writing to a file, the file now contains 50MB of partial output. The file exists, but it's incomplete and possibly invalid. pipeline() doesn't roll back partial writes - it can't. The data is already written to the underlying resource.
This means you need to handle partial data in your application logic. One pattern is to write to a temporary file and rename it to the final name only on success:
import { rename, unlink } from "fs/promises";
const tempFile = "output.tmp";
const finalFile = "output.dat";
try {
await pipeline(source, transform, createWriteStream(tempFile));
await rename(tempFile, finalFile);
} catch (err) {
await unlink(tempFile); // Clean up partial file
throw err;
}If the pipeline succeeds, the temp file is renamed to the final name. If it fails, the temp file is deleted. This ensures that either the complete output exists or nothing exists - no partial files.
Another pattern is to use transactions when writing to a database. Write all rows within a transaction, and commit only if the pipeline completes successfully:
const tx = await db.beginTransaction();
try {
await pipeline(
source,
transform,
new DatabaseWriter(tx)
);
await tx.commit();
} catch (err) {
await tx.rollback();
throw err;
}pipeline() only handles stream-level cleanup - calling destroy() on streams. It doesn't handle domain-level cleanup (deleting partial files, rolling back transactions). That's your responsibility.
Error propagation is where this gets interesting. When a stream in a pipeline errors, pipeline() immediately calls destroy() on all other streams. This causes each stream to emit a close event, and any pending operations are canceled. This is correct - if one stage fails, the entire pipeline should stop.
But what if you want to handle errors from different streams differently? For example, if the source fails, you want to log "read error," but if the destination fails, you want to log "write error." With pipeline(), you only get one error in the callback - the first error that occurred.
If you need to distinguish between error sources, you can check the error object's properties or use error wrapping in your custom streams:
class SourceStream extends Readable {
_read() {
const err = new Error("Read failed");
err.code = "ERR_SOURCE_READ";
this.destroy(err);
}
}
pipeline(source, transform, destination, (err) => {
if (err && err.code === "ERR_SOURCE_READ") {
console.error("Source read error:", err);
} else if (err) {
console.error("Other error:", err);
}
});By tagging errors with a code property, you can distinguish them in the error handler.
Another pattern is using stream.finished() to detect when a specific stream completes or errors, even within a larger pipeline:
import { pipeline, finished } from "stream";
const transform = createSomeTransform();
finished(transform, (err) => {
if (err) {
console.error("Transform specifically failed:", err);
}
});
pipeline(source, transform, destination, (err) => {
if (err) {
console.error("Overall pipeline failed:", err);
}
});The finished() utility attaches listeners to a stream and invokes a callback when the stream ends, errors, or is destroyed. This lets you monitor individual streams within a pipeline.
stream.finished()
stream.finished() deserves a closer look.
The finished() function takes a stream and a callback, and invokes the callback when the stream completes (either successfully or with an error):
import { finished } from "stream";
finished(someStream, (err) => {
if (err) {
console.error("Stream errored:", err);
} else {
console.log("Stream finished successfully");
}
});What does "finished" mean? For a Readable, it means the stream has ended (pushed null or destroyed). For a Writable, it means the stream has finished writing and emitted finish (or been destroyed). For a Duplex or Transform, it means both sides have completed.
This is safer than attaching listeners to end or finish directly, because finished() also listens for error, close, and destroy events, and handles the complex logic of determining whether the stream truly completed.
There's also a promise-based version:
import { finished } from "stream/promises";
try {
await finished(someStream);
console.log("Stream finished");
} catch (err) {
console.error("Stream errored:", err);
}The promise resolves when the stream completes successfully or rejects if the stream errors.
Note on event listeners: stream.finished() intentionally leaves dangling event listeners (particularly error, end, finish, and close) after the callback is invoked or the promise settles. This design choice allows finished() to catch unexpected error events from incorrect stream implementations, preventing crashes. For most use cases with short-lived streams, this is not a concern as the streams will be garbage collected. However, for memory-sensitive applications or long-lived streams, you can use the cleanup option to remove these listeners automatically:
import { finished } from "stream/promises";
await finished(someStream, { cleanup: true }); // Removes listeners after completionWhy use finished() instead of just listening for end or finish? Because streams can end in multiple ways. A stream might emit end naturally, or it might be destroyed due to an error, or it might be destroyed explicitly via destroy(). The finished() utility handles all these cases and gives you a single callback or promise that represents "this stream is done, one way or another."
This is useful when you need to know when a specific stream in a complex pipeline has completed, even if the overall pipeline is still running. For example, if you're piping a source to multiple destinations (a tee or broadcast pattern), you can use finished() to know when each destination has consumed all its data:
const dest1 = createWriteStream("output1.txt");
const dest2 = createWriteStream("output2.txt");
source.pipe(dest1);
source.pipe(dest2);
await Promise.all([
finished(dest1),
finished(dest2),
]);
console.log("Both destinations finished");This waits for both destinations to finish writing before proceeding.
Error Recovery in Pipelines
Not all errors are fatal - some deserve a retry. Some errors are transient and can be retried. Others indicate a fundamental problem and require the pipeline to fail.
For example, if you're reading from a network source and the connection drops, that might be transient. Retrying the connection could succeed. But if you're reading a file and get EACCES (permission denied), retrying won't help - the file's permissions won't magically change.
The first step is categorizing errors. Is this an operational error (expected failure condition) or a programmer error (bug in the code)? Is it transient or permanent?
For transient errors, you can implement retry logic around the pipeline:
async function pipelineWithRetry(maxRetries) {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
await pipeline(source(), transform, destination);
return; // Success
} catch (err) {
if (isTransientError(err) && attempt < maxRetries) {
console.log(`Attempt ${attempt + 1} failed, retrying...`);
await delay(1000 * Math.pow(2, attempt)); // Exponential backoff
} else {
throw err; // Give up
}
}
}
}
function isTransientError(err) {
return err.code === "ECONNRESET" || err.code === "ETIMEDOUT"; // Network errors
}This function attempts the pipeline up to maxRetries times. If a transient error occurs, it waits (with exponential backoff) and retries. If a non-transient error occurs, or if all retries are exhausted, it throws the error.
Note that the source() is a function that creates a new source stream. You can't reuse a stream after it's errored and been destroyed. Each retry needs fresh stream instances.
Another recovery pattern is fallback. If one data source fails, try an alternative source:
try {
await pipeline(primarySource, transform, destination);
} catch (err) {
console.warn("Primary source failed, trying fallback");
await pipeline(fallbackSource, transform, destination);
}This is useful for redundant data sources, like trying a CDN first and falling back to an origin server if the CDN is unavailable.
For destination failures, you might want to retry writes to a different location:
try {
await pipeline(source, transform, primaryDest);
} catch (err) {
console.warn("Primary destination failed, trying backup");
await pipeline(source(), transform, backupDest);
}Again, note that source() is a function creating a new source. After the first pipeline fails, the original source is destroyed, so you need a new one for the retry.
The key principle: decide upfront which errors are recoverable and implement your retry or fallback logic at the pipeline level, not at the individual stream level. Streams don't know about your application's retry policy - you have to coordinate it externally.
Partial Data Concerns
Partial data needs attention because it's easy to get wrong. When a pipeline fails midway through, any data already written to the destination remains there. The pipeline doesn't automatically clean it up.
This is a problem for data integrity. If you're writing a database export and the pipeline fails at 60%, you have a 60%-complete file. If you later retry and succeed, you might end up with duplicate data (the first 60% twice) or you might overwrite the partial file with a complete one, depending on how you open the output file.
Here are strategies for handling this:
1. Write to a temporary location and atomic rename
const temp = "output.tmp";
const final = "output.dat";
try {
await pipeline(source, transform, createWriteStream(temp));
await rename(temp, final);
} catch (err) {
await unlink(temp).catch(() => {}); // Clean up, ignore errors
throw err;
}This is the safest pattern for file outputs. The final file only exists if the pipeline completed successfully.
2. Use append mode and idempotent operations
If your output supports appending (like log files), and your operations are idempotent (writing the same data twice is harmless), you can just retry from the beginning and append:
await pipeline(source, transform, createWriteStream("output.log", { flags: "a" }));If the pipeline fails and you retry, the second attempt appends more data. If you have duplicate detection downstream, this is fine.
3. Use transactional destinations
Databases, message queues, and some cloud storage systems support transactions. Write within a transaction and commit only on success:
const tx = await db.beginTransaction();
try {
await pipeline(source, transform, new DatabaseWriter(tx));
await tx.commit();
} catch (err) {
await tx.rollback();
throw err;
}The destination doesn't persist any data until the pipeline succeeds.
4. Write a completion marker
For scenarios where you can't use transactions, write a marker file indicating successful completion:
await pipeline(source, transform, createWriteStream("output.dat"));
await writeFile("output.dat.complete", "");Before processing the output file, check for the marker. If the marker doesn't exist, the file is incomplete and should be discarded or retried.
The pattern you choose depends on your destination's capabilities and your consistency requirements. The key is to be explicit about what happens on partial failure and design your pipeline to handle it.
Destroying Streams
You've already learned about stream.destroy() in the Readable and Writable chapters. As a reminder, calling destroy() on any stream transitions it to a destroyed state, emits close, and optionally emits error if you pass one.
What's specific to pipelines is that when you destroy any stream in a pipeline(), the pipeline function automatically destroys all other streams and invokes the callback with the error:
const source = createReadStream("input.txt");
const dest = createWriteStream("output.txt");
pipeline(source, dest, (err) => {
if (err) {
console.error("Pipeline stopped:", err.message);
}
});
// Later, cancel the pipeline (e.g., after user action)
setTimeout(() => {
source.destroy(new Error("Cancelled by user"));
}, 100);When you call source.destroy(), the source stream stops reading, emits close, and if you passed an error, emits error. The pipeline() function sees the error and destroys all other streams in the pipeline (in this case, dest). The callback is invoked with the error.
This automatic cleanup is another advantage of pipeline() over manual pipe() chaining. With pipe(), you'd have to track all streams and destroy them manually.
This is useful for implementing cancellation. If a user cancels an operation, you destroy the pipeline's source stream. The pipeline stops gracefully, cleans up resources, and your callback is invoked to handle the cancellation.
You can also destroy with no error:
source.destroy();In this case, the stream is destroyed but no error event is emitted. The pipeline() callback is still invoked, but err is null. This is useful for stopping a pipeline without treating it as a failure.
One important detail: destroy() is idempotent. Calling it multiple times on the same stream does nothing after the first call. The stream is destroyed once, and subsequent destroy calls are ignored.
Another thing - when you destroy a stream, buffered data is just... gone. If a Writable has buffered writes that haven't been flushed yet, they're lost. If a Readable has buffered data that hasn't been consumed yet, it's lost. Destroy means "stop immediately and throw away any state," not "gracefully finish pending operations."
If you need graceful shutdown (finish writing buffered data before closing), use end() instead of destroy():
writable.end(); // Finish writing buffered data, then closeBut end() only works on Writable streams. For Readable streams, there's no graceful stop - you either consume all data or you destroy.
Async Iteration Pipelines
In the Readable Streams chapter, we explored using for await...of to consume streams with automatic backpressure handling. This pattern is also an alternative to pipe() and pipeline() for building stream processing logic.
As a refresher: when you iterate over a Readable stream with for await...of, the iterator protocol automatically implements backpressure. The loop doesn't pull the next chunk until the current iteration completes. If your processing is async, the stream waits:
for await (const chunk of readableStream) {
await processAsync(chunk); // Stream waits for this
}We won't repeat the full backpressure mechanics here—refer to the "Backpressure in Async Iteration" section in the Readable Streams chapter for the complete explanation of how the iterator protocol manages flow control.
What's relevant to this chapter is using this pattern specifically for pipeline construction rather than pipeline(). You can build pipelines by reading from a source with for await...of, transforming each chunk, and writing to a destination:
const source = createReadStream("input.txt");
const dest = createWriteStream("output.txt");
for await (const chunk of source) {
const transformed = chunk.toString().toUpperCase();
const ok = dest.write(transformed);
if (!ok) {
await new Promise((resolve) => dest.once("drain", resolve));
}
}
dest.end();This is a manual pipeline. You're pulling from the source, transforming, and writing to the destination, with explicit backpressure handling (wait for drain if write() returns false). This pattern gives you fine-grained control over the data flow, but it requires you to manage backpressure manually—forgetting the drain check leads to unbounded memory growth, as we discussed in the Writable Streams chapter.
A cleaner pattern is to use stream.Readable.from() with an async generator:
async function* transform(source) {
for await (const chunk of source) {
yield chunk.toString().toUpperCase();
}
}
const source = createReadStream("input.txt");
const transformed = Readable.from(transform(source));
const dest = createWriteStream("output.txt");
await pipeline(transformed, dest);The async generator is automatically wrapped in a Readable stream, and pipeline() handles the plumbing. This combines the clarity of for await...of with the robustness of pipeline().
You can chain multiple generator transforms:
async function* toUppercase(source) {
for await (const chunk of source) {
yield chunk.toString().toUpperCase();
}
}
async function* filterEmpty(source) {
for await (const line of source) {
if (line.trim().length > 0) {
yield line;
}
}
}
await pipeline(
source,
toUppercase,
filterEmpty,
dest
);Each generator is a pipeline stage. This is readable. Each stage is a simple for await loop that yields transformed chunks. The composition is handled by pipeline().
This pattern is especially nice for objectMode pipelines where each chunk is a structured object:
async function* parseJSON(source) {
for await (const line of source) {
yield JSON.parse(line);
}
}
async function* extractField(source, field) {
for await (const obj of source) {
yield obj[field];
}
}
await pipeline(
source,
parseJSON,
(source) => extractField(source, "name"),
dest
);Each stage is a function from source to async iterable. The pipeline() function stitches them together. This is functional pipeline composition, and it's often clearer than creating Transform classes.
Backpressure with Async Iteration in Pipelines
We covered how backpressure works with for await...of in the Readable Streams chapter. The key points to remember when using this pattern in pipelines:
- Await async work - The iterator pulls one chunk at a time; if you await async operations, backpressure flows automatically from your processing speed back to the source
- Don't accumulate promises - Avoid
promises.push(processAsync(chunk))patterns that break backpressure by reading the entire stream into memory before processing - Use controlled concurrency - For parallel processing with bounded concurrency, use libraries like
p-limitto limit in-flight operations
From the Readable Streams chapter: you must await async operations inside the loop. Without await, you lose backpressure:
// WRONG - loses backpressure
for await (const chunk of source) {
slowOperation(chunk); // No await! Loop continues immediately
}
// CORRECT - maintains backpressure
for await (const chunk of source) {
await slowOperation(chunk); // Stream waits until this completes
}The advantage of this approach over pipeline() is explicit control over data flow. The disadvantage is that you must manage error handling yourself—there's no automatic cleanup like pipeline() provides.
For the full mechanics of how the async iterator protocol implements backpressure, see the "Backpressure in Async Iteration: How It Works" section in the Readable Streams chapter.
Composable Transforms
In the Transform Streams chapter, we covered how to implement custom transforms. Now let's look at building reusable pipeline components through factory functions.
The pattern is straightforward—create functions that return configured stream instances:
function createCSVParser() {
return new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
const lines = chunk.toString().split("\n");
for (const line of lines) {
if (line.trim()) {
this.push(line.split(","));
}
}
callback();
},
});
}
// Use in multiple pipelines
await pipeline(source1, createCSVParser(), dest1);
await pipeline(source2, createCSVParser(), dest2);Each call to createCSVParser() returns a fresh Transform instance. You can't reuse a stream instance across multiple pipelines (once a stream ends or errors, it's done), but you can reuse the factory function.
You can make factories configurable:
function createFieldExtractor(fields) {
return new Transform({
objectMode: true,
transform(obj, encoding, callback) {
const extracted = {};
for (const field of fields) {
extracted[field] = obj[field];
}
this.push(extracted);
callback();
},
});
}
await pipeline(
source,
createFieldExtractor(["name", "email"]),
dest
);Now the transform is parameterized. Different pipelines can extract different fields.
For more complex pipelines, you can compose pipeline segments:
function createProcessingPipeline(source, dest) {
return pipeline(
source,
createCSVParser(),
createFieldExtractor(["name", "email"]),
createValidator(),
dest
);
}
await createProcessingPipeline(source1, dest1);
await createProcessingPipeline(source2, dest2);The createProcessingPipeline() function encapsulates the entire transformation sequence. You pass in a source and destination, and it wires up all the intermediate transforms.
This is a higher-order function pattern: functions that create and compose streams. It's useful for building modular, testable streaming code.
You can also compose generator functions:
const parseCSV = async function* (source) {
for await (const chunk of source) {
const lines = chunk.toString().split("\n");
for (const line of lines) {
if (line.trim()) {
yield line.split(",");
}
}
}
};
const extractFields = (fields) =>
async function* (source) {
for await (const obj of source) {
const extracted = {};
for (const field of fields) {
extracted[field] = obj[field];
}
yield extracted;
}
};
await pipeline(
source,
parseCSV,
extractFields(["name", "email"]),
dest
);Each generator is a function from async iterable to async iterable. You compose them by passing one's output as another's input. The pipeline() function handles the plumbing.
This functional composition style fits naturally with Node.js's streaming model.
Pipeline Segments
Let's formalize the concept of a pipeline segment. A segment is a reusable piece of a pipeline - it might be a single transform, or a chain of transforms, or conditional logic that routes to different transforms.
Here's a segment that validates objects and either passes them through or routes them to an error destination:
function createValidationSegment(schema, errorDest) {
const valid = new PassThrough({ objectMode: true });
const invalid = new PassThrough({ objectMode: true });
invalid.pipe(errorDest);
return new Transform({
objectMode: true,
transform(obj, encoding, callback) {
if (schema.validate(obj)) {
this.push(obj);
} else {
invalid.write(obj);
}
callback();
},
});
}This transform validates each object. Valid objects are pushed downstream. Invalid objects are written to an error destination (like a log file or error stream). This is a branching segment: one input, two outputs.
You use it like this:
const errorLog = createWriteStream("errors.log");
await pipeline(
source,
createValidationSegment(mySchema, errorLog),
dest
);Valid objects go to dest. Invalid objects go to errorLog. The pipeline continues even when invalid objects are encountered - they're just routed elsewhere.
Another pattern is conditional segments that choose different transforms based on runtime state:
function createConditionalSegment(condition, trueTransform, falseTransform) {
return new Transform({
objectMode: true,
async transform(obj, encoding, callback) {
try {
const result = await condition(obj);
const transform = result ? trueTransform : falseTransform;
transform.write(obj);
callback();
} catch (err) {
callback(err);
}
},
});
}Based on a condition function, each object is sent through either trueTransform or falseTransform. This is a routing segment.
These patterns - branching, routing, conditional - are building blocks for complex data flows. You compose them into larger pipelines, creating sophisticated processing logic while keeping each segment focused and reusable.
Tee and Broadcast Patterns
Sometimes you need to send the same data to multiple destinations. This is called a tee (like a T-junction in plumbing) or broadcast pattern.
The simplest way to tee a stream is to pipe it to multiple destinations:
source.pipe(dest1);
source.pipe(dest2);Both dest1 and dest2 receive the same data from source. The source emits each chunk once, and both pipes forward it to their respective destinations.
But there's a catch: backpressure. If dest1 is slow and signals backpressure, the source pauses. But dest2 might be fast and ready for more data. By pausing the source, you're slowing down dest2 unnecessarily.
The source can't pause for one destination and continue for another. It's all or nothing. So when you tee a stream with pipe(), the slowest destination controls the pace for all destinations.
If this is acceptable (all destinations need the same data, and you're okay with the slowest one setting the pace), then simple piping works fine. But if you want independent backpressure per destination, you need a more sophisticated approach.
One technique is to use PassThrough streams as intermediaries:
const pass1 = new PassThrough();
const pass2 = new PassThrough();
source.on("data", (chunk) => {
pass1.write(chunk);
pass2.write(chunk);
});
source.on("end", () => {
pass1.end();
pass2.end();
});
pass1.pipe(dest1);
pass2.pipe(dest2);Now dest1 and dest2 have independent backpressure. If dest1 is slow, pass1 buffers. If dest2 is fast, pass2 doesn't buffer. The source isn't paused by either destination.
But this breaks source-level backpressure. If both destinations are slow, pass1 and pass2 both buffer, and the source isn't aware. You're buffering in memory unbounded.
For truly independent destinations with bounded memory, you need to use a fan-out stream that monitors backpressure from all destinations and pauses the source only when all destinations signal backpressure:
class FanOut extends Writable {
constructor(destinations, options) {
super(options);
this.destinations = destinations;
}
_write(chunk, encoding, callback) {
const allReady = [];
// Write to all destinations and check for backpressure
for (const dest of this.destinations) {
const canContinue = dest.write(chunk, encoding);
if (!canContinue) {
// Destination buffer is full, wait for drain
allReady.push(
new Promise((resolve) => {
dest.once('drain', resolve);
})
);
}
}
// If any destination signaled backpressure, wait for all to drain
if (allReady.length > 0) {
Promise.all(allReady)
.then(() => callback())
.catch((err) => callback(err));
} else {
// All writes succeeded without backpressure
callback();
}
}
_final(callback) {
// End all destinations when this stream ends
for (const dest of this.destinations) {
dest.end();
}
callback();
}
}This writable forwards each chunk to multiple destinations. The key difference from simple piping is how it handles backpressure: write() returns a boolean indicating whether you can continue writing. If it returns false, the destination's buffer is full, and you should wait for the drain event before writing more. This implementation collects promises for any destinations signaling backpressure and waits for all of them to drain before invoking the callback, which creates backpressure on the source.
You use it like this:
pipeline(source, new FanOut([dest1, dest2]), (err) => {
// Pipeline done
});This is a more complex pattern, and it's not common in application code. Most of the time, you either accept that the slowest destination controls the pace, or you accept unbounded buffering in PassThrough intermediaries. True fan-out with independent backpressure per destination is complex and usually only needed in specialized scenarios like logging or monitoring systems.
AbortSignal Integration
Node.js streams support AbortSignal for cancellation. You can pass a signal to the promise-based pipeline(), and if the signal is aborted, the pipeline is destroyed:
import { pipeline } from "stream/promises";
const controller = new AbortController();
try {
await pipeline(source, transform, dest, { signal: controller.signal });
} catch (err) {
if (err.name === "AbortError") {
console.log("Pipeline cancelled");
} else {
throw err;
}
}
// Note: You can also check err.code === 'ABORT_ERR' which is more robust
// since the code property is harder to accidentally modify
// To cancel: controller.abort();When you call controller.abort(), the pipeline is immediately destroyed. All streams are torn down, the promise rejects with an AbortError, and any pending operations are cancelled.
This is useful for user-initiated cancellation, timeouts, or resource cleanup in long-running operations.
You can also create a timeout signal:
const signal = AbortSignal.timeout(5000); // 5 second timeout
try {
await pipeline(source, transform, dest, { signal });
console.log("Pipeline completed");
} catch (err) {
if (err.name === "AbortError") {
console.log("Pipeline timed out");
} else {
throw err;
}
}If the pipeline doesn't complete within 5 seconds, it's automatically aborted.
For complex scenarios with multiple cancellation sources, you can use AbortSignal.any():
const userCancel = new AbortController();
const timeout = AbortSignal.timeout(10000);
const signal = AbortSignal.any([userCancel.signal, timeout]);
try {
await pipeline(source, transform, dest, { signal });
} catch (err) {
if (err.name === "AbortError") {
console.log("Cancelled by either user or timeout");
} else {
throw err;
}
}This creates a composite signal that aborts if either the user cancels or the timeout expires.
AbortSignal integration makes cancellation explicit and standardized. Instead of manually calling destroy() on streams, you abort a signal, and the pipeline handles the cleanup.
Real-World Pipeline Examples
Let's implement a few complete pipelines that demonstrate all the concepts we've covered.
1) Log File Processing
Read a large log file, parse each line as JSON, filter by log level, and write to separate output files:
import { pipeline } from "stream/promises";
import { createReadStream, createWriteStream } from "fs";
import { Transform } from "stream";
async function* parseLines(source) {
let buffer = "";
for await (const chunk of source) {
buffer += chunk.toString();
const lines = buffer.split("\n");
buffer = lines.pop();
for (const line of lines) {
if (line.trim()) {
try {
yield JSON.parse(line);
} catch (err) {
console.error("Parse error:", err);
}
}
}
}
if (buffer.trim()) {
try {
yield JSON.parse(buffer);
} catch (err) {
console.error("Parse error:", err);
}
}
}
class LevelSplitter extends Transform {
constructor(level, dest, options) {
super({ ...options, objectMode: true });
this.level = level;
this.dest = dest;
}
_transform(log, encoding, callback) {
if (log.level === this.level) {
this.dest.write(JSON.stringify(log) + "\n");
}
this.push(log);
callback();
}
}
const errorDest = createWriteStream("errors.log");
const warnDest = createWriteStream("warnings.log");
await pipeline(
createReadStream("app.log"),
parseLines,
new LevelSplitter("ERROR", errorDest),
new LevelSplitter("WARN", warnDest),
createWriteStream("all.log")
);The parseLines generator handles a common challenge in line-based stream processing: chunks don't align with line boundaries. A chunk might end mid-line, splitting {"level":"ERROR"... across two chunks. The solution uses buffer accumulation:
let buffer = "";
for await (const chunk of source) {
buffer += chunk.toString();
const lines = buffer.split("\n");
buffer = lines.pop(); // Save incomplete line for next chunk
// Process complete lines...
}When you split on \n, the last array element is either empty (if the chunk ended with a newline) or an incomplete line. Popping that element and saving it means the next chunk appends to it, completing the line. After the loop ends, any remaining buffered data gets processed—handling files that don't end with a newline.
The try/catch around JSON.parse() is critical:
try {
yield JSON.parse(line);
} catch (err) {
console.error("Parse error:", err);
}Without error handling, a single malformed JSON line crashes the entire pipeline, losing all progress. With it, the pipeline logs the error and continues. Real-world log files contain corrupted entries, so the pipeline needs to handle invalid data without stopping.
LevelSplitter both filters data to a side channel and passes all data through:
_transform(log, encoding, callback) {
if (log.level === this.level) {
this.dest.write(JSON.stringify(log) + "\n"); // Side channel
}
this.push(log); // Pass through to next stage
callback();
}Every log entry continues down the main pipeline, but ERROR-level logs are also written to errors.log. This creates a branching pipeline:
Logs → parseLines → [All logs continue]
↓
ERROR logs → errors.log
↓ [All logs continue]
WARN logs → warnings.log
↓ [All logs continue]
All logs → all.logThis approach is memory-efficient. Reading the file once and splitting in-stream uses constant memory. Two separate pipelines would double I/O and memory usage.
The { objectMode: true } option is essential because this transform receives JavaScript objects from parseLines, not buffers. When writing to the side destinations, we convert back to JSON strings with JSON.stringify(log) + "\n". Parse once, work with objects in the pipeline, serialize only when writing to disk.
The splitters chain in sequence:
parseLines → LevelSplitter("ERROR") → LevelSplitter("WARN") → all.logEach splitter calls this.push(log), passing objects through. The final destination all.log receives objects too, but since Writable streams automatically call toString() on objects, you'd want to add a final transform that serializes to JSON for proper formatting (we've simplified this for clarity, but in production you'd add async function* serializeJSON(source) { for await (const obj of source) yield JSON.stringify(obj) + "\n"; } before the final destination).
2) CSV Import with Validation
Read a CSV file, parse rows, validate, and insert into a database with batching:
async function* parseCSV(source) {
let buffer = "";
let headers = null;
for await (const chunk of source) {
buffer += chunk.toString();
const lines = buffer.split("\n");
buffer = lines.pop();
for (const line of lines) {
if (!headers) {
headers = line.split(",");
} else {
const values = line.split(",");
const row = {};
for (let i = 0; i < headers.length; i++) {
row[headers[i]] = values[i];
}
yield row;
}
}
}
}
async function* validate(source, schema) {
for await (const row of source) {
if (schema.validate(row)) {
yield row;
} else {
console.error("Invalid row:", row);
}
}
}
async function* batch(source, size) {
let batch = [];
for await (const item of source) {
batch.push(item);
if (batch.length >= size) {
yield batch;
batch = [];
}
}
if (batch.length > 0) {
yield batch;
}
}
class DatabaseWriter extends Writable {
constructor(db, options) {
super({ ...options, objectMode: true });
this.db = db;
}
async _write(batch, encoding, callback) {
try {
await this.db.insertMany(batch);
callback();
} catch (err) {
callback(err);
}
}
}
await pipeline(
createReadStream("data.csv"),
parseCSV,
(source) => validate(source, mySchema),
(source) => batch(source, 100),
new DatabaseWriter(db)
);This pipeline uses composable generator functions for data transformation and batching for database operations.
Unlike parseLines, parseCSV maintains state across chunks—it needs to remember the header row:
let buffer = "";
let headers = null; // Persists across all chunks
for await (const chunk of source) {
// ... process chunks
if (!headers) {
headers = line.split(","); // First line becomes headers
} else {
// Subsequent lines become data objects
const values = line.split(",");
const row = {};
for (let i = 0; i < headers.length; i++) {
row[headers[i]] = values[i];
}
yield row;
}
}The headers variable persists for the generator's lifetime, capturing the first line and using it to structure all subsequent rows. Raw CSV:
name,email,age
Alice,alice@example.com,30
Bob,bob@example.com,25Becomes structured objects:
{ name: "Alice", email: "alice@example.com", age: "30" }
{ name: "Bob", email: "bob@example.com", age: "25" }We don't load the entire CSV into memory—each row is processed as data flows through.
The validate generator filters without modifying data:
async function* validate(source, schema) {
for await (const row of source) {
if (schema.validate(row)) {
yield row; // Valid rows continue
} else {
console.error("Invalid row:", row); // Invalid rows logged, not yielded
}
}
}Valid rows continue downstream. Invalid rows are logged but not yielded, preventing bad data from reaching the database.
Throwing an error on invalid data would crash the pipeline and lose all progress. Real-world data is messy—logging and continuing lets you review errors after the import completes.
The batch generator is essential for database operations:
async function* batch(source, size) {
let batch = [];
for await (const item of source) {
batch.push(item);
if (batch.length >= size) {
yield batch; // Emit full batch
batch = []; // Reset for next batch
}
}
if (batch.length > 0) {
yield batch; // Don't forget partial final batch
}
}This transforms a stream of individual items into a stream of batches:
Input: item1, item2, item3, ..., item100, item101, ...
Output: [item1...item100], [item101...item200], ...Database round-trips are expensive. Batching into groups of 100 can provide a 100x speedup over inserting one row at a time. The batch size is a trade-off: too small means many round-trips and slow performance; too large means high memory usage, timeout risk, and harder error recovery. Most databases work well with batch sizes between 100-1000.
The if (batch.length > 0) after the loop handles the final partial batch. Without this check, trailing rows get silently dropped.
The DatabaseWriter writable handles async I/O:
async _write(batch, encoding, callback) {
try {
await this.db.insertMany(batch);
callback(); // Signal success
} catch (err) {
callback(err); // Signal error
}
}The _write method can be async, but you must still call the callback. Call callback() with no arguments on success, or callback(err) to propagate errors.
While await this.db.insertMany(batch) runs, the stream is paused. The next batch won't be sent until the current insert completes, preventing database overload.
The arrow functions (source) => validate(source, mySchema) let you pass additional arguments to generators:
await pipeline(
createReadStream("data.csv"),
parseCSV,
(source) => validate(source, mySchema),
(source) => batch(source, 100),
new DatabaseWriter(db)
);You're creating specialized versions of generic generators for this specific pipeline.
This entire pipeline uses roughly constant memory regardless of file size. CSV parsing keeps only the current line in memory. Validation processes one row at a time. Batching holds at most 100 rows. Writing processes only the current batch. A 10GB CSV file uses the same memory as a 10MB file.