Returns a new stream that batches writes based on a specific interval, count of items, or both. All items read on the stream will be re-emitted as part of an array of one or more items, based on the criteria defined in the options.
`options` (`Object?`): Defines how items will be batched and written to the stream. Note that when both `time` and `count` are defined, items will be written to the stream whenever either one of the conditions is met.

Name | Description |
---|---|
`options.time` (`Number?`, default `0`) | The interval, in milliseconds, to limit writes to. During that time, all items read on the stream will be collected and written as an array at the set interval. |
`options.count` (`Number?`, default `1`) | The number of items to buffer before writing all of them in an array. |

Returns `Stream.Transform`: Transform stream.
```js
const input = nodeStream.through.obj();

input.pipe(nodeStream.batch({ time: 100 }));

input.write(1);

setTimeout(function () {
  input.write(2);
  input.write(3);
}, 100);

setTimeout(function () {
  input.write(4);
  input.write(5);
}, 200);

// => [[1], [2, 3], [4, 5]]
```
```js
const input = nodeStream.through.obj();

input.pipe(nodeStream.batch({ count: 2 }));

input.write(1);
input.write(2);
input.write(3);
input.write(4);
input.write(5);

// => [[1, 2], [3, 4], [5]]
```
Creates a new stream without the first `n` values from the source stream.

`n` (`Number`): Number of items to drop from the source stream.

Returns `Stream.Transform`: Transform stream.
```js
// drop the first 3 items from a stream
inStream // => ['b', 'a', 'n', 'a', 'n', 'a']
  .pipe(nodeStream.drop(3))
  .pipe(process.stdout)
// => ['a', 'n', 'a']
```
Creates a new stream with all elements that pass the test implemented by the provided function. Similar to Array.filter... but on a stream.
(`Function`): Function that filters elements on the stream. Takes one argument: the value of the item at this position in the stream.

Returns `Stream.Transform`: A transform stream with the filtered values.
```js
// If you wanted to create a new stream whose values all passed a certain criteria,
// you could do something like the following. Assuming "test-scores.txt" is a file
// containing the following data:
//
// Sally...90
// Tommy...94
// Jimmy...12
// Sarah...82
// Jonny...64
//
// we can write a function that returns the students who are failing:
fs.createReadStream('test-scores.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.filter(value => {
    const [student, testScore] = value.toString().split('...');
    return Number(testScore) < 70;
  }));

// The resulting stream would have the following data:
// Jimmy...12
// Jonny...64
```
```js
// It is also possible to filter a stream asynchronously for more complex actions.
// Note: The signature of the function that you pass as the callback is important. It
// MUST have *two* parameters.
//
// Assuming "filenames.txt" is a newline-separated list of file names, you could
// create a new stream with only valid names by doing something like the following:
fs.createReadStream('filenames.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.filter((value, next) => {
    fs.stat(value, (err, stats) => {
      // Error the stream since this file is not valid
      if (err) {
        return next(err);
      }

      next(null, stats.isFile());
    });
  }));

// The resulting stream will contain the filenames that passed the test. Note: If `next`
// is called with an error as the first argument, the stream will error. This is typical
// behavior for node callbacks.
```
Creates a new stream with the first element from the source stream where `condition` is true. A convenient form of `filter`.

`condition` (`Function`): Function that filters elements on the stream. Takes one argument: the value of the item at this position in the stream.

Returns `Stream.Transform`: A transform stream with the first value that passed the test.
```js
// If you wanted to create a new stream with the first value that passed a certain
// criteria, you could do something like the following. Assuming "test-scores.txt"
// is a file containing the following data:
//
// Sally...90
// Tommy...94
// Jimmy...12
// Sarah...82
// Jonny...64
//
// we can write a function that returns the first failing student:
fs.createReadStream('test-scores.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.find(value => {
    const [student, testScore] = value.toString().split('...');
    return Number(testScore) < 70;
  }));

// The resulting stream would have the following data:
// Jimmy...12
```
```js
// It is also possible to filter a stream asynchronously for more complex actions.
// Note: The signature of the function that you pass as the callback is important. It
// MUST have *two* parameters.
//
// Assuming "filenames.txt" is a newline-separated list of file names, you could
// create a new stream with the first valid filename by doing something like the following:
fs.createReadStream('filenames.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.find((value, next) => {
    fs.stat(value, (err, stats) => {
      // Error the stream since this file is not valid
      if (err) {
        return next(err);
      }

      next(null, stats.isFile());
    });
  }));

// The resulting stream will contain the first filename that passed the test. Note: If `next`
// is called with an error as the first argument, the stream will error. This is typical
// behavior for node callbacks.
```
Creates a new stream with the first element from the source stream that matches the `query`. A convenient form of `where`. It performs a deep comparison between a given `query` and items in the source stream. Items that match the `query` are forwarded to the output stream.

`query` (`Object`): An object of properties to compare against all items in the source stream.

Returns `Stream.Transform`: Transform stream.
```js
// find the first element that matches the condition
objStream // => [{ name: 'Bob', age: 30 }, { name: 'Lisa', age: 30 }]
  .pipe(nodeStream.findWhere({ age: 30 }))
// => [{ name: 'Bob', age: 30 }]
```
Returns a new stream that flattens all arrays passing through by one level. All non-array items will be passed through as-is.
Returns `Stream.Transform`: Transform stream.
```js
const input = nodeStream.through.obj();

input.pipe(nodeStream.flatten());

input.write([1, 2, 3]);
input.write([4, 5]);
input.write(6);

// => [1, 2, 3, 4, 5, 6]
```
```js
const input = nodeStream.through.obj();

input
  // batch items that are read
  .pipe(nodeStream.batch({ count: 2 }))
  // perform a transform action on the batches
  .pipe(transformBatch())
  // flatten the batches back
  .pipe(nodeStream.flatten());

input.write(1);
input.write(2);
input.write(3);
input.write(4);
input.write(5);

// => [1, 2, 3, 4, 5]
```
Creates a new readable stream from an array. This is primarily useful for piping into additional node-stream methods like map, reduce and filter.
`source` (`Array`): An array which will be converted to a stream. If an item in the array is `null`, the stream will end early. Every other data type is allowed and valid.

Returns `Stream.Readable`: Readable stream.

Throws if `source` is not an array.
```js
// Create a stream from an array which is then piped to another node-stream method
nodeStream.fromArray(['file1.txt', 'file2.txt', 'file3.txt'])
  .pipe(nodeStream.map(fs.readFile));
// => ['contents of file1.txt', 'contents of file2.txt', 'contents of file3.txt']
```
Creates a new readable stream from a function which accepts a node-style callback. This is primarily useful for piping into additional node-stream methods like map, reduce and filter.
`source` (`Function`): A function to call which accepts a node-style callback as the last argument. When that callback is called, this stream will emit all arguments as a single array.

Returns `Stream.Readable`: Readable stream.

Throws if `source` is not a function.
```js
// Create a stream from a function callback which is then piped to another node-stream method
nodeStream.fromCallback(fs.readdir.bind(this, path.resolve('.')))
  .pipe(nodeStream.map(fs.readFile));
// => ['contents of file1.txt', 'contents of file2.js', 'contents of file3.pdf']
```
Creates a new readable stream from a promise. This is primarily useful for piping into additional node-stream methods like map, reduce and filter.
`source` (`Promise`): A promise which will be converted to a stream. If the promise resolves, each argument becomes a discrete item in the stream. If the promise rejects, the stream will emit an "error" event.

Returns `Stream.Readable`: Readable stream.

Throws if `source` is not a promise.
```js
// Create a stream from a promise
nodeStream.fromPromise(globby('*.js'))
  .pipe(nodeStream.map(fs.readFile));
// => ['contents of file1.js', 'contents of file2.js', 'contents of file3.js']
```
Returns a stream that has been split on new lines (by default). This is a wrapper of split2 by mcollina.
Returns `Stream`: Transform stream.
Returns a transform stream with a simple API. This is a wrapper of through2 by rvagg.
`transform` (`Function?`): A function that takes a stream chunk, encoding, and callback to transform the data in a stream. Additional items can be appended to the stream by calling `this.push(chunk)`.

`flush` (`Function?`): A function called at the end of the stream that can be used to finish up any processing. Additional items can be appended to the stream by calling `this.push(chunk)`.

Returns `Stream`: Transform stream.
Returns a transform stream (in object mode) with a simple API. This is a wrapper of through2 by rvagg.
`transform` (`Function?`): A function that takes a stream chunk, encoding, and callback to transform the data in a stream. Additional items can be appended to the stream by calling `this.push(chunk)`.

`flush` (`Function?`): A function called at the end of the stream that can be used to finish up any processing. Additional items can be appended to the stream by calling `this.push(chunk)`.

Returns `Stream`: Transform stream.
Creates a new stream with the `value` emitted between every item in the source stream.

`value` (`String`): Value that should be emitted between every existing item.

Returns `Stream.Transform`: Transform stream.
```js
// Log some values to the console with new lines interspersed
shoppingList // => ['banana', 'apple', 'orange']
  .pipe(nodeStream.intersperse('\n'))
  .pipe(process.stdout)
// => ['banana', '\n', 'apple', '\n', 'orange']
```
Creates a new stream with the results of calling the provided function on every item in the stream. Similar to Array.map... but on a stream.
(`Function`): Function that returns a new element on the stream. Takes one argument: the value of the item at this position in the stream.

Returns `Stream.Transform`: A transform stream with the modified values.
```js
// For a simple find/replace, you could do something like the following. Assuming
// "example.txt" is a file with the text "the text has periods. because, english.",
// you could replace each period with a comma like so:
fs.createReadStream('example.txt')
  .pipe(nodeStream.map(value => value.toString().replace(/\./g, ',')));
// The resulting stream will have the value "the text has periods, because, english,".
```
```js
// It is also possible to transform a stream asynchronously for more complex actions.
// Note: The signature of the function that you pass as the callback is important. It
// MUST have *two* parameters.
//
// Assuming "filenames.txt" is a newline-separated list of file names, you could
// create a new stream with their contents by doing something like the following:
fs.createReadStream('filenames.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.map((value, next) => {
    fs.readFile(value, next);
  }));

// The resulting stream will contain the text of each file. Note: If `next` is called
// with an error as the first argument, the stream will error. This is typical behavior
// for node callbacks.
```
Creates a new stream where every element in the source stream is parsed as JSON.
Returns `Stream.Transform`: Transform stream.
```js
// parse a newline-separated JSON file
fs.createReadStream('example.log')
  .pipe(nodeStream.split())
  .pipe(nodeStream.parse());

// parse a large JSON file
fs.createReadStream('warandpeace.json')
  .pipe(nodeStream.wait())
  .pipe(nodeStream.parse());
```
Creates a new stream with the output composed of the picked properties from the source stream.
(`...String`): A collection of properties that will be picked from the source object.

Returns `Stream.Transform`: Transform stream.
```js
// get some properties from each item in the stream
// (e.g. [{ name: 'pam', age: 24 }, { name: 'joe', age: 30 }])
objStream
  .pipe(nodeStream.pick('age'))
// => [{ age: 24 }, { age: 30 }]
```
Returns a new stream that writes to the first given stream and reads from the last given stream. All errors are routed to the new output stream. This is a wrapper of stream-combiner2 by substack.
Returns `Stream`: Transform stream.
```js
// emit the largest line in a file
function getLargestLine() {
  return nodeStream.pipeline(
    nodeStream.split(),
    nodeStream.sort((a, b) => {
      return b.length - a.length; // longest lines first
    }),
    nodeStream.take(1)
  );
}

// find the longest line of a haiku
process.stdin // => ['refreshing and cool\nlove is ', 'a sweet summer ', 'rain\nthat washes the world']
  .pipe(getLargestLine())
// => ['love is a sweet summer rain']
```
Object mode of `pipeline`. Returns a new stream that writes to the first given stream and reads from the last given stream. All errors are routed to the new output stream. This is a wrapper of stream-combiner2 by substack.

Returns `Stream.Transform`: Transform stream.
```js
// read the contents of a file and parse json
function readJson() {
  return nodeStream.pipeline.obj(
    nodeStream.wait(),
    nodeStream.parse()
  );
}

// parse stdin as JSON in a single step
process.stdin // => ['{"', 'banana":', '"appl', 'e"}']
  .pipe(readJson())
// => { 'banana': 'apple' }
```
Creates a new stream with the output composed of the plucked property from each item in the source stream.

(`String`): The name of the property to pluck from each item in the source stream.

Returns `Stream.Transform`: Transform stream.
```js
// get the value of the "age" property for every item in the stream
objStream // => [{ name: 'pam', age: 24 }, { name: 'joe', age: 30 }]
  .pipe(nodeStream.pluck('age'))
// => [24, 30]
```
Creates a new stream with a single item that's produced by calling a reducer with each item of the original stream. Similar to Array.reduce... but on a stream.
(`Function`): Function that reduces items in the stream. Takes two arguments: the current value of the reduction, and the value of the item at this position in the stream.

(`any?`): Value to use as the first argument to the first call of the `reducer`.

Returns `Stream.Transform`: A transform stream that results from the reduction.
```js
// If you wanted to determine the content-length of a stream, you could do something like
// the following. Assuming "example.txt" is a large file, you could determine its length
// by doing the following:
fs.createReadStream('example.txt')
  .pipe(nodeStream.reduce((length, value) => length + value.length, 0));
// The resulting stream will have an integer value representing the length of "example.txt".
```
```js
// It is also possible to reduce a stream asynchronously for more complex actions.
// Note: The signature of the function that you pass as the callback is important. It
// MUST have *three* parameters.
//
// Assuming "twitterers.txt" is a newline-separated list of your favorite tweeters, you
// could identify which is the most recently active by using the Twitter API:
fs.createReadStream('twitterers.txt')
  .pipe(nodeStream.split()) // split on new lines
  .pipe(nodeStream.reduce((memo, user, next) => {
    twit.get('search/tweets', { q: `from:${user}`, count: 1 }, (err, data) => {
      // Error the stream since this request failed
      if (err) {
        return next(err);
      }

      // This is the first iteration of the reduction, so we automatically save the tweet
      if (!memo) {
        return next(null, data);
      }

      // This tweet is the most recent so far, save it for later
      if (new Date(data.statuses[0].created_at) > new Date(memo.statuses[0].created_at)) {
        return next(null, data);
      }

      // The tweet we have saved is still the most recent
      next(null, memo);
    });
  }));

// The resulting stream will contain the most recent tweet of the users in the list.
// Note: If `next` is called with an error as the first argument, the stream will error.
// This is typical behavior for node callbacks.
```
Creates a new stream where all elements of the source stream have been sorted by the Array.sort method. Each value will be emitted individually.
Note: This method will buffer all contents of the source stream before sending it. You should not use this method if your source stream is large as it could consume large amounts of memory.
(`Function`): A function that will be passed directly to `Array.sort` for item comparison.

Returns `Stream.Transform`: Transform stream.
```js
// sort a stream of numbers
objStream // => [10, 3, 9, 2, 4, 1]
  .pipe(nodeStream.sort())
// => [1, 2, 3, 4, 9, 10]
```
Creates a new stream where every element in the source stream is converted to a string by using `JSON.stringify`.

Returns `Stream.Transform`: Transform stream.
```js
// stringify every element in an object stream so it can be
// piped to a non-object stream
objStream
  .pipe(nodeStream.stringify())
  .pipe(process.stdout);
```
Creates a new stream with the first `n` values from the source stream.

`n` (`Number`): Number of items to take from the source stream.

Returns `Stream.Transform`: Transform stream.
```js
// take the first 3 items from a stream
inStream // => ['b', 'a', 'n', 'a', 'n', 'a']
  .pipe(nodeStream.take(3))
  .pipe(process.stdout)
// => ['b', 'a', 'n']
```
Creates a new stream with a single value that's a Buffer of the entire contents of the stream.
(`Function`): A function to be called with the contents of the stream. This is a convenience to avoid listening to the data/end events of the stream.

Returns `Stream.Transform`: Transform stream.
```js
// get the entire contents of a file
fs.createReadStream('example.txt')
  .pipe(nodeStream.wait());
// => Buffer
```
Creates a new stream with a single value that's an object created by JSON parsing the contents of the entire stream.
(`Function`): A function to be called with the contents of the stream. This is a convenience to avoid listening to the data/end events of the stream.

Returns `Stream.Transform`: Transform stream.
```js
// parse the JSON contents of a file
fs.createReadStream('example.json')
  .pipe(nodeStream.wait.json());
// => { 'nanananananananananana': 'batman' }
```
Creates a new stream with a single value that's an array of every item in the stream.
(`Function`): A function to be called with the contents of the stream. This is a convenience to avoid listening to the data/end events of the stream.

Returns `Stream.Transform`: Transform stream.
```js
// get all of the items in an object stream
objStream
  .pipe(nodeStream.wait.obj());
// => [{ 'name': 'paul' }, { 'name': 'lisa' }, { 'name': 'mary' }]
```
A convenient form of `filter` which performs a deep comparison between a given `query` and items in the source stream. Items that match the `query` are forwarded to the output stream.

`query` (`Object`): An object of properties to compare against all items in the source stream.

Returns `Stream.Transform`: Transform stream.
```js
// Get all users from a given zip code
users // => [{ name: 'Bill', zip: 90210 }, { name: 'Tracy', zip: 33193 }, { name: 'Paul', zip: 90210 }]
  .pipe(nodeStream.where({ zip: 90210 }))
// => [{ name: 'Bill', zip: 90210 }, { name: 'Paul', zip: 90210 }]
```