Calvin French-Owen

Co-founder @ Segment.io

An Introduction to Node's New Streams

Recently, there’s been a lot of commotion on Twitter and in #node.js about the new streams2 API. The official stream docs leave a lot to be desired, which has led to general confusion. That’s too bad, because using new streams can really simplify your code once you understand how they work. Let me take you there…

If you’ve never used node streams before, I highly recommend you read Max Ogden’s introduction. For the lazy: a stream pipes data from a “source” to a “sink” without reading the entire dataset into memory all at once. (Think unix pipe, but for node.)

Why do we need a new version of streams?

In previous versions of node, the only help you were given was a basic Stream prototype. It didn’t give you much more than a .pipe function which connected your input and output. Streams were hard to implement. Well, that’s not entirely true. They were hard to implement correctly.

There used to be all sorts of implicit rules which proper streams should follow in order to write data. If a stream couldn’t hold any more data, writes should return false. But hardly any streams respected “backpressure” properly in complex multi-pipe operations.
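That rule survives in new streams: write() still returns false once the destination's buffer reaches its highWaterMark, and a 'drain' event tells you when it's safe to continue. Here's a minimal sketch of a writer that honours it. The tiny 4-byte highWaterMark and the setImmediate delay are assumptions chosen only so that backpressure actually kicks in, and writeAll is a hypothetical helper, not a node API.

```javascript
var stream = require('stream');

// A deliberately slow in-memory sink with a tiny (4-byte) buffer.
var received = [];
var sink = new stream.Writable({ highWaterMark: 4 });
sink._write = function (chunk, encoding, callback) {
  received.push(chunk.toString());
  setImmediate(callback); // pretend each write takes a moment
};

// Hypothetical helper: write every chunk, but stop whenever write()
// returns false and continue once the destination emits 'drain'.
function writeAll (dest, chunks, done) {
  var i = 0;
  (function next () {
    while (i < chunks.length) {
      if (!dest.write(chunks[i++])) return dest.once('drain', next);
    }
    dest.end(done); // done runs once everything has been flushed
  })();
}

writeAll(sink, ['ab', 'cd', 'ef', 'gh'], function () {
  console.log(received.join(''));
});
```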

Worse still, streams would start emitting data willy-nilly (generally on nextTick). And if you wanted to declare event handlers asynchronously, you’d have to pray that the stream’s author implemented the .pause method!

Stream authors had to keep track of way too much state and define way too many methods.

In node 0.10.x, you get a lot more right up front.

Instead of a single Stream prototype, you get four new base prototypes to extend from. Each of the prototypes gives you a lot of functionality out of the box: they’ll handle backpressure, proper event handling, and they’ll even optimize buffering data. You only need to override a method or two and you’ll have your own custom stream!

I’ll show you a real-world example.

At Segment.io we log all of our data to Amazon’s S3 service, where we store it as gzipped, line-separated JSON. There are millions of these log files (certainly more than I have hard drive space for) so when we want to manipulate them, it makes sense to stream them directly from Amazon.

I wanted to build a stream that would let us export this data and send it anywhere we need. Maybe it’s a local file, a custom script, or even an external API.

The coolest part? All this can be implemented correctly (and still interface with core functions) with only a hundred or so lines of code!

Let’s get started.

The new Stream prototypes.

Like I mentioned earlier, there are four new Stream prototypes in 0.10 baked right into the stream module. I’m going to cover the three that are relevant to our example.

If you want 0.8 compatibility, you’ll need to use the readable-stream module instead. It’s just the core module ripped out into a version that you can install through npm. Then whenever you’d like to drop 0.8 support you just swap your require statements!

stream.Readable

Any data that you read will always start with a stream.Readable object. It could be receiving data from an HTTP response, reading from a file, or querying a database. Readable streams are what you use to read data from an external source.

Once you’ve attached a 'data' event handler to your readable stream, it will emit new data every time there is something new to read. You can pipe the readable stream to a writable stream, or decide to read it on your own:

var fs = require('fs');

var readable = fs.createReadStream('my-file.txt');

// this is the classic api
readable
  .on('data',  function (data) { console.log('Data!', data);  })
  .on('error', function (err)  { console.error('Error', err); })
  .on('end',   function ()     { console.log('All done!');    });

I’m not going to talk about the new readable API, since I find it more confusing and less useful for stream implementers. The .on('data') interface will work for any kind of stream, so that’s what I’ll use here. If you’re piping, this won’t matter at all.

stream.Writable

Writable streams are just the opposite. They are the ‘sink’ of a stream: where all your data will be written. You can also pipe data straight into a writable stream.

var fs = require('fs');

var readable = fs.createReadStream('old-file.txt')
  , writable = fs.createWriteStream('copy.txt');

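// pipe() returns the destination, so we can listen for 'finish' on it.
// Don't write to it from here: pipe has already ended the stream.
readable.pipe(writable)
  .on('finish', function () { console.log('Copy complete!'); });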

A piped writable stream will also output a 'finish' event when it has completed, allowing you to gracefully exit the program after writing all the output.
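Writing to the stream from its 'finish' handler is too late, since pipe has already ended it by then. If you do want to append something after the piped data, pipe() accepts an { end: false } option that leaves the destination open, so you can write the extra line and call .end() yourself. A minimal sketch, with hypothetical in-memory streams standing in for the two files:

```javascript
var stream = require('stream');

// In-memory source: pushes a few lines, then signals the end with null.
var lines = ['first line\n', 'second line\n'];
var source = new stream.Readable();
source._read = function () {
  this.push(lines.length ? lines.shift() : null);
};

// In-memory sink that records everything written to it.
var written = [];
var sink = new stream.Writable();
sink._write = function (chunk, encoding, callback) {
  written.push(chunk.toString());
  callback();
};

// { end: false } stops pipe from ending the sink when the source ends,
// so we can append our own line and end it ourselves.
source.pipe(sink, { end: false });
source.on('end', function () { sink.end('an extra line\n'); });

sink.on('finish', function () { console.log(written.join('')); });
```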

stream.Transform

Transform streams are both readable and writable, where the input is related to the output. Any sort of map or filter step is nicely modeled as a transform stream.

You’ll usually implement a transform stream to bridge the gap between readable and writable streams, generally as the pipe between the two. If you’ve used dominictarr’s through module, transform streams provide very similar functionality.
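To make that concrete, here's a toy map step (a hypothetical Upcase stream, not part of the S3 example). It follows the same recipe the rest of this post uses: inherit from stream.Transform, override ._transform, push the mapped chunk and call the callback when you're ready for the next one.

```javascript
var stream = require('stream');
var util = require('util');

// A toy map step: upper-cases whatever text flows through it.
function Upcase () {
  stream.Transform.call(this);
}
util.inherits(Upcase, stream.Transform);

Upcase.prototype._transform = function (chunk, encoding, callback) {
  this.push(chunk.toString().toUpperCase()); // map this chunk
  callback();                                 // ready for the next one
};

// Usage: write text in, read transformed text out.
var upcase = new Upcase();
var output = '';
upcase.on('data', function (data) { output += data; });
upcase.on('end', function () { console.log(output); });
upcase.write('hello ');
upcase.end('streams');
```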


By building on top of those three prototypes, we can write some pretty complex functionality in very few lines of code.

Let’s look at all of the tiny streams that go into our S3 example…

1. List the files from S3.

Remember, your stream source will always involve a readable stream. In our case, the first step is listing all the files in our S3 bucket. Since we could have thousands or millions of files in our bucket, we’ll want to stream the results.

Implementing a readable stream has two parts. First we want to inherit from the readable stream prototype and put our stream into objectMode.

What’s this objectMode? It’s a flag used to tell a stream that it should be dealing with objects instead of strings or buffers. Streams which deal only with text or binary data (which happens a lot in core) can optimize their buffering and measure backpressure precisely, in bytes.

As a general rule, whenever you want to read or write javascript objects which are not strings or buffers, you’ll need to put your stream into objectMode. This is really important as any stream not in objectMode will refuse to write, pipe, or read an object.
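A quick way to see what the flag changes: in the sketch below, a Writable created with { objectMode: true } receives each written JavaScript object unchanged in ._write. Without the flag, a writable only accepts strings and buffers, and core rejects anything else (exactly how it reports that depends on your node version). The event objects are made up for illustration.

```javascript
var stream = require('stream');

// An objectMode sink: each write() arrives in _write as-is,
// with no conversion to a Buffer.
var received = [];
var sink = new stream.Writable({ objectMode: true });
sink._write = function (obj, encoding, callback) {
  received.push(obj);
  callback();
};

sink.on('finish', function () {
  console.log(received.length + ' objects, first is ' + received[0].event);
});
sink.write({ event: 'signup', timestamp: 1 });
sink.end({ event: 'login', timestamp: 2 });
```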

As an aside, objectMode feels very half-baked in new streams. I can understand the reasoning, but my gut is that streams should just operate on ‘data’. Forcing a distinction between objects and text feels unnatural.

Our constructor also sets up a few variables for keeping state on our stream. Here, the s3 variable is a knox client used to access s3 buckets.

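var stream = require('stream')
  , util   = require('util');

function S3Lister (s3, options) {
  options || (options = {});
  stream.Readable.call(this, { objectMode : true });

  this.s3 = s3;            // a knox-like client.
  this.options = options;  // prefix, delimiter and maxKeys are read in _read
  this.marker = options.start;
  this.connecting = false;
  this.ended  = false;
}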

util.inherits(S3Lister, stream.Readable);

We set the stream to use objectMode as we want to return not just filenames, but some metadata about each one as well.

Next, we have to override the ._read function. Whenever the program is ready to read data from your readable stream, the internal implementation will call ._read. That’s where your code takes over.

The knox s3-client has a list() function, but it’s a basic call to the Amazon API, which limits us to retrieving metadata for 1,000 files at a time. In order to list the entire contents of an s3 bucket, you have to repeatedly call s3.list() with a different starting marker every time.

Each response includes a flag (IsTruncated) indicating whether there’s still more data to be read. If there is, we update the marker with the key of the last file we received.

Here’s our ._read function along with a helper to actually list the files.

S3Lister.prototype._read = function () {
  // Limit _read to call only once per response
  if (this.connecting || this.ended) return;

  var options = {
    prefix     : this.options.prefix,
    marker     : this.marker,
    delimiter  : this.options.delimiter,
    'max-keys' : this.options.maxKeys
  };

  this._list(options);
};

S3Lister.prototype._list = function (options) {
  var self = this;
  this.connecting = true; // ensure that we have only a single connection

  this.s3.list(options, function (err, data) {
    self.connecting = false;
    if (err) return self.emit('error', err);

    var files = data.Contents;

    // if there's still more data, set the start as the last file
    if (data.IsTruncated) self.marker = files[files.length - 1].Key;
    else self.ended = true;

    files.forEach(function (file) { self.push(file); });
    if (self.ended) self.push(null);
  });
};

In order to send data to the reader, you’ll need to call .push with every chunk of data. In our case, it’s the file metadata we received.

When there is no more data to provide, you’ll want to call .push(null) to signal the end of the stream.

One important gotcha is that ._read may be called again as soon as .push has been called once, even if your request hasn’t finished. If you’re pushing multiple chunks per ._read call, you’ll want some sort of flag that stops ._read from proceeding. I used the connecting flag to ensure that duplicate requests aren’t made to s3, and I make sure to set the marker before pushing any chunks.
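Since not everyone has knox and a bucket handy, here's a self-contained sketch of the same pattern against a hypothetical in-memory fakeList function that mimics the paging described above (a Contents array plus an IsTruncated flag, continued via a marker). PagedLister is a made-up name; its connecting/ended guards and its "set the marker before pushing" ordering mirror S3Lister.

```javascript
var stream = require('stream');
var util = require('util');

// Hypothetical stand-in for a paginated listing API such as S3's:
// returns up to `max` keys after `marker`, plus an IsTruncated flag.
function fakeList (keys, options, callback) {
  setImmediate(function () {
    var start = options.marker ? keys.indexOf(options.marker) + 1 : 0;
    var page = keys.slice(start, start + options.max);
    callback(null, {
      Contents: page.map(function (key) { return { Key: key }; }),
      IsTruncated: start + options.max < keys.length
    });
  });
}

function PagedLister (keys, max) {
  stream.Readable.call(this, { objectMode: true });
  this.keys = keys;
  this.max = max;
  this.marker = undefined;
  this.connecting = false; // true while a request is in flight
  this.ended = false;      // true once the last page has arrived
  this.requests = 0;       // how many pages we asked for
}
util.inherits(PagedLister, stream.Readable);

PagedLister.prototype._read = function () {
  // push() can trigger another _read before the request returns;
  // these guards stop us from issuing a duplicate request.
  if (this.connecting || this.ended) return;
  var self = this;
  this.connecting = true;
  this.requests++;

  fakeList(this.keys, { marker: this.marker, max: this.max }, function (err, data) {
    self.connecting = false;
    if (err) return self.emit('error', err);

    var files = data.Contents;
    // Update the paging state *before* pushing anything.
    if (data.IsTruncated) self.marker = files[files.length - 1].Key;
    else self.ended = true;

    files.forEach(function (file) { self.push(file); });
    if (self.ended) self.push(null);
  });
};

var lister = new PagedLister(['a', 'b', 'c', 'd', 'e'], 2);
lister.on('data', function (file) { console.log(file.Key); });
lister.on('end', function () {
  console.log('done after ' + lister.requests + ' requests');
});
```

With five keys and two per page, each key should come out exactly once, using one request per page.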

2. Read the files from S3.

Okay, so now we have a readable stream which will emit metadata for each of our log files. Now we actually want to stream the contents of those files.

For this step we’ll use a transform stream. Remember that transform streams filter or map the input data. You’ll want to use it whenever the input data is related to the output.

In our case, each chunk of file metadata maps to that file’s contents, so a transform stream will do nicely.

Once again, we set up our stream to inherit from the base transform and set objectMode on the writable part of the stream.

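var stream = require('stream')
  , util   = require('util')
  , zlib   = require('zlib');   // used in _transform to gunzip responses

function S3Cat(s3, options) {
  stream.Transform.call(this);
  this._writableState.objectMode = true; // objects in, buffers out
  this.s3 = s3;
  this.options = options || {};          // so S3Cat(s3) works without options
}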

util.inherits(S3Cat, stream.Transform);

We have to do a hacky thing to set only the input to objectMode, since the stream will receive javascript objects but output buffers. I’m really hoping the next version of streams fixes this with a public API. There are existing issues for it, but there seems to have been little movement on the subject.

Next, we implement the ._transform function. Transform streams will call ._transform once per chunk of the input stream. Since our earlier stream outputs chunks of single-file metadata, each of our input chunks will be a file object.

For each chunk, we’ll use knox to retrieve the contents of the file, and then push those into our output buffer. Once we’re done, we call the callback function.

S3Cat.prototype._transform = function (file, encoding, callback) {
  var self   = this
    , path   = file.Key;

  this.s3.getFile(path, function (err, res) {
    if (err) return callback(err);

    if (self.options.gzip) res = res.pipe(zlib.createGunzip());

    res.on('data',  function (data) { self.push(data); })
       .on('error', function (err)  { callback(err);   })
       .on('end',   function ()     { callback();      });
  });
};

Since our files are gzipped, we first pipe each response through a Gunzip stream to make sure that we send uncompressed text to our output.
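To see the whole transform run end to end without S3, here's a sketch with a hypothetical in-memory bucket and a fakeGetFile stand-in for knox's getFile. The Cat stream has the same shape as S3Cat, but instead of the `_writableState` hack it uses the public writableObjectMode option that newer versions of node accept for Transform streams (objects in, bytes out).

```javascript
var stream = require('stream');
var util = require('util');
var zlib = require('zlib');

// Hypothetical in-memory "bucket" of gzipped log files, plus a stand-in
// for knox's getFile that calls back with a readable response stream.
var bucket = {
  'log-1.gz': zlib.gzipSync('{"event":"a"}\n'),
  'log-2.gz': zlib.gzipSync('{"event":"b"}\n')
};
function fakeGetFile (path, callback) {
  setImmediate(function () {
    if (!bucket[path]) return callback(new Error('no such file: ' + path));
    var res = new stream.PassThrough();
    res.end(bucket[path]);
    callback(null, res);
  });
}

// Same shape as S3Cat: file objects in, raw bytes out.
function Cat (options) {
  stream.Transform.call(this, { writableObjectMode: true });
  this.options = options || {};
}
util.inherits(Cat, stream.Transform);

Cat.prototype._transform = function (file, encoding, callback) {
  var self = this;
  fakeGetFile(file.Key, function (err, res) {
    if (err) return callback(err);
    if (self.options.gzip) res = res.pipe(zlib.createGunzip());
    res.on('data', function (data) { self.push(data); })
       .on('error', callback)
       .on('end', function () { callback(); });
  });
};

var cat = new Cat({ gzip: true });
cat.on('data', function (data) { process.stdout.write(data); });
cat.write({ Key: 'log-1.gz' });
cat.end({ Key: 'log-2.gz' });
```

Because a Transform waits for each callback before calling ._transform again, the files come out in the order they were written.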

You might notice that this stream is technically not doing everything it could to respect the backpressure of the output stream! A complete example would pause the response stream whenever .push returned false.

Thankfully, we do get some benefit from built-in backpressure. Our stream won’t call ._transform until it detects that the output buffer has returned below its highWaterMark.

If you want to tune the memory usage of any of your buffered streams, you can pass a highWaterMark option to the constructor. For streams in objectMode, this is a number of objects instead of a number of bytes.
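Here's a small illustration of highWaterMark counting objects: an objectMode Readable with highWaterMark set to 2 and nobody consuming it. push() starts returning false once two objects are buffered, which is the signal a complete S3Cat would use to pause its response. The numbers are arbitrary, chosen only to make the effect visible.

```javascript
var stream = require('stream');

// An objectMode readable that buffers at most 2 objects before push()
// starts asking the producer to stop. Nobody is consuming it, and _read
// is a no-op because we push by hand.
var readable = new stream.Readable({ objectMode: true, highWaterMark: 2 });
readable._read = function () {};

var results = [
  readable.push({ n: 1 }), // 1 object buffered: below the mark
  readable.push({ n: 2 }), // 2 objects buffered: mark reached
  readable.push({ n: 3 })  // still accepted, but push keeps saying "stop"
];
console.log(results); // [ true, false, false ]
```

Note that push() never drops data: the third object is still buffered. The false return value is advisory, and it's up to the producer to stop.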

3. Split the files into individual lines.

At this point we’re able to stream the entire contents of an s3 bucket with only a few lines of code. However, we still want to convert those lines of JSON into actual javascript objects.

To do this, we can use another set of transform streams. The first should take in chunks of text and output individual lines. A toy implementation looks something like this:

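function LineSplitter () {
  stream.Transform.call(this);
  this._readableState.objectMode = true; // keep each line as its own chunk
  this.current = '';                      // the trailing, incomplete line
}

util.inherits(LineSplitter, stream.Transform);

LineSplitter.prototype._transform = function (chunk, encoding, callback) {
  chunk = this.current + chunk.toString();
  var split = chunk.split('\n');
  this.current = split.pop();

  var self = this;
  split.forEach(function(line) { self.push(line); });
  callback();
};

LineSplitter.prototype._flush = function (callback) {
  if (this.current) this.push(this.current); // last line, if unterminated
  callback();
};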

What’s going on here? An input chunk isn’t guaranteed to end at a line boundary, so we have to do a little bookkeeping: we keep track of the current partial line and prepend it to each new chunk.

Once we’ve prepended our current partial line to the chunk, we split the combined string by our delimiter. We pop off the last piece of the split and keep it as our current half-complete line. We can then push all of our complete lines into the output buffer right away.

Transform streams also contain a special ._flush method which is called after the input has finished. In our case, we’ll want to push the final line of our file into the output once we are sure that there are no more delimiters to be found.
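There's one more boundary problem the toy version ignores: a chunk can also end in the middle of a multi-byte UTF-8 character, and calling chunk.toString() on each piece separately would garble it. Here's a sketch of a variant (hypothetically named SafeLineSplitter) that decodes through node's string_decoder module, and uses the readableObjectMode option found in newer node versions instead of the `_readableState` hack.

```javascript
var stream = require('stream');
var util = require('util');
var StringDecoder = require('string_decoder').StringDecoder;

function SafeLineSplitter () {
  // readableObjectMode keeps each pushed line as a separate chunk.
  stream.Transform.call(this, { readableObjectMode: true });
  this.decoder = new StringDecoder('utf8'); // holds partial characters
  this.current = '';                        // holds the partial line
}
util.inherits(SafeLineSplitter, stream.Transform);

SafeLineSplitter.prototype._pushLines = function (text) {
  var split = (this.current + text).split('\n');
  this.current = split.pop(); // keep the trailing, incomplete line
  for (var i = 0; i < split.length; i++) this.push(split[i]);
};

SafeLineSplitter.prototype._transform = function (chunk, encoding, callback) {
  this._pushLines(this.decoder.write(chunk));
  callback();
};

SafeLineSplitter.prototype._flush = function (callback) {
  this._pushLines(this.decoder.end());       // any leftover bytes
  if (this.current) this.push(this.current); // final unterminated line
  callback();
};

// Usage: split the input in the middle of 'é' (a two-byte character).
var input = Buffer.from('héllo\nwörld\ntail', 'utf8');
var splitter = new SafeLineSplitter();
splitter.on('data', function (line) { console.log(JSON.stringify(line)); });
splitter.write(input.subarray(0, 2)); // 'h' plus the first byte of 'é'
splitter.end(input.subarray(2));
```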

4. Parse the JSON strings.

We’re almost there: we’ve got files streaming from s3 and split into nice, manageable lines of JSON.

Using another transform stream, the JSON parser is trivial to write. This time, you’ll want to be sure to set the readableState into objectMode so that the parsed objects can be pushed to the output.

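function JSONParser () {
  stream.Transform.call(this);
  this._readableState.objectMode = true; // strings in, objects out
}

util.inherits(JSONParser, stream.Transform);

JSONParser.prototype._transform = function (chunk, encoding, callback) {
  var parsed;
  try {
    parsed = JSON.parse(chunk.toString());
  } catch (err) {
    return callback(err); // a malformed line becomes an 'error' event
  }
  this.push(parsed);
  callback();
};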

5. Write the output.

Finally, we’ll want to use a writable stream to actually make use of our parsed log data! We’ll inherit from the Writable stream, and override the ._write method. Once again, we’ll enable objectMode since we’re receiving the results of parsed JSON.

function Uploader () {
  stream.Writable.call(this, { objectMode : true });
}

util.inherits(Uploader, stream.Writable);

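// `api` stands in for whatever client you're replaying to; calling
// `callback` when the upload finishes lets the next object through.
Uploader.prototype._write = function (data, encoding, callback) {
  api.upload({
    event     : data.event,
    timestamp : data.timestamp
  }, callback);
};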

That’s it! We can then pipe to our uploader and start replaying our logged data to whatever destination we’d like!

Now let’s put it all together!

The best part of this is that we can now pipe these streams together to provide a single ‘Readable’ interface to our program! Combined with node’s module pattern, this is a really powerful way to hide interface complexity.

Here’s what our replay module can look like:

module.exports = function (s3) {
  return new S3Lister(s3)                 // list all the files in our bucket
    .pipe(new S3Cat(s3, { gzip : true })) // stream the file contents
    .pipe(new LineSplitter())             // split by lines
    .pipe(new JSONParser());              // parse each line
};

It returns a readable stream, which we can use wherever we’d like! The possibilities are endless.

var replay = require('replay')
  , knox   = require('knox');

var s3 = knox.createClient({ ... });

// We can pipe to our custom uploader!
replay(s3).pipe(new Uploader());

// Or maybe print all of our data to the console!
replay(s3).on('data', function (data) { console.log(data); });

Since each stream implements only a single function, we can compose them however we’d like. Streaming all of our logs to our hard drive is just as easy:

var output = new S3Lister(s3)
               .pipe(new S3Cat(s3))
               .pipe(fs.createWriteStream('./my-file.log'));

output.on('finish', function () { console.log('Logs finished writing!'); });

Look at the magic of that .pipe! Now, you’re thinking with streams!
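One caveat with these .pipe chains: pipe doesn't forward errors, so an 'error' emitted by an early stage such as S3Lister won't show up on the stream the replay module returns. Node 10 and later ship stream.pipeline, which connects the stages and reports the first error from any of them to a single callback. Here's a minimal sketch, with hypothetical in-memory stand-ins (fromArray, jsonParser, collect) rather than the S3 streams.

```javascript
var stream = require('stream');

// Hypothetical source: emits each string as its own chunk, then ends.
function fromArray (items) {
  var queue = items.slice();
  return new stream.Readable({
    objectMode: true,
    read: function () { this.push(queue.length ? queue.shift() : null); }
  });
}

// Parses one JSON line per chunk; a bad line fails the whole pipeline.
function jsonParser () {
  return new stream.Transform({
    objectMode: true,
    transform: function (line, encoding, callback) {
      var parsed;
      try { parsed = JSON.parse(line); } catch (err) { return callback(err); }
      callback(null, parsed);
    }
  });
}

// Collects every object it receives into `results`.
function collect (results) {
  return new stream.Writable({
    objectMode: true,
    write: function (obj, encoding, callback) { results.push(obj); callback(); }
  });
}

// Runs source -> parser -> sink. done(err, results) is called once, with
// the first error from any stage or after the sink has finished.
function run (lines, done) {
  var results = [];
  stream.pipeline(fromArray(lines), jsonParser(), collect(results), function (err) {
    done(err, results);
  });
}

run(['{"n":1}', '{"n":2}'], function (err, objects) {
  console.log(err ? 'failed: ' + err.message : objects);
});
```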

Don’t stop there…

So now that we’ve cut our teeth on new streams, where do we go from here?

Your first stop should be Dominic Tarr’s pull-stream for a different take on streams in node.

While core streams rely on the source pushing data, pull-streams take the opposite approach. The sink drives the interaction by ‘pulling’ data from the source. That means no backpressure problems, and (by design) no differentiation between object streams and text streams.
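To get a feel for the pull model, here's a hand-rolled sketch of the protocol pull-stream is built on, without using the library itself. A source is a function read(abort, cb) that calls cb(end, data), where end is true once the stream is finished (or an error). A sink drives everything by calling read whenever it wants the next value. The names values, map and collect echo pull-stream's helpers, but these are simplified illustrations, not its implementation.

```javascript
// A source: hands out one value per read() call, then signals the end with `true`.
function values (array) {
  var i = 0;
  return function read (abort, cb) {
    if (abort) return cb(abort);
    if (i >= array.length) return cb(true);
    cb(null, array[i++]);
  };
}

// A through: wraps a source and maps each value it pulls.
function map (fn) {
  return function (read) {
    return function (abort, cb) {
      read(abort, function (end, data) {
        if (end) return cb(end);
        cb(null, fn(data));
      });
    };
  };
}

// A sink: drives the whole pipeline by pulling until the source ends.
function collect (done) {
  return function (read) {
    var results = [];
    read(null, function next (end, data) {
      if (end === true) return done(null, results);
      if (end) return done(end);
      results.push(data);
      read(null, next);
    });
  };
}

collect(function (err, results) {
  console.log(results); // [ 2, 4, 6 ]
})(map(function (x) { return x * 2; })(values([1, 2, 3])));
```

Nothing flows until the sink asks for it, so there's no buffer to overflow. Because every step in this toy calls back synchronously, a very long source would grow the call stack; the sketch ignores that.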

This is particularly useful in the case where you want to add concurrent processing to your streams. For instance, perhaps I don’t care about the exact ordering of my logs, but would like to process 10 lines at a time. Instead of building an awkward transform stream to batch multiple items at a time, I can use a parallel pull stream.

For general streaming goodness, it’s also worth looking at Substack’s stream-handbook and new stream-adventure. There are some seriously cool streaming applications to be found there.

Personally, I hope to see streams get rid of objectMode. There are also quite a few gotchas related to how streams are implemented internally which tripped me up. I could only solve them once I started reading through the stream source itself. Having better stream documentation would go a long way to help solve this problem.

I do like the fact that new streams are compact and simple to implement correctly (once you’ve got the hang of them). For that, I really applaud Isaacs’ work in trying to tackle the problem of making streams which handle edge cases easy to implement.

I don’t think streams even necessarily have to live in the node core, though I’d like to see a few common implementations which are well-written and play nicely together.

Overall, I’ve found the simplicity and composability of streams to be one of node’s nicest features. I’m very excited to see the community actually start trying out the new streams and improving them.