Returns a new stream that batches writes based on a specific interval, a count of items, or both. All items read on the stream will be re-emitted as part of an array of one or more items based on the criteria defined in the options.

batch(options: Object?): Stream.Transform
Since: 1.5.0
Parameters
options (Object?) Defines how items will be batched and written to the stream. Note that when both time and count are defined, items will be written to the stream whenever either one of the conditions is met.
Name Description
options.time Number? (default 0) The interval, in milliseconds, to limit writes to. During that time, all items read on the stream will be collected and written as an array at the set interval.
options.count Number? (default 1) The number of items to buffer before writing all of them in an array.
Returns
Stream.Transform: Transform stream.
Example
const input = nodeStream.through.obj();

input.pipe(nodeStream.batch({ time: 100 }));

input.write(1);

setTimeout(function() {
  input.write(2);
  input.write(3);
}, 100);

setTimeout(function() {
  input.write(4);
  input.write(5);
}, 200);

// => [[1], [2, 3], [4, 5]]

const input = nodeStream.through.obj();

input.pipe(nodeStream.batch({ count: 2 }));

input.write(1);
input.write(2);
input.write(3);
input.write(4);
input.write(5);

// => [[1, 2], [3, 4], [5]]
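The count-based behavior described above can be approximated with Node's built-in stream.Transform. The following is a minimal sketch under that assumption, not node-stream's actual implementation; batchByCount is a hypothetical name, and the time option is left out for brevity.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: groups incoming items into arrays of `count` items.
function batchByCount(count) {
  let pending = [];

  return new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      pending.push(item);

      // Emit a full batch as soon as `count` items have been collected
      if (pending.length >= count) {
        this.push(pending);
        pending = [];
      }

      callback();
    },
    flush(callback) {
      // Emit whatever is left over when the source ends
      if (pending.length > 0) {
        this.push(pending);
      }

      callback();
    }
  });
}

Readable.from([1, 2, 3, 4, 5])
  .pipe(batchByCount(2))
  .on('data', batch => console.log(batch));
// logs [ 1, 2 ], then [ 3, 4 ], then [ 5 ]
```

The flush step is what produces the trailing partial batch ([5] here) once the source has ended.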

Creates a new Stream without the first n values from the source stream.

drop(n: Number): Stream.Transform
Since: 1.4.0
Parameters
n (Number) Number of items to drop from the source stream.
Returns
Stream.Transform: Transform stream.
Example
// drop the first 3 items from a stream
inStream // => ['b', 'a', 'n', 'a', 'n', 'a']
  .pipe(nodeStream.drop(3))
  .pipe(process.stdout)
  // => ['a', 'n', 'a']
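For readers who want to see the mechanics, dropping the first n items amounts to counting items and forwarding only those past the threshold. A minimal sketch using Node's built-in stream.Transform follows; dropFirst is a hypothetical name and this is not node-stream's own source.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: discards the first `n` items, then passes the rest through.
function dropFirst(n) {
  let seen = 0;

  return new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      seen += 1;

      if (seen > n) {
        // Past the first n items: forward the item unchanged
        return callback(null, item);
      }

      // Still within the first n items: emit nothing
      callback();
    }
  });
}

Readable.from(['b', 'a', 'n', 'a', 'n', 'a'])
  .pipe(dropFirst(3))
  .on('data', item => console.log(item));
// logs a, n, a on separate lines
```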

Creates a new stream with all elements that pass the test implemented by the provided function. Similar to Array.filter... but on a stream.

filter(condition: Function): Stream.Transform
Since: 1.0.0
Parameters
condition (Function) Function that filters elements on the stream. Takes one argument, the value of the item at this position in the stream.
Returns
Stream.Transform: A transform stream with the filtered values.
Example
// If you wanted to create a new stream whose values all passed a certain criteria,
// you could do something like the following. Assuming "test-scores.txt" is a file
// containing the following data:
// Sally...90
// Tommy...94
// Jimmy...12
// Sarah...82
// Jonny...64

// We can write a function that returns the students who are failing:
fs.createReadStream('test-scores.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.filter(value => {
    const [student, testScore] = value.toString().split('...');

    return Number(testScore) < 70;
  }));

// The resulting stream would have the following data:
// Jimmy...12
// Jonny...64

// It is also possible to filter a stream asynchronously for more complex actions.
// Note: The signature of the function that you pass as the callback is important. It
// MUST have *two* parameters.

// Assuming "filenames.txt" is a newline-separated list of file names, you could
// create a new stream with only valid names by doing something like the following:
fs.createReadStream('filenames.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.filter((value, next) => {
    fs.stat(value, (err, stats) => {

      // Error the stream since this file is not valid
      if (err) {
        return next(err);
      }

      next(null, stats.isFile());
    });
  }));

// The resulting stream will contain the filenames that passed the test. Note: If `next`
// is called with an error as the first argument, the stream will error. This is typical
// behavior for node callbacks.
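The note about the parameter count suggests that the declared arity of the function decides whether it is treated as synchronous or asynchronous. The sketch below shows one way such a dispatch could work, using Function.length with Node's built-in stream.Transform. This is an assumption for illustration, not a copy of node-stream's source, and filterSketch is a hypothetical name.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: keeps items for which `condition` yields a truthy value.
function filterSketch(condition) {
  return new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      // Forward the item only when the test passed; fail the stream on error
      const done = (err, keep) => {
        if (err) {
          return callback(err);
        }

        if (keep) {
          return callback(null, item);
        }

        callback();
      };

      if (condition.length >= 2) {
        // Two declared parameters: treat as asynchronous (value, next)
        condition(item, done);
      } else {
        // One declared parameter: treat the return value as the result
        done(null, condition(item));
      }
    }
  });
}

// Synchronous condition: keeps 2 and 4
Readable.from([1, 2, 3, 4])
  .pipe(filterSketch(value => value % 2 === 0))
  .on('data', value => console.log('sync', value));

// Asynchronous condition: keeps 3 and 4
Readable.from([1, 2, 3, 4])
  .pipe(filterSketch((value, next) => setImmediate(next, null, value > 2)))
  .on('data', value => console.log('async', value));
```

Under this assumption, a function declared with two parameters that never calls next would stall the stream, which is why the signature matters.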

Creates a new Stream with the first element from the source stream where condition is true. A convenient form of filter.

find(condition: Function): Stream.Transform
Since: 1.4.0
Parameters
condition (Function) Function that filters elements on the stream. Takes one argument, the value of the item at this position in the stream.
Returns
Stream.Transform: A transform stream with the first value that passed the test.
Example
// If you wanted to create a new stream with the first value that passed a certain
// criteria, you could do something like the following. Assuming "test-scores.txt"
// is a file containing the following data:
// Sally...90
// Tommy...94
// Jimmy...12
// Sarah...82
// Jonny...64

// We can write a function that returns the first failing student:
fs.createReadStream('test-scores.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.find(value => {
    const [student, testScore] = value.toString().split('...');

    return Number(testScore) < 70;
  }));

// The resulting stream would have the following data:
// Jimmy...12

// It is also possible to filter a stream asynchronously for more complex actions.
// Note: The signature of the function that you pass as the callback is important. It
// MUST have *two* parameters.

// Assuming "filenames.txt" is a newline-separated list of file names, you could
// create a new stream with the first valid filename by doing something like the following:
fs.createReadStream('filenames.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.find((value, next) => {
    fs.stat(value, (err, stats) => {

      // Error the stream since this file is not valid
      if (err) {
        return next(err);
      }

      next(null, stats.isFile());
    });
  }));

// The resulting stream will contain the first filename that passed the test. Note: If `next`
// is called with an error as the first argument, the stream will error. This is typical
// behavior for node callbacks.

Creates a new Stream with the first element from the source stream that matches the query. A convenient form of where. It performs a deep comparison between a given query and items in the source stream. The first item that matches the query is forwarded to the output stream.

findWhere(query: Object): Stream.Transform
Since: 1.4.0
Parameters
query (Object) An object of properties to compare against all items in the source stream.
Returns
Stream.Transform: Transform stream.
Example
// find the first element that matches the condition
objStream // => [{ name: 'Bob', age: 30 }, { name: 'Lisa', age: 30 }]
  .pipe(nodeStream.findWhere({ age: 30 }))
  // => [{ name: 'Bob', age: 30 }]

Returns a new stream that flattens all arrays passing through by one level. All non-array items will be passed through as-is.

flatten(): Stream.Transform
Since: 1.5.0
Returns
Stream.Transform: Transform stream.
Example
const input = nodeStream.through.obj();

input.pipe(nodeStream.flatten());

input.write([1, 2, 3]);
input.write([4, 5]);
input.write(6);

// => [1, 2, 3, 4, 5, 6]

const input = nodeStream.through.obj();

input
  // batch items that are read
  .pipe(nodeStream.batch({ count: 2 }))
  // perform a transform action on the batches
  .pipe(transformBatch())
  // flatten the batches back
  .pipe(nodeStream.flatten());

input.write(1);
input.write(2);
input.write(3);
input.write(4);
input.write(5);

// => [1, 2, 3, 4, 5]
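To make "by one level" concrete, the sketch below flattens with Node's built-in stream.Transform and shows that nested arrays stay nested. It is a minimal illustration under that assumption, not node-stream's implementation; flattenSketch is a hypothetical name.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: emits array elements individually, one level deep.
function flattenSketch() {
  return new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      if (Array.isArray(item)) {
        // Emit each element on its own; nested arrays are not unpacked further
        item.forEach(element => this.push(element));
      } else {
        // Non-array items pass through as-is
        this.push(item);
      }

      callback();
    }
  });
}

Readable.from([[1, [2, 3]], [4], 5])
  .pipe(flattenSketch())
  .on('data', item => console.log(item));
// logs 1, then [ 2, 3 ], then 4, then 5
```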

Creates a new readable stream from an array. This is primarily useful for piping into additional node-stream methods like map, reduce and filter.

fromArray(source: Array): Stream.Readable
Since: 1.6.0
Parameters
source (Array) An array which will be converted to a stream. If an item in the array is null the stream will end early. Every other data type is allowed and valid.
Returns
Stream.Readable: Readable stream.
Throws
Example
// Create a stream from an array which is then piped to another node-stream method
nodeStream.fromArray(['file1.txt', 'file2.txt', 'file3.txt'])
  .pipe(nodeStream.map(fs.readFile));
  // => ['contents of file1.txt', 'contents of file2.txt', 'contents of file3.txt']
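The remark that a null item ends the stream early follows from how Node readable streams work: pushing null signals end-of-stream. The sketch below demonstrates this with Node's built-in stream.Readable; fromArraySketch is a hypothetical name and only approximates what fromArray is described as doing.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: emits each array item in order as a stream item.
function fromArraySketch(source) {
  const items = source.slice(); // copy so the caller's array is left untouched

  return new Readable({
    objectMode: true,
    read() {
      // Pushing null signals end-of-stream, so a null item ends the stream early
      this.push(items.length > 0 ? items.shift() : null);
    }
  });
}

fromArraySketch(['a', null, 'b'])
  .on('data', item => console.log(item));
// logs only a; the null ends the stream before 'b' is read
```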

Creates a new readable stream from a function which accepts a node-style callback. This is primarily useful for piping into additional node-stream methods like map, reduce and filter.

fromCallback(source: Function): Stream.Readable
Since: 1.6.0
Parameters
source (Function) A function to call which accepts a node-style callback as the last argument. When that callback is called, this stream will emit all arguments as a single array.
Returns
Stream.Readable: Readable stream.
Throws
Example
// Create a stream from a function callback which is then piped to another node-stream method
nodeStream.fromCallback(fs.readdir.bind(this, path.resolve('.')))
  .pipe(nodeStream.map(fs.readFile));
  // => ['contents of file1.txt', 'contents of file2.js', 'contents of file3.pdf']

Creates a new readable stream from a promise. This is primarily useful for piping into additional node-stream methods like map, reduce and filter.

fromPromise(source: Promise): Stream.Readable
Since: 1.6.0
Parameters
source (Promise) A promise which will be converted to a stream. If the promise resolves, each argument becomes a discrete item in the stream. If the promise rejects, the stream will emit an "error" event.
Returns
Stream.Readable: Readable stream.
Throws
Example
// Create a stream from a promise
nodeStream.fromPromise(globby('*.js'))
  .pipe(nodeStream.map(fs.readFile));
  // => ['contents of file1.js', 'contents of file2.js', 'contents of file3.js']

Returns a stream that has been split on new lines (by default). This is a wrapper of split2 by mcollina.

split(matcher: (RegExp | String)?): Stream
Since: 1.0.0
Parameters
matcher ((RegExp | String)?) A regular expression or string to split the stream by. The characters that match this regular expression are removed.
Returns
Stream: Transform stream.
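Splitting matters because chunk boundaries rarely line up with line boundaries. The sketch below illustrates the idea for newline splitting using Node's built-in stream.Transform and string_decoder. It is an illustration only, not the split2 implementation, and splitLines is a hypothetical name.

```javascript
const { Readable, Transform } = require('stream');
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: re-chunks incoming text so that each emitted item is one line.
function splitLines() {
  const decoder = new StringDecoder('utf8');
  let remainder = '';

  return new Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
      const lines = (remainder + decoder.write(chunk)).split('\n');

      // The last piece may be an incomplete line; keep it for the next chunk
      remainder = lines.pop();
      lines.forEach(line => this.push(line));
      callback();
    },
    flush(callback) {
      remainder += decoder.end();

      // Emit the final line if the input did not end with a newline
      if (remainder.length > 0) {
        this.push(remainder);
      }

      callback();
    }
  });
}

Readable.from(['refreshing and cool\nlove is ', 'a sweet summer ', 'rain\nthat washes the world'])
  .pipe(splitLines())
  .on('data', line => console.log(line));
// logs three lines:
// refreshing and cool
// love is a sweet summer rain
// that washes the world
```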

Returns a transform stream with a simple API. This is a wrapper of through2 by rvagg.

through(options: Object?, transform: Function?, flush: Function?): Stream
Since: 1.0.0
Parameters
options (Object?) Optional and passed directly to stream.Transform.
transform (Function?) A function that takes a stream chunk, encoding and callback to transform the data in a stream. Additional items can be appended to the stream by calling this.push(chunk).
flush (Function?) A function called at the end of the stream that can be used to finish up any processing. Additional items can be appended to the stream by calling this.push(chunk).
Returns
Stream: Transform stream.
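Since the options are passed directly to stream.Transform, the transform and flush functions follow the same contract as Node's built-in class. The sketch below uses stream.Transform directly, rather than through itself, to show the chunk/encoding/callback signature and the use of this.push. The name shout is hypothetical.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical example: upper-cases every chunk and appends a trailer at the end.
function shout() {
  return new Transform({
    transform(chunk, encoding, callback) {
      // `chunk` is a Buffer here because the stream is not in object mode
      this.push(chunk.toString().toUpperCase());
      callback();
    },
    flush(callback) {
      // Runs once after the last chunk; extra output can still be pushed
      this.push('!');
      callback();
    }
  });
}

Readable.from(['hello ', 'world'])
  .pipe(shout())
  .on('data', chunk => process.stdout.write(chunk));
// writes HELLO WORLD!
```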

through.obj

Returns a transform stream (in object mode) with a simple API. This is a wrapper of through2 by rvagg.

through.obj(options: Object?, transform: Function?, flush: Function?): Stream
Since: 1.0.0
Parameters
options (Object?) Optional and passed directly to stream.Transform.
transform (Function?) A function that takes a stream chunk, encoding and callback to transform the data in a stream. Additional items can be appended to the stream by calling this.push(chunk).
flush (Function?) A function called at the end of the stream that can be used to finish up any processing. Additional items can be appended to the stream by calling this.push(chunk).
Returns
Stream: Transform stream.

Creates a new stream with the value emitted between every item in the source stream.

intersperse(value: String): Stream.Transform
Since: 1.3.0
Parameters
value (String) Value that should be emitted between every existing item.
Returns
Stream.Transform: Transform stream.
Example
// Log some values to the console with new lines interspersed
shoppingList // => ['banana', 'apple', 'orange']
  .pipe(nodeStream.intersperse('\n'))
  .pipe(process.stdout)
  // => ['banana', '\n', 'apple', '\n', 'orange']
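Note that the value is emitted only between items, never before the first or after the last. A minimal sketch of that rule using Node's built-in stream.Transform follows; intersperseSketch is a hypothetical name and this is not node-stream's own code.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: emits `separator` between consecutive items.
function intersperseSketch(separator) {
  let first = true;

  return new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      // Every item except the first is preceded by the separator
      if (!first) {
        this.push(separator);
      }

      first = false;
      callback(null, item);
    }
  });
}

Readable.from(['banana', 'apple', 'orange'])
  .pipe(intersperseSketch('\n'))
  .on('data', item => process.stdout.write(item));
// writes banana, apple and orange on separate lines, with no trailing newline
```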

Creates a new stream with the results of calling the provided function on every item in the stream. Similar to Array.map... but on a stream.

map(transform: Function): Stream.Transform
Since: 1.0.0
Parameters
transform (Function) Function that returns a new element on the stream. Takes one argument, the value of the item at this position in the stream.
Returns
Stream.Transform: A transform stream with the modified values.
Example
// For a simple find/replace, you could do something like the following. Assuming
// "example.txt" is a file with the text "the text has periods. because, english.",
// you could replace each period with a comma like so:
fs.createReadStream('example.txt')
  .pipe(nodeStream.map(value => value.toString().replace(/\./g, ',')));

// The resulting stream will have the value "the text has periods, because, english,".

// It is also possible to transform a stream asynchronously for more complex actions.
// Note: The signature of the function that you pass as the callback is important. It
// MUST have *two* parameters.

// Assuming "filenames.txt" is a newline-separated list of file names, you could
// create a new stream with their contents by doing something like the following:
fs.createReadStream('filenames.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.map((value, next) => {
    fs.readFile(value, next);
  }));

// The resulting stream will contain the text of each file. Note: If `next` is called
// with an error as the first argument, the stream will error. This is typical behavior
// for node callbacks.

Creates a new stream where every element in the source stream is parsed as JSON.

parse(options: Object?): Stream.Transform
Since: 1.1.0
Parameters
options (Object?) Options to use when parsing items in the stream.
Name Description
options.error Boolean? (default true) If true, an error caught when parsing JSON will be emitted on the stream. If false, the unparseable item will be removed from the stream without error.
Returns
Stream.Transform: Transform stream.
Example
// parse a newline-separated JSON file
fs.createReadStream('example.log')
  .pipe(nodeStream.split())
  .pipe(nodeStream.parse());

// parse a large JSON file
fs.createReadStream('warandpeace.json')
  .pipe(nodeStream.wait())
  .pipe(nodeStream.parse());
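The effect of options.error is easiest to see side by side. The sketch below mimics the described behavior with Node's built-in stream.Transform. It is an approximation based only on the option description above, and parseSketch is a hypothetical name.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: parses each item as JSON.
function parseSketch(options = {}) {
  const emitErrors = options.error !== false; // defaults to true

  return new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      let parsed;

      try {
        parsed = JSON.parse(item.toString());
      } catch (err) {
        // error: true fails the stream; error: false silently drops the item
        return emitErrors ? callback(err) : callback();
      }

      callback(null, parsed);
    }
  });
}

// With error: false, the unparseable middle item is skipped
Readable.from(['{"a":1}', 'not json', '{"b":2}'])
  .pipe(parseSketch({ error: false }))
  .on('data', item => console.log(item));
// logs { a: 1 }, then { b: 2 }
```

With the default (error: true), the same input would emit an "error" event on the stream when 'not json' is reached.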

Creates a new stream with the output composed of the picked properties from the source stream.

pick(properties: ...String): Stream.Transform
Since: 1.3.0
Parameters
properties (...String) A collection of properties that will be picked from the source object.
Returns
Stream.Transform: Transform stream.
Example
// get some properties from each item in the stream
// (e.g.: [{ name: 'pam', age: 24 }, { name: 'joe', age: 30 }])
objStream
  .pipe(nodeStream.pick('age'))
  // => [{ age: 24 }, { age: 30 }]

Returns a new stream that writes to the first given stream and reads from the last given stream. All errors are routed to the new output stream. This is a wrapper of stream-combiner2 by substack.

pipeline(streams: ...(Stream | Array<Stream>)): Stream
Since: 1.0.0
Parameters
streams (...(Stream | Array<Stream>)) A series of streams that will be combined into a single output stream.
Returns
Stream: Transform stream.
Example
// emit the largest line in a file
function getLargestLine() {
  return nodeStream.pipeline(
    nodeStream.split(),
    // sort the longest lines first; Array.sort expects a numeric result
    nodeStream.sort((a, b) => {
      return b.length - a.length;
    }),
    nodeStream.take(1)
  );
}

// find the longest line of a haiku
process.stdin // => ['refreshing and cool\nlove is ', 'a sweet summer ', 'rain\nthat washes the world']
  .pipe(getLargestLine())
  // => ['love is a sweet summer rain']

Object mode of pipeline. Returns a new stream that writes to the first given stream and reads from the last given stream. All errors are routed to the new output stream. This is a wrapper of stream-combiner2 by substack.

pipeline.obj(streams: ...(Stream | Array<Stream>)): Stream.Transform
Since: 1.0.0
Parameters
streams (...(Stream | Array<Stream>)) A series of streams that will be combined into a single output stream.
Returns
Stream.Transform: Transform stream.
Example
// read the contents of a file and parse json
function readJson() {
  return nodeStream.pipeline.obj(
    nodeStream.wait(),
    nodeStream.parse()
  );
}

// parse stdin as JSON in a single step
process.stdin // => ['{"', 'banana":', '"appl', 'e"}']
  .pipe(readJson())
  // => { 'banana': 'apple' }

Creates a new stream with the output composed of the plucked property from each item in the source stream.

pluck(property: (String | Number)): Stream.Transform
Since: 1.3.0
Parameters
property ((String | Number)) A property name that will be plucked from each item in the source stream.
Returns
Stream.Transform: Transform stream.
Example
// get the value of the "age" property for every item in the stream
objStream // => [{ name: 'pam', age: 24 }, { name: 'joe', age: 30 }]
  .pipe(nodeStream.pluck('age'))
  // => [24, 30]

Creates a new stream with a single item that's produced by calling a reducer with each item of the original stream. Similar to Array.reduce... but on a stream.

reduce(reducer: Function, initialValue: any?): Stream.Transform
Since: 1.0.0
Parameters
reducer (Function) Function that reduces items in the stream. Takes two arguments: the current value of the reduction, and the value of the item at this position in the stream.
initialValue (any?) Value to use as the first argument to the first call of the reducer .
Returns
Stream.Transform: A transform stream that results from the reduction.
Example
// If you wanted to determine the content length of a stream, you could do something like
// the following. Assuming "example.txt" is a large file, you could determine its length
// by doing the following:
fs.createReadStream('example.txt')
  .pipe(nodeStream.reduce((length, value) => length + value.length, 0));

// The resulting stream will have an integer value representing the length of "example.txt".

// It is also possible to reduce a stream asynchronously for more complex actions.
// Note: The signature of the function that you pass as the callback is important. It
// MUST have *three* parameters.

// Assuming "twitterers.txt" is a newline-separated list of your favorite tweeters, you
// could identify which is the most recently active by using the Twitter API:
fs.createReadStream('twitterers.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.reduce((memo, user, next) => {
    twit.get('search/tweets', { q: `from:${user}`, count: 1 }, (err, data) => {

      // Error the stream since this request failed
      if (err) {
        return next(err);
      }

      // This is the first iteration of the reduction, so we automatically save the tweet
      if (!memo) {
        return next(null, data);
      }

      // This tweet is the most recent so far, save it for later
      if (new Date(data.statuses[0].created_at) > new Date(memo.statuses[0].created_at)) {
        return next(null, data);
      }

      // The tweet we have saved is still the most recent
      next(null, memo);
    });
  }));

// The resulting stream will contain the most recent tweet of the users in the list.
// Note: If `next` is called with an error as the first argument, the stream will error.
// This is typical behavior for node callbacks.
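The key point of a stream reduction is that nothing is emitted until the source ends. A minimal sketch of the synchronous form using Node's built-in stream.Transform follows. The three-parameter asynchronous form is omitted, reduceSketch is a hypothetical name, and this is not node-stream's implementation.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: folds all items into a single value.
function reduceSketch(reducer, initialValue) {
  let memo = initialValue;

  return new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      // Update the running value; nothing is emitted yet
      memo = reducer(memo, item);
      callback();
    },
    flush(callback) {
      // The source has ended, so emit the final value as the only item
      this.push(memo);
      callback();
    }
  });
}

Readable.from(['hello ', 'world'])
  .pipe(reduceSketch((length, value) => length + value.length, 0))
  .on('data', total => console.log(total));
// logs 11
```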

Creates a new stream where all elements of the source stream have been sorted by the Array.sort method. Each value will be emitted individually.

Note: This method will buffer all contents of the source stream before sending it. You should not use this method if your source stream is large as it could consume large amounts of memory.

sort(compareFunction: Function): Stream.Transform
Since: 1.3.0
Parameters
compareFunction (Function) A function that will be passed directly to Array.sort for item comparison.
Returns
Stream.Transform: Transform stream.
Example
// sort a stream of numbers
objStream // => [10, 3, 9, 2, 4, 1]
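  // a numeric comparator is needed; Array.sort's default order compares items as strings
  .pipe(nodeStream.sort((a, b) => a - b))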
  // => [1, 2, 3, 4, 9, 10]
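Because the comparator is handed to Array.sort, omitting it gives Array.sort's default ordering, which compares items as strings. The sketch below shows both the buffering and the effect of the comparator using Node's built-in stream.Transform. It is an illustration under those assumptions, and sortSketch is a hypothetical name.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: buffers every item, then emits them in sorted order.
function sortSketch(compareFunction) {
  const items = [];

  return new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      // Nothing can be emitted until the whole source has been seen
      items.push(item);
      callback();
    },
    flush(callback) {
      items.sort(compareFunction).forEach(item => this.push(item));
      callback();
    }
  });
}

Readable.from([10, 3, 9, 2, 4, 1])
  .pipe(sortSketch((a, b) => a - b))
  .on('data', item => console.log(item));
// logs 1, 2, 3, 4, 9, 10 on separate lines
```

Without a comparator, the same input would come out as 1, 10, 2, 3, 4, 9, since the numbers are compared as strings.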

Creates a new stream where every element in the source stream is converted to a string by using JSON.stringify.

stringify(): Stream.Transform
Since: 1.1.0
Returns
Stream.Transform: Transform stream.
Example
// stringify every element in an object stream so it can be
// piped to a non-object stream.
objStream
  .pipe(nodeStream.stringify())
  .pipe(process.stdout);

Creates a new Stream with the first n values from the source stream.

take(n: Number): Stream.Transform
Since: 1.4.0
Parameters
n (Number) Number of items to take from the source stream.
Returns
Stream.Transform: Transform stream.
Example
// take the first 3 items from a stream
inStream // => ['b', 'a', 'n', 'a', 'n', 'a']
  .pipe(nodeStream.take(3))
  .pipe(process.stdout)
  // => ['b', 'a', 'n']

Creates a new stream with a single value that's a Buffer of the entire contents of the stream.

wait(callback: Function): Stream.Transform
Since: 1.0.0
Parameters
callback (Function) A function to be called with the contents of the stream. This is a convenience to avoid listening to the data/end events of the stream.
Returns
Stream.Transform: Transform stream.
Example
// get the entire contents of a file
fs.createReadStream('example.txt')
  .pipe(nodeStream.wait());
  // => Buffer
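Collecting a whole stream comes down to gathering chunks and concatenating them once the source ends. The sketch below shows this with Node's built-in stream.Transform. The node-style (err, contents) callback signature is an assumption, since the description only says the callback receives the contents, and waitSketch is a hypothetical name.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: emits the entire input as a single Buffer.
function waitSketch(callback) {
  const chunks = [];

  return new Transform({
    transform(chunk, encoding, done) {
      // Collect every chunk; nothing is emitted until the source ends
      chunks.push(chunk);
      done();
    },
    flush(done) {
      const contents = Buffer.concat(chunks);

      // Assumed callback shape: (err, contents)
      if (typeof callback === 'function') {
        callback(null, contents);
      }

      this.push(contents);
      done();
    }
  });
}

Readable.from(['{"', 'banana":', '"appl', 'e"}'])
  .pipe(waitSketch())
  .on('data', contents => console.log(contents.toString()));
// logs {"banana":"apple"}
```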

Creates a new stream with a single value that's an object created by JSON parsing the contents of the entire stream.

wait.json(callback: Function): Stream.Transform
Since: 1.0.0
Parameters
callback (Function) A function to be called with the contents of the stream. This is a convenience to avoid listening to the data/end events of the stream.
Returns
Stream.Transform: Transform stream.
Example
// parse the JSON contents of a file
fs.createReadStream('example.json')
  .pipe(nodeStream.wait.json());
  // => { 'nanananananananananana': 'batman' }

Creates a new stream with a single value that's an array of every item in the stream.

wait.obj(callback: Function): Stream.Transform
Since: 1.0.0
Parameters
callback (Function) A function to be called with the contents of the stream. This is a convenience to avoid listening to the data/end events of the stream.
Returns
Stream.Transform: Transform stream.
Example
// get all of the items in an object stream
objStream
  .pipe(nodeStream.wait.obj());
  // => [{ 'name': 'paul' }, { 'name': 'lisa' }, { 'name': 'mary' }]

A convenient form of filter which performs a deep comparison between a given query and items in the source stream. Items that match the query are forwarded to the output stream.

where(query: Object): Stream.Transform
Since: 1.3.0
Parameters
query (Object) An object of properties to compare against all items in the source stream.
Returns
Stream.Transform: Transform stream.
Example
// Get all users from a given zip code
users // => [{ name: 'Bill', zip: 90210 }, { name: 'Tracy', zip: 33193 }, { name: 'Paul', zip: 90210 }]
  .pipe(nodeStream.where({ zip: 90210 }))
  // => [{ name: 'Bill', zip: 90210 }, { name: 'Paul', zip: 90210 }]
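One way to read "deep comparison" is that every property in the query must deeply equal the same property on the item. The sketch below implements that reading with util.isDeepStrictEqual and Node's built-in stream.Transform. The exact matching rules of where (for example, partial matches on nested objects) are not stated above, so treat this as an assumption; whereSketch is a hypothetical name.

```javascript
const { Readable, Transform } = require('stream');
const { isDeepStrictEqual } = require('util');

// Hypothetical helper: forwards items whose properties match every query property.
function whereSketch(query) {
  const keys = Object.keys(query);

  return new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      // Assumed rule: each queried property must deeply equal the item's property
      const matches = item != null &&
        keys.every(key => isDeepStrictEqual(item[key], query[key]));

      if (matches) {
        return callback(null, item);
      }

      callback();
    }
  });
}

Readable.from([
  { name: 'Bill', zip: 90210 },
  { name: 'Tracy', zip: 33193 },
  { name: 'Paul', zip: 90210 }
])
  .pipe(whereSketch({ zip: 90210 }))
  .on('data', user => console.log(user.name));
// logs Bill, then Paul
```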