
Transform & Duplex Streams

NodeBook · October 29, 2025 · 32 min read
#streams #transform #duplex #data-processing #custom-streams

Readable streams produce data. Writable streams consume it. But sometimes you need both, or you need a stream where the two directions aren't related, or you need transformation between input and output.

Duplex streams are both readable and writable at the same time, with two independent sides operating in parallel. Transform streams are a specialized version of Duplex where the writable side feeds into the readable side through a transformation function. The distinction between these two types affects how you build data processing pipelines.

We'll start with Duplex streams, then focus on Transform streams, which are more common in application code. We'll implement several custom Transform streams to show the patterns, then cover when to choose Duplex versus Transform.

Duplex Streams

Duplex streams come first because they're the foundation Transform streams build on. A Duplex stream is simultaneously readable and writable. You can call both read() and write() on the same object. You can attach 'data' listeners and pass chunks to write(). The stream has all the properties and events of both Readable and Writable.

The critical detail: the readable side and the writable side are independent. Data you write to a Duplex stream doesn't automatically appear on the readable side. The two sides are separate channels that happen to exist on the same object. Think of it like a phone line - you can speak into it and listen through it, but what you say doesn't echo back to you. The two directions are independent.

This independence exists because Duplex streams model bidirectional communication channels. The canonical example is a TCP socket. When you have a socket connection, you can send data to the remote endpoint by writing to the socket, and you can receive data from the remote endpoint by reading from it. The data you send isn't the data you receive - they're two separate streams of communication happening simultaneously over the same connection.

At the class level, Duplex streams have a specific structure. The stream.Duplex class extends Readable, but it also implements the Writable interface. Internally, it maintains separate state for the readable side (_readableState) and the writable side (_writableState). When you implement a custom Duplex stream, you provide both _read() and _write() methods.

A minimal Duplex stream implementation:

import { Duplex } from "stream";

class MinimalDuplex extends Duplex {
  _read(size) {
    // produce data for readable side
    this.push("readable data");
    this.push(null);
  }

  _write(chunk, encoding, callback) {
    // consume data on writable side
    console.log("Received:", chunk.toString());
    callback();
  }
}

The _read() method is called when the readable side needs data. The _write() method is called when something writes to the writable side. These two methods don't interact. They're completely independent.

Using this stream:

const duplex = new MinimalDuplex();

duplex.on("data", (chunk) => {
  console.log("Read:", chunk.toString());
});

duplex.write("written data");
duplex.end();

When you run this, you'll see "Received: written data" from the _write() side and "Read: readable data" from the _read() side. They're not connected. You're not transforming "written data" into "readable data" - they're two separate flows.

The allowHalfOpen option is Duplex-specific and changes how the stream handles ending. When you create a Duplex stream, you can set allowHalfOpen: false to change what happens when one side ends.

By default, allowHalfOpen is true. This means the readable side can end while the writable side is still open, and vice versa. You can finish writing and call end() on the writable side, but the readable side continues to produce data. Or the readable side can push(null) to signal EOF, but you can still write to the writable side.

Network sockets work this way. When a TCP connection is half-closed, one endpoint has finished sending but can still receive. The connection isn't fully closed until both sides have finished.

If you set allowHalfOpen: false, the stream enforces that when either side ends, the other side ends too. If the readable side pushes null, the writable side is automatically ended. If you call end() on the writable side, the readable side automatically pushes null.

const duplex = new Duplex({
  allowHalfOpen: false,
  read() {
    // readable implementation
  },
  write(chunk, encoding, callback) {
    // writable implementation
    callback();
  },
});

With allowHalfOpen: false, calling duplex.end() causes the readable side to end immediately. Use this when modeling something that doesn't support half-open states, like request-response protocols where the stream should close completely when either direction finishes.

The real-world use cases for raw Duplex streams are mostly about I/O primitives. The net.Socket class from Node's networking module is a Duplex stream. When you create a TCP socket, you get a Duplex. The writable side sends data over the network. The readable side receives data from the network. The two sides are independent because you're communicating with a remote endpoint - what you send isn't what you receive.

Another example is a subprocess's stdin and stdout. When you spawn a child process, its stdin is writable (you send data to the process) and its stdout is readable (you receive data from the process). Node exposes these as two separate streams rather than one Duplex object, but together they form the same kind of bidirectional channel: both sides communicate with the external process, not with each other.

Application code rarely implements Duplex streams from scratch. Transform streams are more common for data transformation. But first, a slightly more realistic Duplex example:

This Duplex stream maintains an in-memory buffer. Data written to it is stored in an internal array, and data read from it comes from that array:

class BufferedDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.buffer = [];
    this.waiting = false; // true when _read() found the buffer empty
  }

  _write(chunk, encoding, callback) {
    if (this.waiting) {
      // the readable side already asked for data - hand this chunk over directly
      this.waiting = false;
      this.push(chunk);
    } else {
      this.buffer.push(chunk);
    }
    callback();
  }

  _read(size) {
    if (this.buffer.length > 0) {
      this.push(this.buffer.shift());
    } else {
      // nothing buffered yet - remember that the next write should be pushed
      this.waiting = true;
    }
  }
}

Now the two sides interact through shared state. When you write, chunks are added to the buffer - or handed straight to the readable side if a read is already waiting. When the readable side needs data, chunks are pulled from the buffer; if the buffer is empty, the waiting flag records that the next write should be pushed immediately so the readable side doesn't stall. This is a basic queue implementation built on a Duplex stream.
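
A quick usage sketch of the queue:

const queue = new BufferedDuplex();

queue.write("first");
queue.write("second");

queue.on("data", (chunk) => {
  console.log("dequeued:", chunk.toString());
});
// dequeued: first
// dequeued: second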

Even with this shared state, the _read() and _write() methods never call each other. They coordinate through the buffer and the waiting flag, and the stream's internal machinery still decides when to call _read() (when the readable side needs data) and _write() (when something writes to the writable side).

Using a Duplex to implement a queue or buffer works, but it's not the primary use case. Most often, if you're building something that transforms or processes data in a pipeline, you want a Transform stream, not a Duplex.

One more detail about Duplex streams: because the two sides are independent, an error on one side doesn't automatically propagate to the other. If an error occurs in _write(), the stream emits an 'error' event, but the readable side continues operating unless you explicitly destroy it. Similarly, an error in _read() doesn't stop the writable side.

However, when you call destroy() on a Duplex stream, both sides are destroyed. This is the correct behavior - destroying the stream means the entire resource is being shut down, not just one direction.

duplex.destroy(new Error("Fatal error"));
// Both readable and writable sides are now destroyed

This matters when you're handling cleanup or cancellation. If you're using a Duplex to model a network connection, and the connection drops, you destroy the stream, which shuts down both sending and receiving.

Transform Streams

Transform streams are what most developers reach for when building data processors. A Transform stream is a specialized Duplex where the writable input is connected to the readable output through a transformation function. Data flows in one side, gets processed, and flows out the other side.

Unlike raw Duplex streams where the two sides are independent, Transform streams create a causal relationship between them. What you write to the writable side directly affects what comes out of the readable side. You're not just implementing two separate channels - you're implementing a function that takes input chunks and produces output chunks.

The most common examples of Transform streams in Node.js's standard library are compression and encryption. The zlib.createGzip() function returns a Transform stream. You write uncompressed data to it, and you read compressed data from it. The crypto.createCipheriv() function returns a Transform stream. You write plaintext to it, and you read ciphertext from it. The transformation happens inside the stream.

The Transform class differs from Duplex in a few key ways. Transform extends Duplex, so it has all the properties and methods of a Duplex. But instead of implementing _read() and _write(), you implement a different method: _transform().

The _transform() method has this signature:

_transform(chunk, encoding, callback)

It receives a chunk from the writable side, processes it, and pushes zero or more output chunks to the readable side. When it's done processing, it invokes the callback to signal that it's ready for the next chunk.

A simple Transform that converts text to uppercase:

import { Transform } from "stream";

class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const upper = chunk.toString().toUpperCase();
    this.push(upper);
    callback();
  }
}

The _transform() method receives a chunk (which is a Buffer by default), converts it to a string, uppercases it, pushes the result to the readable side using this.push(), and then calls the callback to indicate the transformation is complete.

Using this stream:

const upper = new UppercaseTransform();

upper.on("data", (chunk) => {
  console.log(chunk.toString());
});

upper.write("hello");
upper.write("world");
upper.end();

Output: "HELLO" and "WORLD". Each chunk you write is transformed and emerges on the readable side.

Unlike Duplex, Transform already implements _read() for you - you don't override it. The base class handles pulling data from an internal buffer that's populated by your _transform() method. Similarly, Transform's _write() is implemented to call your _transform(). You only implement the transformation logic - the stream plumbing is handled by the base class.

This makes Transform streams simpler to implement than raw Duplex streams. You focus on "what do I do with this chunk" instead of "how do I manage two independent sides."

The callback parameter in _transform() does two things. It signals that you're done processing the current chunk, and it allows you to report errors.

If an error occurs during transformation, you pass it to the callback:

_transform(chunk, encoding, callback) {
  try {
    const result = JSON.parse(chunk.toString());
    this.push(JSON.stringify(result));
    callback();
  } catch (err) {
    callback(err);
  }
}

If you pass an error to the callback, the stream emits an 'error' event and stops processing. Any buffered data is discarded, and the stream enters an errored state.

You can also use this.push() multiple times in a single _transform() call. This is called a one-to-many transformation. For every input chunk, you produce multiple output chunks.

This Transform splits input into individual lines:

class LineSplitter extends Transform {
  _transform(chunk, encoding, callback) {
    const lines = chunk.toString().split("\n");
    for (const line of lines) {
      if (line.length > 0) {
        this.push(line + "\n");
      }
    }
    callback();
  }
}

If you write "hello\nworld\n", the transform pushes two chunks: "hello\n" and "world\n". One input chunk becomes multiple output chunks.
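
Seeing it in action:

const splitter = new LineSplitter();

splitter.on("data", (line) => {
  console.log("line:", JSON.stringify(line.toString()));
});

splitter.write("hello\nworld\n");
splitter.end();
// line: "hello\n"
// line: "world\n"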

You can also push nothing. If your transformation decides to drop a chunk (filter it out), you just call the callback without pushing:

_transform(chunk, encoding, callback) {
  const text = chunk.toString();
  if (!text.startsWith("#")) {
    this.push(chunk);
  }
  callback();
}

This transform filters out chunks that start with "#". Some chunks pass through, others are dropped.

What about many-to-one transformations, where you need to accumulate multiple input chunks before producing output? This is common when parsing structured data that might be split across chunk boundaries. You use instance state to buffer incomplete data.

This Transform accumulates chunks until it sees a delimiter, then emits the accumulated data:

class DelimiterParser extends Transform {
  constructor(delimiter, options) {
    super(options);
    this.delimiter = delimiter;
    this.buffer = "";
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const parts = this.buffer.split(this.delimiter);
    this.buffer = parts.pop(); // last part is incomplete

    for (const part of parts) {
      this.push(part);
    }
    callback();
  }
}

This transform maintains a this.buffer that accumulates incoming data. Each time _transform() is called, it appends the new chunk to the buffer, splits on the delimiter, and pushes complete parts. The last part (which might be incomplete) is kept in the buffer for the next call.

This is a fundamental pattern in Transform streams: maintaining state across calls to _transform() to handle data structures that span multiple chunks. This is stateful transformation.

The above implementation has a problem. When the stream ends, any leftover data in the buffer is lost. The stream finishes without emitting that final incomplete chunk. This is where _flush() comes in.

The _flush() Method

Transform streams have a second method you can implement: _flush(). This method is called after all input chunks have been processed (after end() is called on the writable side) but before the readable side pushes null to signal EOF. It's your opportunity to emit any remaining data.

The _flush() signature is:

_flush(callback)

It receives only a callback, no chunk. You can call this.push() to emit final data, and then you call the callback to signal that flushing is complete.

The delimiter parser with _flush() added:

class DelimiterParser extends Transform {
  constructor(delimiter, options) {
    super(options);
    this.delimiter = delimiter;
    this.buffer = "";
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const parts = this.buffer.split(this.delimiter);
    this.buffer = parts.pop();

    for (const part of parts) {
      this.push(part);
    }
    callback();
  }

  _flush(callback) {
    if (this.buffer.length > 0) {
      this.push(this.buffer);
    }
    callback();
  }
}

Now when the stream ends, _flush() is called. If there's leftover data in the buffer, it's pushed as the final chunk. Then the callback is invoked, and the stream pushes null to signal EOF on the readable side.

Without _flush(), data that doesn't end with a delimiter is lost. With _flush(), it's emitted as the final chunk. Parsers, decoders, and any Transform that accumulates state need this.
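
A short usage sketch that shows what _flush() buys you - the trailing "charlie" has no delimiter and only appears because of _flush():

const parser = new DelimiterParser(";");

parser.on("data", (chunk) => {
  console.log("part:", chunk.toString());
});

parser.write("alpha;bra");
parser.write("vo;charlie");
parser.end();
// part: alpha
// part: bravo
// part: charlie   <- emitted by _flush()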

The _flush() callback works the same way as the _transform() callback. If an error occurs, you pass it to the callback:

_flush(callback) {
  if (this.buffer.length > 0) {
    try {
      const parsed = this.parseBuffer(this.buffer);
      this.push(parsed);
      callback();
    } catch (err) {
      callback(err);
    }
  } else {
    callback();
  }
}

If you pass an error to the callback, the stream emits an 'error' event instead of ending cleanly.

One more detail about _flush(): it's optional. If you don't implement it, the stream just ends without a final processing step. Transforms that don't accumulate state (like the uppercase transform) don't need this. Each chunk is independent, so there's nothing to flush when the stream ends.

But for any transform that buffers across chunks - parsers, decoders, aggregators - you must implement _flush() to avoid losing data.

A more complete example: parsing NDJSON (newline-delimited JSON) where each line is a separate JSON document.

class NDJSONParser extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.buffer = "";
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop();

    for (const line of lines) {
      if (line.trim().length > 0) {
        try {
          const obj = JSON.parse(line);
          this.push(obj);
        } catch (err) {
          return callback(err);
        }
      }
    }
    callback();
  }

  _flush(callback) {
    if (this.buffer.trim().length > 0) {
      try {
        const obj = JSON.parse(this.buffer);
        this.push(obj);
        callback();
      } catch (err) {
        callback(err);
      }
    } else {
      callback();
    }
  }
}

This transform operates in objectMode, which means it pushes JavaScript objects instead of buffers. Each line is parsed as JSON, and the resulting object is pushed to the readable side. If a line is incomplete at the end of a chunk, it's buffered until the next chunk arrives. When the stream ends, _flush() parses any remaining buffered line.
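
A usage sketch, with a placeholder file name and error handling omitted:

import { createReadStream } from "fs";

createReadStream("events.ndjson")
  .pipe(new NDJSONParser())
  .on("data", (obj) => {
    console.log("parsed:", obj);
  });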

If JSON.parse() throws, we pass the error to the callback. This stops the stream and emits an error event. We use return callback(err) to exit early - we don't want to continue processing after an error.

This pattern (buffer across chunks, split on delimiter, parse complete units, flush remaining data) appears in most Transform streams for structured data.

Implementing Custom Transform Streams

Now that you understand the mechanics, we'll implement several Transform streams. These cover filtering, mapping, splitting, joining, and stateful parsing.

Filter transforms pass through chunks that meet a condition and drop chunks that don't. This transform drops whitespace-only chunks - placed after a line splitter so that each chunk is a line, it filters out empty lines:

class NonEmptyLines extends Transform {
  _transform(chunk, encoding, callback) {
    const text = chunk.toString();
    if (text.trim().length > 0) {
      this.push(chunk);
    }
    callback();
  }
}

Simple. If the chunk (after trimming whitespace) has content, push it. Otherwise, skip it. The callback is always invoked to signal completion, even when we don't push.

Map transforms convert each input chunk to a different output chunk, typically in objectMode. This transform takes JSON objects and extracts specific fields:

class FieldExtractor extends Transform {
  constructor(fields, options) {
    super({ ...options, objectMode: true });
    this.fields = fields;
  }

  _transform(obj, encoding, callback) {
    const extracted = {};
    for (const field of this.fields) {
      if (obj[field] !== undefined) {
        extracted[field] = obj[field];
      }
    }
    this.push(extracted);
    callback();
  }
}

Each input object is mapped to a new object with only the specified fields. One object in, one object out. This is a one-to-one transform.
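
For example:

const extractor = new FieldExtractor(["id", "name"]);

extractor.on("data", (obj) => console.log(obj));

extractor.write({ id: 1, name: "Ada", password: "secret" });
extractor.end();
// { id: 1, name: 'Ada' }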

Split transforms break input into smaller pieces. We've seen a line splitter, but here's a byte-level splitter that breaks a stream into fixed-size chunks:

class ChunkSplitter extends Transform {
  constructor(chunkSize, options) {
    super(options);
    this.chunkSize = chunkSize;
    this.buffer = Buffer.alloc(0);
  }

  _transform(chunk, encoding, callback) {
    this.buffer = Buffer.concat([this.buffer, chunk]);

    while (this.buffer.length >= this.chunkSize) {
      const piece = this.buffer.slice(0, this.chunkSize);
      this.buffer = this.buffer.slice(this.chunkSize);
      this.push(piece);
    }
    callback();
  }

  _flush(callback) {
    if (this.buffer.length > 0) {
      this.push(this.buffer);
    }
    callback();
  }
}

This transform accumulates incoming data in a buffer. When the buffer reaches chunkSize, it slices off a chunk and pushes it. The loop continues until there's less than chunkSize left in the buffer. When the stream ends, _flush() emits any remaining data as a final partial chunk.
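
For example, splitting into 4-byte chunks:

const splitter = new ChunkSplitter(4);

splitter.on("data", (chunk) => console.log(chunk.toString()));

splitter.write("abcdefghij");
splitter.end();
// abcd
// efgh
// ij        <- partial chunk from _flush()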

Join transforms combine multiple input chunks into a single output chunk. This transform accumulates objects into an array and emits the array when a certain count is reached:

class BatchAccumulator extends Transform {
  constructor(batchSize, options) {
    super({ ...options, objectMode: true });
    this.batchSize = batchSize;
    this.batch = [];
  }

  _transform(obj, encoding, callback) {
    this.batch.push(obj);
    if (this.batch.length >= this.batchSize) {
      this.push(this.batch);
      this.batch = [];
    }
    callback();
  }

  _flush(callback) {
    if (this.batch.length > 0) {
      this.push(this.batch);
    }
    callback();
  }
}

This is a many-to-one transform. It accumulates batchSize objects, then pushes the array. If the stream ends with a partial batch, _flush() emits it.

Stateful parsing transforms maintain state across chunks to parse structured data. We've seen delimiter parsing, but here's a more complex example: a parser for length-prefixed binary messages.

In a length-prefixed protocol, each message starts with a 4-byte length field (a uint32) indicating how many bytes follow. To parse this, we need to read the length, then read that many bytes, then repeat.

class LengthPrefixedParser extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.buffer = Buffer.alloc(0);
    this.expectedLength = null;
  }

  _transform(chunk, encoding, callback) {
    this.buffer = Buffer.concat([this.buffer, chunk]);

    while (true) {
      if (this.expectedLength === null) {
        // need the full 4-byte header before we know the body length
        if (this.buffer.length < 4) break;
        this.expectedLength = this.buffer.readUInt32BE(0);
        this.buffer = this.buffer.slice(4);
      }

      // wait until the whole body has arrived
      if (this.buffer.length < this.expectedLength) break;

      const message = this.buffer.slice(0, this.expectedLength);
      this.buffer = this.buffer.slice(this.expectedLength);
      this.expectedLength = null;
      this.push(message);
    }
    callback();
  }
}

This transform uses a state machine. The expectedLength variable tracks whether we're waiting to read a length header or waiting to read the message body. The loop reads as many complete messages as possible from the buffer, pushing each one, and then calls the callback.

There's no _flush() here. If the stream ends with incomplete data (a partial length header or a partial message body), that data is lost. Whether this is correct depends on your protocol. Some protocols treat partial data at EOF as an error. Others emit a final incomplete message or emit an error in _flush().
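
A usage sketch that deliberately splits a frame across two writes to show that the buffering copes with arbitrary boundaries:

const parser = new LengthPrefixedParser();

parser.on("data", (msg) => {
  console.log("message:", msg.toString());
});

// one frame: a 4-byte big-endian length followed by a 5-byte body
const frame = Buffer.alloc(4 + 5);
frame.writeUInt32BE(5, 0);
frame.write("hello", 4);

parser.write(frame.slice(0, 3)); // partial length header
parser.write(frame.slice(3));    // rest of the header plus the body
parser.end();
// message: hello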

Most transforms you implement will be variations of these patterns.

The Chunking Boundary Problem

Data structures that span chunk boundaries cause problems in nearly every Transform implementation. This is the source of many subtle bugs in streaming code.

When you're processing a stream of bytes or text, the chunks you receive are arbitrary. The stream doesn't know or care about the structure of your data. If you're parsing JSON objects separated by newlines, a newline might appear in the middle of a chunk, or it might fall exactly on a chunk boundary, or a JSON object might be split across two chunks.

You can't assume that each chunk is a complete unit. You have to handle partial data.

We've seen this in the delimiter parser and the length-prefixed parser. Buffering solves this. You maintain an internal buffer (often a string or Buffer) that accumulates incoming data. You process complete units from the buffer and leave incomplete units for the next call.

The pattern in abstract form:

class Parser extends Transform {
  constructor(options) {
    super(options);
    this.buffer = "";
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();

    let unit;
    while ((unit = this.extractCompleteUnit(this.buffer))) {
      this.buffer = unit.remainder;
      this.push(unit.data);
    }

    callback();
  }

  _flush(callback) {
    if (this.buffer.length > 0) {
      // handle remaining data
    }
    callback();
  }

  extractCompleteUnit(buffer) {
    // return { data, remainder } or null
  }
}

The extractCompleteUnit() method tries to parse one complete unit from the buffer. If it succeeds, it returns the parsed data and the remaining buffer. If there's not enough data to parse a complete unit, it returns null. The loop continues extracting units until the buffer is empty or incomplete.

This pattern handles arbitrary chunk boundaries correctly. It doesn't matter where the chunks split - the parser accumulates data until it has a complete unit, parses it, and continues.

Concrete example: parsing CSV rows from a stream. A CSV file is lines separated by newlines, and each line is fields separated by commas. A line might be split across chunks, and a field might contain a newline if it's quoted.

A simplified CSV parser (without handling quoted fields):

class SimpleCSVParser extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.buffer = "";
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop();

    for (const line of lines) {
      const fields = line.split(",");
      this.push(fields);
    }
    callback();
  }

  _flush(callback) {
    if (this.buffer.length > 0) {
      const fields = this.buffer.split(",");
      this.push(fields);
    }
    callback();
  }
}

This handles line boundaries correctly. If a line is split across chunks, the partial line is buffered until the newline arrives in a subsequent chunk.

But this doesn't handle quoted fields. If a field is "hello\nworld", the newline inside the quotes shouldn't split the line. Handling this correctly requires a more complex state machine that tracks whether we're inside quotes.
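
A minimal sketch of that state machine - it only tracks whether the scanner is inside double quotes, so a quoted newline stays inside the current row. It splits rows and drops empty ones; it is not a full CSV parser (field parsing and other RFC 4180 details are left out):

class QuoteAwareLineSplitter extends Transform {
  constructor(options) {
    super(options);
    this.row = "";
    this.inQuotes = false;
  }

  _transform(chunk, encoding, callback) {
    for (const ch of chunk.toString()) {
      if (ch === '"') {
        this.inQuotes = !this.inQuotes;
        this.row += ch;
      } else if (ch === "\n" && !this.inQuotes) {
        // a newline outside quotes ends the row
        if (this.row.length > 0) {
          this.push(this.row);
        }
        this.row = "";
      } else {
        this.row += ch;
      }
    }
    callback();
  }

  _flush(callback) {
    if (this.row.length > 0) {
      this.push(this.row);
    }
    callback();
  }
}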

Proper Transform implementations must account for the possibility that data structures span chunks. Buffering and state machines are the tools you use to handle this.

Push Behavior and Backpressure in Transforms

Calling this.push() in a Transform is more subtle than it appears, because push is the interface to the readable side, and it respects backpressure.

When you call this.push(chunk), the chunk is added to the readable side's internal buffer. If the buffer is below its highWaterMark, push returns true. If the buffer is at or above highWaterMark, push returns false, signaling backpressure.

You can check this return value in _transform():

_transform(chunk, encoding, callback) {
  const transformed = this.transformData(chunk);
  const canContinue = this.push(transformed);

  if (!canContinue) {
    // readable side is full, but we have to process this chunk
  }

  callback();
}

The return value of push in _transform() doesn't usually affect your logic. You still have to call the callback. The Transform stream handles backpressure for you by not calling _transform() again until the readable side drains. You don't need to implement pause/resume logic yourself.

This is different from implementing a Readable stream, where you check push's return value and stop calling _read() if it returns false. In a Transform, the base class handles this. You just implement the transformation logic.

However, if you're pushing multiple chunks in a single _transform() call (a one-to-many transform), you might want to check push's return value and stop pushing if backpressure is signaled:

_transform(chunk, encoding, callback) {
  const parts = this.splitIntoParts(chunk);

  for (let i = 0; i < parts.length; i++) {
    const canContinue = this.push(parts[i]);
    if (!canContinue) {
      // readable side is full - stash the rest and emit it later,
      // e.g. at the start of the next _transform() call or in _flush()
      this.bufferedParts = parts.slice(i + 1);
      break;
    }
  }

  callback();
}

But this adds complexity. Most of the time, you just push all your output chunks and let the stream's buffering handle the backpressure. The readable side's buffer will grow until it hits highWaterMark, at which point the stream stops calling _transform() until the buffer drains.

This automatic backpressure handling makes Transform streams much easier to work with than raw Duplex streams. You don't have to coordinate the two sides manually - the base class does it for you.

PassThrough

There's a built-in Transform stream that does nothing: stream.PassThrough. It's a Transform where _transform() just pushes the input chunk unchanged.

import { PassThrough } from "stream";

const passthrough = new PassThrough();

Use cases: observing or intercepting data without modifying it.

One use case is adding event listeners. You can insert a PassThrough into a pipeline and attach 'data' listeners to it to observe the data flowing through without affecting the pipeline:

import { pipeline } from "stream/promises";

const passthrough = new PassThrough();

passthrough.on("data", (chunk) => {
  console.log("Passing through:", chunk.length, "bytes");
});

await pipeline(source, passthrough, destination);

Another use case is implementing a tee or broadcast pattern, where you split a stream to multiple destinations. You can pipe a stream to multiple PassThroughs, and each PassThrough can be piped to a different destination.
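
A rough sketch of the tee pattern, with placeholder file paths - note that the source only moves as fast as the slower destination, since each branch applies its own backpressure:

import { PassThrough } from "stream";
import { createReadStream, createWriteStream } from "fs";

const source = createReadStream("input.txt");
const branchA = new PassThrough();
const branchB = new PassThrough();

source.pipe(branchA).pipe(createWriteStream("copy-a.txt"));
source.pipe(branchB).pipe(createWriteStream("copy-b.txt"));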

PassThrough is also useful in testing. You can create a PassThrough, write test data to it, and then read from it to verify that your stream processing logic works correctly.

Implementing PassThrough yourself:

class MyPassThrough extends Transform {
  _transform(chunk, encoding, callback) {
    callback(null, chunk);
  }
}

The callback(null, chunk) is a shorthand for pushing the chunk and then calling the callback. It's equivalent to:

this.push(chunk);
callback();

This pattern of passing the chunk to the callback is common when you want to push exactly one chunk per input chunk.

Transform vs Duplex - When to Use Each

We've covered both Duplex and Transform streams. The choice between them matters for API design.

Transform streams fit data pipelines where input chunks become output chunks. The output depends on the input. Compression, encryption, parsing, formatting, filtering, and mapping all work this way.

Duplex streams model bidirectional communication channels where the readable and writable sides are independent. Network sockets, IPC channels, proxy connections, and bidirectional message passing all work this way.

Does what you write affect what you read? If yes, use Transform. If no, use Duplex.

Application-level code mostly uses Transform streams. Duplex streams appear at the system level: networking and IPC modules that model channels rather than transformations.

Concrete examples:

A Transform that compresses data:

import { createGzip } from "zlib";

const gzip = createGzip();
input.pipe(gzip).pipe(output);

What you write to gzip (uncompressed data) directly determines what you read from it (compressed data). It's a transformation.

A Duplex that represents a TCP socket:

import { connect } from "net";

const socket = connect(3000, "localhost");
socket.write("request");
socket.on("data", (chunk) => {
  console.log("response:", chunk);
});

What you write to socket (your request) doesn't produce the data you read from socket (the server's response). They're independent. It's a channel.

There's a subtle case where you might implement a Duplex instead of a Transform: when you need to model something that has independent input and output, but the two sides share state. For example, a stream that encrypts outgoing data and decrypts incoming data using the same encryption key. The two sides are independent (encrypting A doesn't produce decrypted B), but they share configuration.

In practice, you'd probably implement this as two separate Transform streams (one for encryption, one for decryption) rather than a single Duplex, because it's cleaner and more composable. But it's technically a valid Duplex use case.

The rule of thumb: if you're building something that fits naturally into a pipeline with other transforms, make it a Transform. If you're building something that sits at the edge of your system, communicating with external entities, make it a Duplex.

Real-World Transform Examples

A few Transform streams you might actually use in production:

1) JSON Line Stringifier

This transform takes JavaScript objects and outputs newline-delimited JSON:

class JSONLineStringifier extends Transform {
  constructor(options) {
    super({ ...options, writableObjectMode: true });
  }

  _transform(obj, encoding, callback) {
    try {
      const json = JSON.stringify(obj);
      this.push(json + "\n");
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

Note the writableObjectMode: true setting: the writable side accepts objects, but the readable side emits strings (or buffers). You can mix modes - writable in objectMode, readable in byte mode, or vice versa.
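
A usage sketch, writing objects out as NDJSON to a placeholder file path:

import { Readable } from "stream";
import { pipeline } from "stream/promises";
import { createWriteStream } from "fs";

await pipeline(
  Readable.from([{ id: 1 }, { id: 2 }]),
  new JSONLineStringifier(),
  createWriteStream("out.ndjson")
);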

2) Line Counter

This transform counts lines and emits a summary object at the end:

class LineCounter extends Transform {
  constructor(options) {
    super({ ...options, readableObjectMode: true });
    this.lineCount = 0;
    this.byteCount = 0;
  }

  _transform(chunk, encoding, callback) {
    this.byteCount += chunk.length;
    const lines = chunk.toString().split("\n").length - 1;
    this.lineCount += lines;
    callback();
  }

  _flush(callback) {
    this.push({
      lines: this.lineCount,
      bytes: this.byteCount,
    });
    callback();
  }
}

This transform doesn't push anything during _transform(), just accumulates statistics. In _flush(), it pushes a single summary object. This is a valid pattern - transforms don't have to produce output for every input chunk.

3) Rate Limiter

This transform delays chunks to enforce a maximum throughput:

class RateLimiter extends Transform {
  constructor(bytesPerSecond, options) {
    super(options);
    this.bytesPerSecond = bytesPerSecond;
    this.tokens = bytesPerSecond;
    this.lastRefill = Date.now();
  }

  _transform(chunk, encoding, callback) {
    this._refillTokens();

    const wait =
      Math.max(0, chunk.length - this.tokens) / this.bytesPerSecond;

    setTimeout(() => {
      // refill again so the tokens earned while waiting are spent on this
      // chunk instead of being credited to the next one
      this._refillTokens();
      this.tokens = Math.max(0, this.tokens - chunk.length);
      this.push(chunk);
      callback();
    }, wait * 1000);
  }

  _refillTokens() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(
      this.bytesPerSecond,
      this.tokens + elapsed * this.bytesPerSecond
    );
    this.lastRefill = now;
  }
}

This transform uses a token bucket to rate-limit throughput. If there aren't enough tokens for the current chunk, it delays the callback until enough time has passed. This is a useful pattern for throttling data flow to match downstream capacity.
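
A usage sketch, throttling a copy between placeholder file paths to roughly one megabyte per second:

import { createReadStream, createWriteStream } from "fs";
import { pipeline } from "stream/promises";

await pipeline(
  createReadStream("big-file.bin"),
  new RateLimiter(1024 * 1024), // ~1 MiB per second
  createWriteStream("copy.bin")
);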

4) Deduplicator

This transform in objectMode removes duplicate objects based on a key:

class Deduplicator extends Transform {
  constructor(keyField, options) {
    super({ ...options, objectMode: true });
    this.keyField = keyField;
    this.seen = new Set();
  }

  _transform(obj, encoding, callback) {
    const key = obj[this.keyField];
    if (!this.seen.has(key)) {
      this.seen.add(key);
      this.push(obj);
    }
    callback();
  }
}

This maintains a set of keys it's seen. If an object's key is new, it's pushed. Otherwise, it's dropped. This is a stateful filter transform.

These examples show the versatility of Transform streams. They can aggregate, filter, format, throttle, deduplicate, and more. Any operation that takes a stream of chunks and produces a stream of chunks is a candidate for a Transform.

Error Handling and Cleanup

Transform streams inherit error handling from both Readable and Writable. If an error occurs in _transform() or _flush(), you pass it to the callback, and the stream emits an 'error' event.

_transform(chunk, encoding, callback) {
  try {
    const result = this.process(chunk);
    this.push(result);
    callback();
  } catch (err) {
    callback(err);
  }
}

If this.process() throws, the error is caught and passed to the callback. The stream emits 'error', and processing stops.

You can also use async functions for _transform() and _flush():

async _transform(chunk, encoding, callback) {
  try {
    const result = await this.processAsync(chunk);
    this.push(result);
    callback();
  } catch (err) {
    callback(err);
  }
}

Or omit the callback and return a promise:

async _transform(chunk, encoding) {
  const result = await this.processAsync(chunk);
  this.push(result);
}

If the promise rejects, Node.js treats the rejection as an error, the same as if you had passed it to the callback.

For cleanup, you can implement _destroy(), which is called when the stream is destroyed:

_destroy(err, callback) {
  this.cleanup();
  callback(err);
}

This is useful if your transform allocates resources (file handles, database connections, timers) that need to be released when the stream is destroyed.
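
As an illustration, here's a sketch of a transform that owns a timer and releases it in _destroy() (the throughput-logging behavior is just an example):

class ThroughputLogger extends Transform {
  constructor(options) {
    super(options);
    this.bytes = 0;
    // log throughput once per second while the stream is alive
    this.timer = setInterval(() => {
      console.log(`throughput: ${this.bytes} bytes/s`);
      this.bytes = 0;
    }, 1000);
  }

  _transform(chunk, encoding, callback) {
    this.bytes += chunk.length;
    this.push(chunk);
    callback();
  }

  _destroy(err, callback) {
    // without this, the interval would keep the process alive
    clearInterval(this.timer);
    callback(err);
  }
}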

Always attach an 'error' listener to Transform streams you create or use:

transform.on("error", (err) => {
  console.error("Transform error:", err);
});

Without an error listener, an error will crash your process.

ObjectMode Considerations for Transforms

We've mentioned objectMode several times. With Transform streams, you can mix modes between the writable and readable sides.

By default, both sides are in byte mode. But you can set:

  • writableObjectMode: true - writable side accepts objects, readable side emits buffers/strings
  • readableObjectMode: true - writable side accepts buffers/strings, readable side emits objects
  • objectMode: true - both sides in objectMode

Example: a Transform that parses JSON from bytes to objects:

class JSONParser extends Transform {
  constructor(options) {
    super({ ...options, readableObjectMode: true });
  }

  _transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk.toString());
      this.push(obj);
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

The writable side is in byte mode (accepts buffers), the readable side is in objectMode (emits objects).

Conversely, a Transform that stringifies objects to JSON:

class JSONStringifier extends Transform {
  constructor(options) {
    super({ ...options, writableObjectMode: true });
  }

  _transform(obj, encoding, callback) {
    try {
      const json = JSON.stringify(obj);
      this.push(json);
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

The writable side is in objectMode (accepts objects), the readable side is in byte mode (emits strings/buffers).

This flexibility lets you build pipelines that seamlessly transition between byte streams and object streams. You can have a byte stream that reads from a file, a Transform that parses bytes into objects, a Transform that processes objects, and another Transform that serializes objects back to bytes before writing to a destination.

Simplified Transform Creation

For simple transforms, you don't have to create a class. You can pass options with transform and flush functions inline:

import { Transform } from "stream";

const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  },
});

This is convenient for one-off transforms. You pass a transform function and optionally a flush function in the options object. Node.js creates the Transform and calls your functions as _transform() and _flush().
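
For example, an inline line splitter that uses both options - the leftover buffering mirrors the DelimiterParser pattern from earlier:

const lineSplitter = new Transform({
  transform(chunk, encoding, callback) {
    const lines = ((this.leftover ?? "") + chunk.toString()).split("\n");
    this.leftover = lines.pop(); // keep the incomplete tail for the next chunk
    for (const line of lines) {
      this.push(line + "\n");
    }
    callback();
  },
  flush(callback) {
    if (this.leftover) {
      this.push(this.leftover); // emit any trailing data without a newline
    }
    callback();
  },
});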

You can also pass an async generator function directly to stream.pipeline():

import { pipeline } from "stream/promises";

await pipeline(
  source,
  async function* (source) {
    for await (const chunk of source) {
      yield chunk.toString().toUpperCase();
    }
  },
  destination
);

This async generator becomes a Transform. Each yield pushes a chunk. This is even more concise for simple transforms and fits naturally with async iteration.

For complex stateful transforms, use a class. For simple one-off transforms in a pipeline, use inline options or a generator.

Performance Considerations

Transform streams add a layer of abstraction, which has a performance cost. Every chunk passes through the Transform's internal machinery - buffering, event emission, callback invocation. For high-throughput applications, this overhead can matter.

If you're processing millions of small chunks per second, the overhead of creating Transform instances and invoking _transform() for each chunk might be measurable. In such cases, consider batching. Instead of processing one object at a time, process arrays of objects. This reduces the number of Transform invocations.

For example, instead of:

for await (const obj of stream) {
  process(obj);
}

Batch with:

for await (const batch of batchedStream) {
  for (const obj of batch) {
    process(obj);
  }
}

The BatchAccumulator transform we implemented earlier does this.

Another performance consideration is buffer copying. If your Transform calls Buffer.concat() repeatedly to accumulate data, you're allocating and copying buffers on every chunk. For large data volumes, this is slow. Consider using a more efficient data structure, like a linked list of buffers or a BufferList from the 'bl' npm package.
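
One way to reduce copying is to collect chunks in an array and call Buffer.concat() once per batch instead of once per chunk. A rough sketch (the 64 KB threshold is arbitrary):

class ChunkCoalescer extends Transform {
  constructor(options) {
    super(options);
    this.chunks = [];
    this.total = 0;
  }

  _transform(chunk, encoding, callback) {
    // no copying here - just remember the chunk
    this.chunks.push(chunk);
    this.total += chunk.length;

    if (this.total >= 64 * 1024) {
      // one Buffer.concat for many chunks instead of one per chunk
      this.push(Buffer.concat(this.chunks, this.total));
      this.chunks = [];
      this.total = 0;
    }
    callback();
  }

  _flush(callback) {
    if (this.total > 0) {
      this.push(Buffer.concat(this.chunks, this.total));
    }
    callback();
  }
}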

For transforms that don't need to accumulate state, make sure you're not accidentally buffering. If your _transform() immediately pushes each chunk, the transform is efficient. If it accumulates chunks in an array or buffer before pushing, you're adding memory pressure and latency.

Measure performance before optimizing. Use Node.js's built-in profiler or clinic.js to identify bottlenecks. Many transforms are fast enough already, but high-throughput pipelines need attention to these details.

Testing Custom Transforms

When you implement a custom Transform, you need to test it. Here are patterns for testing transforms reliably.

Test by writing and reading:

import assert from "assert";
import { Readable, Writable } from "stream";
import { pipeline } from "stream/promises";

// the Transform under test - here, the UppercaseTransform from earlier
const myTransform = new UppercaseTransform();

const input = Readable.from(["hello", "world"]);
const output = [];

const collector = new Writable({
  write(chunk, encoding, callback) {
    output.push(chunk.toString());
    callback();
  },
});

await pipeline(input, myTransform, collector);

assert.deepEqual(output, ["HELLO", "WORLD"]);

You create a Readable source with known data, pipe it through your Transform, collect the output in a Writable, and assert the result.

Test edge cases: empty input, single chunks, many small chunks, large chunks, incomplete data at EOF, and invalid data. Write a test for each scenario.
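
For example, a test for incomplete data at EOF using the DelimiterParser from earlier - the trailing "bravo" has no delimiter and should still come out via _flush():

const input = Readable.from(["alpha;bra", "vo"]);
const results = [];

await pipeline(
  input,
  new DelimiterParser(";"),
  new Writable({
    write(chunk, encoding, callback) {
      results.push(chunk.toString());
      callback();
    },
  })
);

assert.deepEqual(results, ["alpha", "bravo"]);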

Test backpressure:

Create a slow Writable destination and verify that the Transform respects backpressure:

// fastSource is any Readable that produces data quickly; expectedMinimum is
// roughly (number of chunks) * 100 ms, since each write takes 100 ms
const slow = new Writable({
  write(chunk, encoding, callback) {
    setTimeout(callback, 100); // simulate a slow consumer
  },
});

const start = Date.now();
await pipeline(fastSource, myTransform, slow);
const elapsed = Date.now() - start;

assert(elapsed > expectedMinimum);

If the Transform doesn't respect backpressure, it will finish much faster than expected because it's not waiting for the slow destination.