
Readable Streams - Implementation & Internal Buffering

Now you understand why streams exist. You know they solve the problem of processing large datasets without loading everything into memory. You’ve seen the conceptual difference between push and pull models, and you know that Node.js streams blend both approaches. Now comes the practical question: how do you actually use Readable streams in your code, and more importantly, how do they work internally?

Readable streams are the entry point to streaming in Node.js. They produce data - from files, from network connections, from in-memory structures, from anywhere. Understanding how Readable streams manage their internal state, how they buffer data, and how they communicate with consumers is important to work effectively with streams in any capacity.

We’re going to build that understanding methodically. First, we’ll explore the Readable stream class itself - its options, its contract, its events. Then we’ll talk about the two operating modes and what triggers transitions between them. After that, we’ll examine internal buffering in detail, because this is where memory management happens and where highWaterMark actually matters. Finally, we’ll implement our own Readable streams and explore all the ways to consume them. By the end, you’ll have a complete mental model of how data moves from a source through a Readable stream to a consumer.

The Readable Stream Class

Let’s start with the object itself. When you import stream from Node and access stream.Readable, you’re getting a class that extends EventEmitter. This inheritance is significant. Every Readable stream is fundamentally an event emitter, which means it can emit events like data, end, error, and readable. Much of the Readable stream’s behavior is expressed through these events.

Creating a Readable stream directly is uncommon in application code. More often, you receive Readable streams from Node.js APIs like fs.createReadStream() or http.IncomingMessage. But when you do create one, either by extending the class or using new stream.Readable(options), you provide a configuration object that controls the stream’s behavior.

The most important option is highWaterMark. This is a number representing the maximum number of bytes (or objects, if you’re in objectMode) that the stream will buffer internally before it stops pulling data from the underlying source. Think of this as the stream’s memory budget for buffering. The default is 65536 bytes, which is 64 kilobytes. This default is not a random number - it represents a balance between memory usage and system call efficiency that the Node team settled on through experimentation and production usage.

Why does this matter? Because the Readable stream doesn’t just pass data directly from the source to the consumer. It maintains an internal buffer. When the consumer is ready for data, it pulls from this buffer. When the buffer runs low, the stream asks the underlying source for more data to refill it. The highWaterMark controls when the stream decides “my buffer is full enough, I should stop asking the source for more data.” If the buffer contains bytes equal to or exceeding highWaterMark, the stream will not request more data from the source until the buffer is drained below that threshold.

Let’s see what this looks like:

import { Readable } from "stream"; const readable = new Readable({ highWaterMark: 1024, // 1KB buffer });

Here we’ve created a Readable stream with a 1KB buffer threshold. If this stream is reading from a file, it will not buffer more than 1KB of data ahead of the consumer.

Another critical option is objectMode. By default, Readable streams work with Buffer objects and strings. But sometimes you want to stream arbitrary JavaScript objects. Setting objectMode: true changes the stream’s behavior. Instead of buffering bytes, it buffers objects. Instead of highWaterMark representing a byte count, it represents an object count. In objectMode, the default highWaterMark is 16 objects, not 64KB.

const objectStream = new Readable({
  objectMode: true,
  highWaterMark: 100, // buffer up to 100 objects
});

This is useful when you’re building pipelines that process structured data. For instance, if you’re reading rows from a database and want to stream them through transform stages, objectMode makes each row a single unit in the stream, which is conceptually cleaner than converting rows to buffers and back.

The encoding option is another configuration detail. By default, Readable streams emit Buffer objects when you read from them. If you set an encoding like 'utf8', the stream automatically converts those buffers to strings using that encoding. This is purely a convenience - you can always call buffer.toString('utf8') yourself - but it can make code cleaner when you know you’re always working with text.

const textStream = new Readable({
  encoding: "utf8",
});

Now when this stream emits data, it will emit strings, not buffers.

These are the foundational options. There are others - read (a function to implement reading logic inline), destroy (a cleanup function), and autoDestroy (whether to automatically destroy the stream after it ends) - but highWaterMark, objectMode, and encoding are the ones you’ll configure most frequently, and they’re the ones that most significantly affect the stream’s runtime behavior.
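
For completeness, here’s what the inline form looks like - a minimal sketch where the “source” is just an in-memory array, purely for illustration:

import { Readable } from "stream";

const items = ["a", "b", "c"]; // stand-in source, purely for illustration

const inline = new Readable({
  highWaterMark: 1024,
  read(size) {
    // Called when the internal buffer has room: push the next chunk, or null to end.
    const next = items.shift();
    this.push(next !== undefined ? next : null);
  },
  destroy(err, callback) {
    // Release any underlying resources here, then signal completion.
    callback(err);
  },
  autoDestroy: true, // the default: destroy the stream automatically after it ends
});

Subclassing and the inline options are equivalent in behavior; the inline form is handy for small, one-off streams.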

Events

Readable streams communicate with the outside world primarily through events. Let’s go through each one and understand when it fires and what it means.

The data event is the most straightforward. When a Readable stream is in flowing mode, it emits data events whenever it has data available. Each data event carries a chunk of data - either a Buffer, a string (if encoding is set), or an object (if objectMode is true).

readable.on("data", (chunk) => { console.log(`Received ${chunk.length} bytes`); });

When you attach a data event listener to a Readable stream, you are implicitly switching the stream into flowing mode. This is important. The act of listening for data changes the stream’s behavior. Data will begin flowing as soon as it’s available, pushed to your listener. You don’t have to pull. The stream pushes.

The end event fires when the stream has no more data to provide. The underlying source has been fully consumed. If you’re reading a file, end fires when you’ve reached the end of the file. If you’re reading from an HTTP response, end fires when the server has finished sending the response body. This event has no arguments. It’s just a signal: “I’m done.”

readable.on("end", () => { console.log("No more data"); });

The error event fires when something goes wrong. Maybe the file you’re reading was deleted mid-read. Maybe the network connection dropped. Maybe the underlying source threw an error for some reason. When an error occurs, the stream emits an error event with the error object. If you don’t have an error event listener attached, Node.js will throw the error, potentially crashing your app. This is why you should always attach an error handler to streams.

readable.on("error", (err) => { console.error("Stream error:", err); });

The readable event is more subtle. It fires when data is available to be read from the stream. This event is relevant primarily when the stream is in paused mode (we read about flowing and paused modes in the previous chapter; don’t worry, we’ll clarify modes again shortly). The readable event is a signal that says “I have data in my internal buffer. If you call read(), you’ll get something.”

readable.on("readable", () => { let chunk; while ((chunk = readable.read()) !== null) { console.log(`Read ${chunk.length} bytes`); } });

Here’s what’s happening. The readable event fires. Inside the handler, we call readable.read() in a loop, pulling chunks from the internal buffer until read() returns null, which means the buffer is empty. This is a pull-based consumption pattern, as opposed to the push-based pattern of the data event.

There’s also a close event that fires when the stream and any underlying resources have been closed. This is distinct from end. The end event means “no more data,” but resources might still be open. The close event means “resources have been released.” Not all streams emit close, and in many cases you don’t need to listen for it, but it’s there if you need to know when cleanup has completed.
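
Listening for it looks the same as the other events:

readable.on("close", () => {
  console.log("Underlying resources have been released");
});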

These events form the API surface of Readable streams. Your interactions with Readable streams, whether you’re consuming them or implementing them, will revolve around these events and their semantics.

Flowing mode vs Paused mode (recap)

Now let’s address the concept that I explained briefly in the previous chapter: operating modes. Every Readable stream is in one of two modes at any given time: flowing mode or paused mode. The mode determines how data moves from the stream’s internal buffer to your code.

In paused mode, data does not flow automatically. The stream will fill its internal buffer up to the highWaterMark, but it will not push that data to you. You must explicitly call readable.read() to pull data from the buffer. Paused mode is the default state when you create a new Readable stream.

In flowing mode, data flows automatically. As soon as data is available in the internal buffer, the stream emits data events with chunks of data. You don’t call read(). The data comes to you.

Why have two modes? Because different consumption patterns benefit from different control flows. Sometimes you want the stream to push data to you as fast as possible, and you’ll handle backpressure by pausing and resuming the stream. Other times, you want fine-grained control over when data is pulled, reading exactly when you’re ready for more. Paused mode gives you that control. Flowing mode optimizes for simplicity and throughput when you’re ready to process data as fast as it arrives.

Let’s see how you switch between modes. When a Readable stream is created, it starts in paused mode. You switch to flowing mode by doing any of the following:

  • Attaching a data event listener
  • Calling the resume() method
  • Calling the pipe() method to pipe the stream to a Writable stream

Conversely, you switch from flowing mode back to paused mode by calling the pause() method (but only if there are no pipe() destinations).

There’s a subtlety here. If you’ve piped a Readable stream to a Writable stream using pipe(), calling pause() doesn’t actually pause the stream. The pipe() mechanism manages flow control internally, and it will continue operating based on the backpressure signals from the Writable stream. This is by design - pipe() is a higher-level abstraction that handles backpressure for you, and manual pause()/resume() calls would interfere with that.

Let’s look at paused mode consumption:

const readable = getReadableStream(); readable.on("readable", () => { let chunk; while ((chunk = readable.read()) !== null) { processChunk(chunk); } }); readable.on("end", () => { console.log("Stream ended"); });

Here the stream stays in paused mode. The readable event tells us data is available. We call read() repeatedly until it returns null. We’re in control of when data is pulled.

Now let’s look at flowing mode consumption:

const readable = getReadableStream(); readable.on("data", (chunk) => { processChunk(chunk); }); readable.on("end", () => { console.log("Stream ended"); });

As soon as we attach the data listener, the stream switches to flowing mode. Data is pushed to us automatically. If processChunk() is slow, data will buffer in memory waiting to be processed, unless we implement backpressure by pausing.

Here’s how you implement backpressure in flowing mode:

readable.on("data", (chunk) => { const canContinue = processChunk(chunk); if (!canContinue) { readable.pause(); // Later, when processing catches up: // readable.resume(); } });

When processChunk() indicates it can’t keep up, we pause the stream. This stops the flow of data events. Later, when processing catches up (perhaps in a callback or a resolved promise), we call resume() to restart the flow.

There’s a third, less common way to consume a stream: the read(size) method in paused mode without a readable listener. You can call readable.read(size) directly at any time to pull a specific number of bytes from the internal buffer. If the buffer doesn’t have that many bytes, read() returns whatever is available, or null if the buffer is empty.

const chunk = readable.read(100);
if (chunk !== null) {
  console.log(`Read ${chunk.length} bytes`);
}

This gives you precise control over how much data you pull at a time, which can be useful when implementing protocols with fixed-size headers or structures.

The key is that these modes reflect different strategies for managing memory and concurrency. Paused mode gives you control and makes backpressure explicit. Flowing mode gives you simplicity and performance when your processing can keep up with the data rate. Understanding when and how to use each mode is part of mastering streams.

Internal Buffering

Let’s dig further into what’s really happening inside a Readable stream. When you create a Readable stream and start reading from it, data doesn’t teleport directly from the underlying source (a file, a socket, a generator) to your consumption code. It passes through an internal buffer maintained by the stream.

The internal buffer is a queue of chunks. When the stream pulls data from the underlying source, those chunks are added to the buffer. When you consume data from the stream (either by calling read() in paused mode or by receiving data events in flowing mode), chunks are removed from the buffer. The buffer grows when the source is producing faster than the consumer is consuming, and it shrinks when the consumer catches up.

The buffer is not a single Buffer object. It’s actually an array of chunks (earlier it was a linked list, but this was changed for better performance). Each chunk remains in its original allocated Buffer, and the array just tracks the sequence. While arrays require occasional resizing, JavaScript’s array implementation handles this efficiently, and the benefits of better cache locality and simpler iteration typically outweigh the occasional reallocation cost.

You can inspect the current state of the buffer using the _readableState property. This property is technically internal (the underscore prefix signals that), but it’s useful for debugging and understanding what’s happening.

const state = readable._readableState;

console.log(`Buffer length: ${state.length} bytes`);
console.log(`Buffer count: ${state.buffer.length} chunks`);
console.log(`highWaterMark: ${state.highWaterMark} bytes`);

The state.length tells you how many bytes are currently buffered. The state.buffer is the array itself, and state.buffer.length tells you how many chunks are in the array. The state.highWaterMark is the threshold we configured or the default 64KB if not configured.

Now, here’s the critical mechanism. When the buffer’s total length is below the highWaterMark and the stream needs more data (either because a consumer is reading or because the stream is in flowing mode), the stream calls an internal method called _read(). This method is responsible for fetching more data from the underlying source and pushing it into the buffer. If you’re implementing a custom Readable stream, you provide the _read() implementation. If you’re using a built-in stream like fs.createReadStream(), the Node internals provide the _read() implementation.

When _read() is called, it’s being told: “The buffer has space. Please fetch more data.” The implementation of _read() should fetch data from the source and push it into the buffer using the push() method. Here’s a simplified version:

class MyReadable extends Readable {
  _read(size) {
    const chunk = this.getDataFromSomeTypeOfSource(size);
    if (chunk) {
      this.push(chunk); // Adds to internal buffer
    } else {
      this.push(null); // Signals end of data
    }
  }
}

The _read(size) method receives a size hint - typically the highWaterMark value - suggesting how much data it should fetch. This is only a hint. You’re allowed to push more or less. The stream will adapt. But respecting the hint helps optimize I/O efficiency.

When you call this.push(chunk), several things happen. First, the chunk is added to the internal buffer. Second, if the stream is in flowing mode, the chunk may be immediately emitted as a data event (bypassing the buffer entirely if there’s a consumer ready). Third, if the stream is in paused mode, a readable event may be emitted to signal that data is available.

Importantly, push() returns a boolean. If push() returns false, it means the buffer has reached or exceeded the highWaterMark (specifically, when state.length >= state.highWaterMark), and the stream is requesting that the source stop producing data. In response, your _read() implementation should stop fetching data from the source. The stream will call _read() again later when the buffer drains back below the highWaterMark.
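
To see push() signal backpressure directly, here’s a tiny sketch - nothing consumes the stream, so the buffer fills immediately (the logged values assume the default byte-mode accounting):

const r = new Readable({
  highWaterMark: 4, // tiny buffer so the threshold is hit quickly
  read() {}, // no-op source; we push manually below
});

console.log(r.push(Buffer.from("ab"))); // true  - 2 bytes buffered, below highWaterMark
console.log(r.push(Buffer.from("cd"))); // false - 4 bytes buffered, threshold reached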

Here’s a more complete example:

class FileReader extends Readable {
  constructor(fd, options) {
    super(options);
    this.fd = fd;
    this.reading = false;
  }

  _read(size) {
    if (this.reading) return;
    this.reading = true;

    const buffer = Buffer.allocUnsafe(size);

    fs.read(this.fd, buffer, 0, size, null, (err, bytesRead) => {
      this.reading = false;

      if (err) {
        this.destroy(err);
      } else if (bytesRead === 0) {
        this.push(null); // EOF
      } else {
        const shouldContinue = this.push(buffer.slice(0, bytesRead));
        // If push returns false, backpressure is applied
        // No need to read more - _read will be called again when ready
      }
    });
  }
}

This is a simplified file reader. When _read() is called, it reads up to size bytes from the file descriptor and pushes them into the stream. If fs.read() returns zero bytes, we’ve reached the end of the file, so we push null to signal EOF. If an error occurs, we destroy the stream with the error.

Notice we track a reading flag to prevent overlapping _read() calls. Since fs.read() is asynchronous, without this flag, multiple _read() calls could be in flight simultaneously if called rapidly. We also capture the return value of push() - if it returns false, the buffer is full and backpressure is applied. We don’t need to take explicit action here because _read() won’t be called again until the buffer drains below the highWaterMark. This pattern ensures we respect backpressure even with asynchronous data sources.

The buffer’s behavior also differs between objectMode and byte mode. In byte mode, the buffer tracks total bytes buffered and compares it against a byte-based highWaterMark. In objectMode, the buffer tracks the number of objects buffered and compares it against an object-count-based highWaterMark. Internally, the same structure is used, but the accounting changes.
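
You can observe the different accounting by inspecting the defaults on two freshly created streams:

const byteStream = new Readable({ read() {} });
const objectStream = new Readable({ objectMode: true, read() {} });

console.log(byteStream._readableState.highWaterMark);   // 65536 (bytes)
console.log(objectStream._readableState.highWaterMark); // 16 (objects)
console.log(objectStream._readableState.objectMode);    // true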

One more detail. The stream doesn’t just drain the buffer when you call read() or when data events fire. There’s also a concept of a “reading state” tracked internally. If the stream is actively reading (meaning _read() has been called and hasn’t yet pushed new data or pushed null), the stream won’t call _read() again until the current read completes. This prevents redundant reads and keeps the source from being overwhelmed by concurrent read requests.

All of this buffering machinery exists to smooth out mismatches between the source’s data rate and the consumer’s consumption rate. If the source produces data in bursts (for example, reading from a network socket that receives data in packets), the buffer accumulates those bursts so the consumer sees a steady stream. If the consumer occasionally pauses (for example, waiting for a database write to complete), the buffer holds data until the consumer is ready again. The highWaterMark controls the size of this buffer, which directly controls the trade-off between memory usage and throughput.

Implementing Custom Readable Streams

Now that we understand the internals, let’s implement our own Readable streams. This is less common than consuming streams, but it’s essential for building libraries, creating custom data sources, or deeply understanding stream behavior.

The standard approach is to extend the Readable class and implement the _read() method. Let’s start with a simple example: a stream that emits numbers from 1 to N.

import { Readable } from "stream"; class CounterStream extends Readable { constructor(max, options) { super(options); this.max = max; this.current = 1; } _read() { if (this.current <= this.max) { this.push(String(this.current)); this.current++; } else { this.push(null); } } }

This stream pushes each number as a string. When the counter exceeds max, it pushes null to signal the end. Notice we don’t check push()’s return value. Since we’re producing data synchronously and the stream is calling _read() when it needs more data, the flow control is already handled by the stream’s internal logic. If the buffer fills up, the stream won’t call _read() again until it drains.

Let’s consume this stream:

const counter = new CounterStream(5);

counter.on("data", (chunk) => {
  console.log(`Received: ${chunk}`);
});

counter.on("end", () => {
  console.log("Counter ended");
});

Output:

Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Counter ended

Now let’s implement something more realistic: a stream that reads lines from a text file. This is a common pattern when processing large log files or CSV files.

Note

Don’t worry if you do not understand the code related to the ‘fs’ API below. We’re going to dive deep into files in the next chapter.

import { Readable } from "stream"; import fs from "fs"; class LineStream extends Readable { constructor(filePath, options) { super(options); this.fd = fs.openSync(filePath, "r"); this.buffer = ""; this.position = 0; } _read() { const chunk = Buffer.alloc(1024); const bytesRead = fs.readSync(this.fd, chunk, 0, 1024, this.position); if (bytesRead === 0) { if (this.buffer.length > 0) { this.push(this.buffer); } this.push(null); return; } this.position += bytesRead; this.buffer += chunk.slice(0, bytesRead).toString(); let lineEnd; while ((lineEnd = this.buffer.indexOf("\n")) !== -1) { const line = this.buffer.slice(0, lineEnd); this.buffer = this.buffer.slice(lineEnd + 1); if (!this.push(line)) { return; } } } _destroy(err, callback) { if (this.fd !== undefined) { fs.close(this.fd, callback); } else { callback(err); } } }

This stream reads chunks from a file, accumulates them in an internal string buffer, and pushes complete lines to the stream. When it encounters a newline, it pushes the line (without the newline) and continues. If there’s leftover data in the buffer when the file ends, it pushes that as the final line.

Notice the _destroy() method. This is a cleanup hook that’s called when the stream is destroyed. By default, Readable streams have autoDestroy: true, which means _destroy() will be called automatically after the stream ends (after push(null)). We use it to close the file descriptor, ensuring we don’t leak file handles. We check if this.fd is defined before closing to handle edge cases safely.

Also notice that inside the while loop, we check the return value of push(). If push() returns false, we return early from _read(), stopping further pushes. This respects backpressure. If the consumer pauses or the buffer fills up, we won’t push more lines until _read() is called again.

As we saw in the previous chapter, there’s a simpler way to create Readable streams for many use cases: stream.Readable.from(). This utility function creates a Readable stream from an iterable or async iterable. If you have an array, a generator, or an async generator, you can turn it into a Readable stream with one line.

import { Readable } from "stream"; async function* generateNumbers() { for (let i = 1; i <= 5; i++) { await new Promise((resolve) => setTimeout(resolve, 100)); yield i; } } const stream = Readable.from(generateNumbers()); stream.on("data", (num) => { console.log(`Received: ${num}`); });

This is incredibly convenient. The Readable.from() method handles all the heavy-lifting. It calls the async generator’s next() method, waits for the promise to resolve, pushes the value into the stream, and repeats until the generator is done. If you’re building a Readable stream from structured data or implementing a simple custom data source, Readable.from() can eliminate the need to extend Readable manually.

One more consideration when implementing Readable streams is handling errors. If an error occurs while fetching data from the source, you should destroy the stream with that error. This stops the stream, emits an error event, and cleans up resources.

_read() {
  this.fetchData((err, data) => {
    if (err) {
      this.destroy(err); // Emits 'error' event
    } else if (data === null) {
      this.push(null); // End of stream
    } else {
      this.push(data);
    }
  });
}

Calling this.destroy(err) transitions the stream to a destroyed state. No more _read() calls will be made, and the error event will be emitted with the error object. If you’ve implemented _destroy(), it will be called to clean up resources.

Consuming Patterns

We’ve seen bits and pieces of consumption throughout this chapter. Now let’s systematically cover all the ways to consume Readable streams, with tips on when to use each approach.

Event-based consumption (flowing mode) is the most straightforward. Attach data and end listeners, and the stream pushes data to you.

readable.on("data", (chunk) => { processChunk(chunk); }); readable.on("end", () => { console.log("Done"); }); readable.on("error", (err) => { console.error("Error:", err); });

This pattern is simple and performant when your processing is fast. However, if processChunk() is slow or asynchronous, you need to implement backpressure manually by calling pause() and resume(), which adds complexity.

Async iteration consumption is the most modern, straightforward, and ergonomic approach. Readable streams are async iterables, so you can consume them with for await...of.

try {
  for await (const chunk of readable) {
    await processChunk(chunk);
  }
  console.log("Done");
} catch (err) {
  console.error("Error:", err);
}

This pattern handles backpressure automatically. If processChunk() returns a promise, the loop waits for it to resolve before pulling the next chunk. This means the stream won’t push more data until you’re ready. It’s clean, easy to reason about, and recommended for most use cases.

Explicit read() consumption (paused mode) gives you fine-grained control. You call read() when you’re ready for data.

readable.on("readable", () => { let chunk; while ((chunk = readable.read()) !== null) { processChunk(chunk); } }); readable.on("end", () => { console.log("Done"); });

You can also call read(size) to pull a specific number of bytes, which is useful for parsing binary protocols where you need to read fixed-size headers or structures.

const header = readable.read(4);
if (header !== null) {
  const bodyLength = header.readUInt32BE(0);
  const body = readable.read(bodyLength);
  if (body !== null) {
    processMessage(header, body);
  }
}

This pattern is powerful but verbose and error-prone. You have to manage the state machine yourself, handling cases where not enough data is available yet.

Using pipe() connects a Readable stream to a Writable stream, handling backpressure automatically. Don’t worry, we have a dedicated sub-chapter on pipes and Writable streams.

readable.pipe(writable); readable.on("error", (err) => { console.error("Read error:", err); }); writable.on("error", (err) => { console.error("Write error:", err); });

The pipe() method listens for data events on the Readable stream and calls write() on the Writable stream. If write() returns false (signaling that the Writable stream’s buffer is full), pipe() calls pause() on the Readable stream. When the Writable stream emits a drain event (signaling that its buffer has space again), pipe() calls resume() on the Readable stream. This automatic backpressure handling is why pipe() is so convenient.
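
Conceptually, the mechanism looks roughly like this - a simplified sketch for intuition, not the actual Node.js internals, which also handle unpiping, errors, and several edge cases:

function naivePipe(readable, writable) {
  readable.on("data", (chunk) => {
    const ok = writable.write(chunk);
    if (!ok) {
      // Destination buffer is full - stop the source.
      readable.pause();
    }
  });

  writable.on("drain", () => {
    // Destination has space again - restart the source.
    readable.resume();
  });

  readable.on("end", () => {
    writable.end();
  });
}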

However, pipe() has limitations. Error handling is awkward as errors don’t propagate automatically, so you must attach error listeners to both streams. Also, if an error occurs in the middle of piping, cleanup can be tricky. The streams might not be properly closed or destroyed.

Using stream.pipeline() is the modern, robust alternative to pipe(). It connects multiple streams in a pipeline and handles errors and cleanup automatically.

import { pipeline } from "stream/promises"; try { await pipeline(readable, writable); console.log("Pipeline succeeded"); } catch (err) { console.error("Pipeline failed:", err); }

The pipeline() function from stream/promises returns a promise that resolves when the pipeline completes successfully or rejects if any stream emits an error. When an error occurs, pipeline() automatically destroys all streams in the pipeline, ensuring resources are cleaned up. This makes it the recommended way to compose streams in production code.

You can also pass transform functions to pipeline(), which we’ll explore in later chapters when we cover Transform streams.

Each consumption pattern has its place. For simple, fast processing, event-based consumption is fine. For async processing with clean backpressure handling, async iteration is ideal. For binary protocol parsing or fine-grained control, explicit read() is necessary. For piping streams together, pipeline() is the safest and most robust choice.

Mode Transitions and State Management

We’ve discussed flowing and paused modes, but let’s clarify exactly when transitions happen and what the internal state looks like. Misunderstanding these transitions can lead to bugs where data is lost or backpressure is not respected.

When a Readable stream is created, it’s in paused mode, and its internal state has a flag state.flowing set to null. This is neither paused nor flowing - it’s an initial state where the stream hasn’t started yet.

The first time you attach a data listener, state.flowing becomes true, and the stream switches to flowing mode. Data begins flowing immediately if the internal buffer has data, or as soon as data becomes available.

If you call pause(), state.flowing becomes false. The stream stops emitting data events. However, the internal buffer continues to fill up to the highWaterMark. The source keeps producing data until the buffer is full, at which point _read() stops being called.

If you call resume(), state.flowing becomes true again. The stream starts emitting data events from the buffer, and if the buffer drains below the highWaterMark, _read() is called to fetch more data from the source.

If you remove all data listeners (and the stream is not piped anywhere), state.flowing remains true. This is a slight nuance: the stream stays in flowing mode structurally, but with no listeners attached, data events have nowhere to go. The stream will continue to drain its buffer and call _read(), but the emitted data effectively disappears. If you attach a new data listener later, the stream will immediately start emitting events to it (you won’t receive data that was already emitted while no listener was attached). To actually pause the stream and stop it from processing data, you must explicitly call pause(), which sets state.flowing to false.

This distinction matters when you’re dynamically adding and removing listeners. Simply removing a data listener doesn’t pause the stream - it just removes the destination for the events. The stream continues consuming from its source. If you want to temporarily stop data processing (perhaps to apply backpressure or wait for some condition), you need to explicitly pause() the stream.

In other words, state.flowing only becomes false through an explicit pause() call, or when the stream is piped and the destination applies backpressure.

Why does this matter? If you’re building middleware that wraps streams, or dynamically attaching and detaching listeners, you need to understand these state transitions to avoid accidentally losing data or failing to control when the stream processes data.

Here’s a snippet to observe these transitions:

const readable = getReadableStream();

console.log(`Initial flowing: ${readable.readableFlowing}`); // null

readable.on("data", (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

console.log(`After data listener: ${readable.readableFlowing}`); // true

readable.removeAllListeners("data");

console.log(`After removing listeners: ${readable.readableFlowing}`); // true (still!)

readable.pause();

console.log(`After pause: ${readable.readableFlowing}`); // false

readable.resume();

console.log(`After resume: ${readable.readableFlowing}`); // true

The readableFlowing property is public and safe to read. It reflects the current state: null, false, or true.

There’s another state consideration: the ended flag. Once the stream has emitted end, no more data will be emitted. If you try to read from an ended stream, read() will return null. The stream remains in this ended state until it’s destroyed. Even if new data somehow becomes available (which shouldn’t happen in well-behaved streams), an ended stream will not emit it.

The destroyed flag is also tracked. Once a stream is destroyed, it will not emit any more events (except close), and attempts to read or write will fail. The stream’s resources are released, and it’s effectively dead.
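
A small sketch of observing that terminal state:

readable.on("close", () => {
  console.log(`Destroyed: ${readable.destroyed}`); // true
});

readable.destroy(); // releases resources; 'close' follows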

Understanding these state flags and transitions helps debug issues like “why isn’t my stream emitting data?” or “why is my stream stuck?” Often, the stream is in a state you didn’t expect - paused when you thought it was flowing, ended when you thought there was more data, or destroyed when you thought it was still active.

Backpressure in Practice

Let’s make backpressure concrete. We’ve talked about it abstractly, but let’s see what it looks like in real code and what happens if you ignore it.

Suppose you’re reading a large file and processing each chunk by making an HTTP request to an API. Each request takes 100ms. Here’s naive code that ignores backpressure:

const readable = fs.createReadStream("large-file.txt"); readable.on("data", async (chunk) => { await fetch("https://api.example.com/process", { method: "POST", body: chunk, }); }); readable.on("end", () => { console.log("Done"); });

What happens here? The stream emits data events as fast as it can read from the file. Each event fires the async handler, which initiates an HTTP request. But the handler doesn’t block the stream. The stream keeps emitting data events, creating more and more concurrent HTTP requests. If the file is large and the chunks are small, you could end up with thousands of in-flight requests, exhausting memory and network resources.

This is a backpressure failure. The consumer (the HTTP request logic) is slower than the producer (the file read), but there’s no mechanism to slow down the producer.

Here’s how you fix it with explicit pause and resume:

const readable = fs.createReadStream("large-file.txt"); readable.on("data", async (chunk) => { readable.pause(); await fetch("https://thenodebook.com/process", { method: "POST", body: chunk, }); readable.resume(); }); readable.on("end", () => { console.log("Done"); });

Now, as soon as a data event is emitted, we pause the stream. This stops further data events. We process the chunk asynchronously. Once the HTTP request completes, we resume the stream, allowing the next data event to fire. This serializes the processing, ensuring only one request is in flight at a time. The file read rate matches the HTTP request rate.

But honestly, this pattern is awkward and error-prone. The pause/resume calls clutter the logic, and if an error occurs, you might forget to resume, leaving the stream stuck.

The cleaner solution is async iteration:

const readable = fs.createReadStream("large-file.txt"); for await (const chunk of readable) { await fetch("https://thenodebook.com/process", { method: "POST", body: chunk, }); } console.log("Done");

This achieves the same backpressure behavior automatically. The for await...of loop doesn’t pull the next chunk until the current iteration completes. If the fetch() takes time, the stream waits. The producer’s rate matches the consumer’s rate, with no explicit pause or resume calls.

Now, what if you want controlled concurrency - say, up to 5 requests in flight at once? Async iteration alone doesn’t give you that. You’d need to implement a concurrency limiter, which is beyond the scope of this chapter but is a common pattern in production, and I’m pretty sure you’re smart enough to implement it yourself. If not, don’t worry: libraries like p-limit or async provide utilities for this.

The key takeaway is that backpressure is not automatic unless you use mechanisms that enforce it. Event-based consumption with data listeners does not enforce backpressure by default. You must add it manually with pause/resume. Async iteration enforces backpressure by design. Piping with pipe() or pipeline() enforces backpressure automatically by monitoring the Writable stream’s state. Choose your consumption pattern based on whether automatic backpressure handling is important to you.

Reading in Object Mode

Object mode changes the semantics of Readable streams slightly. Instead of pushing Buffer objects or strings, you push arbitrary JavaScript values. Instead of highWaterMark being a byte count, it’s an object count. Let’s see what this looks like in practice.

Here’s a Readable stream that emits database rows:

import { Readable } from "stream"; class RowStream extends Readable { constructor(db, query, options) { super({ ...options, objectMode: true }); this.db = db; this.query = query; this.offset = 0; this.limit = 100; } async _read() { try { const rows = await this.db.query(this.query, { offset: this.offset, limit: this.limit, }); if (rows.length === 0) { this.push(null); // No more rows } else { for (const row of rows) { if (!this.push(row)) { break; } } this.offset += rows.length; } } catch (err) { this.destroy(err); } } }

This stream queries a database in batches of 100 rows. Each row is pushed as a JavaScript object. The consumer sees a stream of row objects:

const stream = new RowStream(db, "SELECT * FROM users");

for await (const row of stream) {
  console.log(`User: ${row.name}, Email: ${row.email}`);
}

Object mode is powerful when you’re working with structured data that doesn’t naturally map to bytes. Instead of serializing rows to JSON, pushing the JSON as a Buffer, and then parsing it back in the consumer, you just push the objects directly. This is more efficient and cleaner.

Important

Since highWaterMark is an object count, not a byte count, the stream’s memory usage depends entirely on the size of the objects you push. If you push 16 objects and each is 10MB, you’re buffering 160MB, even though the highWaterMark is just 16. Node.js has no way to measure the byte size of arbitrary JavaScript objects, so it relies on the object count. This means you, as the programmer, must carefully calculate appropriate highWaterMark values based on expected object sizes.
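
For example, reusing the RowStream from earlier: if you expect each row to be around 1MB (an assumed size, purely for illustration), you might cap the buffer at a handful of objects rather than the default 16:

// Assume each row is roughly 1MB (illustrative figure).
// The default highWaterMark of 16 objects could buffer ~16MB per stream;
// capping it at 4 keeps the buffer around 4MB.
const rowStream = new RowStream(db, "SELECT * FROM users", {
  highWaterMark: 4,
});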

Edge Cases and Debugging

Readable streams have several edge cases that can trip you up if you’re not aware of them. I’m going to cover a few.

Empty streams. A Readable stream can end immediately without pushing any data. If you call push(null) in _read() without ever pushing actual data, the stream will emit end without ever emitting data. This is valid and sometimes intentional (for example, reading an empty file), but it can surprise consumers who expect at least one data event.
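
Here’s a contrived but valid sketch of such a stream:

const empty = new Readable({
  read() {
    this.push(null); // end immediately, without ever pushing data
  },
});

empty.on("data", () => console.log("never fires"));
empty.on("end", () => console.log("'end' fires without any 'data'"));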

Unhandled error events. If a Readable stream emits an error event and there’s no listener, Node.js will throw the error, potentially crashing your app/process. Always attach an error listener, even if it just logs the error.

Destroyed streams. Once a stream is destroyed, you cannot read from it anymore. If you destroy a stream prematurely (for example, calling destroy() while there’s still buffered data), that data is lost. If you need to clean up resources but still want to emit buffered data, call push(null) to signal the end gracefully, rather than destroying immediately.

Mixing consumption patterns. If you attach both a readable listener and a data listener to the same stream, the behavior can be confusing. Attaching a readable listener prevents the stream from entering flowing mode, even if a data listener is attached afterward. The readable listener takes precedence, and data events won’t fire as expected. Stick to one consumption pattern per stream.

Misunderstanding read(size). Calling read(size) does not guarantee you’ll get exactly size bytes. You’ll get up to size bytes, depending on what’s available in the buffer. If the buffer has fewer than size bytes, you get what’s there. If the buffer is empty, you get null. Don’t assume read(size) blocks or waits - it’s non-blocking and returns immediately.

Ignoring readable.destroyed. If you’re implementing custom logic that interacts with streams, always check readable.destroyed before calling methods like read() or push(). Operating on a destroyed stream can lead to errors or unexpected behavior.
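
A guard like this is cheap insurance:

if (!readable.destroyed) {
  const chunk = readable.read();
  if (chunk !== null) {
    processChunk(chunk);
  }
}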

For debugging, the _readableState property is invaluable. It exposes the internal state of the stream:

console.log(readable._readableState);

This logs an object with properties like buffer, length, highWaterMark, flowing, ended, endEmitted, reading, destroyed, and more. If a stream is misbehaving, inspecting this state can reveal what’s going wrong.

You can also enable debug logging for streams by setting the NODE_DEBUG environment variable:

NODE_DEBUG=stream node your-script.js

This logs detailed internal stream events to stderr, showing when _read() is called, when data is pushed, when events are emitted, and more. It’s verbose, but it’s useful for understanding the exact sequence of operations.

Memory Implications and Choosing highWaterMark

We’ve mentioned highWaterMark repeatedly, but let’s discuss how to choose a good value. The default of 64KB is a reasonable compromise for most use cases, but it’s not optimal for all scenarios.

If you’re streaming large files where the source and consumer are roughly balanced in speed, increasing the highWaterMark can improve throughput. A larger buffer means fewer system calls to read from the source. For example, if you’re reading a file from a fast SSD, increasing highWaterMark to 128KB or 256KB can reduce the overhead of repeated fs.read() calls.

const readable = fs.createReadStream("large-file.bin", { highWaterMark: 128 * 1024, // 128KB });

However, larger buffers mean more memory usage. If you’re processing thousands of streams concurrently (for example, in a high-traffic web server where each request reads from a file), the cumulative memory usage of all those buffers can be significant. In such cases, you might want to decrease the highWaterMark to reduce per-stream memory footprint.

const readable = fs.createReadStream("file.txt", { highWaterMark: 4 * 1024, // 4KB });

There’s also a latency consideration. If you’re streaming real-time data (for example, a live video feed), a smaller highWaterMark means lower latency. The consumer gets data sooner because the stream doesn’t wait to fill a large buffer before pushing data. Conversely, if latency is not a concern and throughput is paramount, a larger highWaterMark reduces overhead.

In objectMode, the trade-off is similar, but the units are objects, not bytes. If each object is small, a highWaterMark of 16 might be too low, causing frequent _read() calls and reducing throughput. If each object is large, 16 might be too high, consuming too much memory.

There’s no one-size-fits-all answer. Profile your application. Measure memory usage and throughput with different highWaterMark values. The default is usually fine, but for performance-critical applications, tuning can make a difference.

Readable.from() and Async Iterables

Before we close this chapter, let’s revisit Readable.from() and understand why it’s so powerful. This utility bridges the gap between async iterables (generators, arrays, async generators) and Readable streams, allowing you to leverage the stream API without manually implementing _read().

Suppose you have an async generator that fetches paginated data from an API:

async function* fetchPages(url) {
  let page = 1;
  while (true) {
    const response = await fetch(`${url}?page=${page}`);
    const data = await response.json();

    if (data.items.length === 0) break;

    for (const item of data.items) {
      yield item;
    }

    page++;
  }
}

This generator fetches pages of items and yields each item. You can turn it into a Readable stream:

const stream = Readable.from(fetchPages("https://api.example.com/items"));

stream.pipe(someWritable);

The Readable.from() method handles all the heavy-lifting. It calls the generator’s next() method, waits for the promise to resolve, pushes the yielded value into the stream, and repeats. If the generator throws an error, the stream emits an error event. If the generator completes, the stream pushes null to signal the end.

This pattern is incredibly composable. If you have a function that returns an async iterable, you can plug it into the stream ecosystem with a single line. You get all the benefits of streams - backpressure, event-based consumption, piping to Writable streams, compatibility with pipeline() - without writing any stream-specific code.

You can also pass regular iterables, not just async ones:

const stream = Readable.from([1, 2, 3, 4, 5]);

for await (const num of stream) {
  console.log(num);
}

This creates a Readable stream from an array. Each array element becomes a chunk in the stream. It’s a simple way to convert synchronous data into a stream for testing or integration with stream-based APIs.

The lesson here is that Readable streams are not just about reading files or sockets. They’re a general abstraction for producing sequences of values, whether those values come from I/O, computation, or iteration over data structures. Readable.from() makes that abstraction accessible to any code that produces an iterable or async iterable, lowering the barrier to adopting streams in your codebase.

Wrapping Up

Aaaah… finally! We’ve covered a lot of ground, so let’s do a quick recap.

A Readable stream is a producer of data. It pulls data from an underlying source (a file, a socket, a generator, a database query) and makes that data available to consumers. The stream maintains an internal buffer - an array of chunks - that sits between the source and the consumer, smoothing out rate mismatches and providing backpressure.

The stream operates in one of two modes: paused or flowing. In paused mode, data must be explicitly pulled using read(). In flowing mode, data is automatically pushed via data events. You control the mode through event listeners and method calls like pause(), resume(), and pipe().

The highWaterMark option controls the buffer size threshold. When the buffer’s length exceeds highWaterMark, the stream stops calling _read(), applying backpressure to the source. When the buffer drains below highWaterMark, the stream calls _read() again, requesting more data.

To implement a custom Readable stream, you extend the Readable class and implement _read(). Inside _read(), you fetch data from your source and call push(chunk) to add it to the buffer. When there’s no more data, you call push(null) to signal the end. If an error occurs, you call destroy(err).

To consume a Readable stream, you have several options: event-based consumption with data and end listeners, async iteration with for await...of, explicit read() calls in paused mode, piping with pipe(), or robust pipelines with stream.pipeline(). Each pattern has trade-offs in terms of simplicity, backpressure handling, and control.

Readable streams emit events to communicate state changes: data for chunks, end for completion, error for errors, readable for data availability, and close for resource cleanup. Your code reacts to these events to process data and handle edge cases.

Backpressure is critical for keeping memory usage bounded. If your consumer is slower than the producer, you must either use a consumption pattern that enforces backpressure automatically (async iteration, pipeline()), or implement it manually (pause/resume).

Object mode changes the stream’s semantics from byte-based to object-based, allowing you to push arbitrary JavaScript values, with highWaterMark treated as an object count rather than a byte count.

This mental model - source, buffer, highWaterMark, modes, events, backpressure - is the base for working with all streams in Node.js, not just Readable streams. Writable, Transform, and Duplex streams build on these same concepts, adding their own specific behaviors and contracts. Understanding Readable streams deeply means you’ve mastered half of the streaming paradigm. The other half - writing data, transforming data, and composing streams - builds naturally on what you’ve learned here.
