Protected
stackProtected
streamProtected
traceStatic
StreamMarker for the end of a stream.
Batch items together on the stream and emit them as an array. The batch can be over some size, or can be over a period of time.
Optional
byOptional
n?: numberBatch up to n
items (activates batching by count)
Optional
timeout?: numberBatch for timeout
milliseconds (activates batching by timeout)
Optional
yieldIf timeout
is reached and no items have been emitted on the
stream, still emit an empty array.
Optional
yieldProtected
getRun the provided callback after the first atom passes through the stream.
Callback to run.
Optional
options: { Optional
atom?: booleanRun on any atom (true by default). Will only run after first
ok
atom if false.
Run the provided callback after the last atom passes through the stream.
Callback to run.
Optional
options: { Optional
atom?: booleanRun on any atom (true by default). Will only run after last
ok
atom if false.
Protected
traceStatic
writableStatic
errorStatic
exceptionStatic
isStatic
isVerify if the provided atom is of the exception
variant.
Static
isStatic
isuse isException
instead
Static
okStatic
unknownSerialise the stream, and produce a Node stream with the serialised result.
Optional
options: { Optional
atoms?: booleanBy default, only ok
values are serialised, however enabling this
will serialise all values.
Optional
single?: booleanWhether to emit an array for multiple items, or only a single item.
Stream#toReadable if serialisation is not required
Iterate through each atom in the stream, and return them as a single array.
Optional
options: { Optional
atoms?: falseReturn every atom on the stream.
Optional
reject?: booleanIf an error or exception is encountered, reject the promise with it.
Optional
options: { Produce a readable node stream with the values from the stream.
What kind of readable stream to produce. When "raw" only strings and buffers can be emitted on the stream. Use "object" to preserve objects in the readable stream. Note that object values are not serialised, they are emitted as objects.
Optional
options: { Options for configuring how atoms are output on the stream
Optional
atoms?: booleanStream#serialize if the stream values should be serialized to json
Produce a readable node stream with the raw values from the stream
the stream must only contain atoms of type string
or Buffer
. If not, a
stream error will be emitted.
Stream#serialize if the stream values should be serialized to json
Produce a readable node stream in object mode with the values from the stream
Optional
options: { Optional
atoms?: booleanBy default, only ok
values are emitted, however enabling this
will emit all values.
When not using options.atoms
, any null
atom values will be skipped when piping to the readable stream
Stream#serialize if the stream values should be serialized to json
Static
fromStatic
fromStatic
fromCreate a stream from a node-style callback. A node-compatible callback function will be passed as the first parameter to the callback of this function.
The first parameter provided to the callback (the error
) will be emitted as an Error
atom, whilst the second parameter (the value
) will be emitted as an Ok
atom.
$.fromCallback((next) => someAsyncMethod(paramA, paramB, next));
Static
fromStatic
fromStatic
fromStatic
fromStatic
fromCreate a new stream, and use the provided push
and done
methods to add values to it, and
complete the stream.
push
: Adds the provided value to the stream.done
: Indicatest that the stream is done, meaning that any future calls to push
or
done
will be ignored.Static
ofStatic
ofStatic
ofStatic
ofMap over each value in the stream, produce a stream from it, cache the resultant stream and flatten all the value streams together
Map over each value in the stream, produce a stream from it, and flatten all the value streams together
Map over each error in the stream, produce a stream from it, and flatten all the value streams together.
Maps over each exception in the stream, producing a new stream from it, and flatten all the value streams together.
Produce a stream for each value in the stream. The resulting stream will be emptied, however the resulting values will just be dropped. This is analogous to an asynchronous side effect.
Consume the entire stream, and completely replace it with a new stream. This will remove any errors and exceptions currently on the stream.
Equivalent to:
stream
.filter(() => false)
.otherwise(newStream);
`
Map over each value in the stream, and apply some callback to it. This differs from map
as
the upstream is continually pulled, regardless if the downstream is ready for it and whether
the current iteration has complete.
Approximate timelines are shown below. It isn't 100% correct for what's actually happening, but gives the right idea.
p
: producer (upstream)m
: map operation (asynchronous) 0ms 10ms 20ms 30ms
|---------|---------|---------|---------|
|<-- p -->|<-- m -->| | |
| | |<-- p -->|<-- m -->|
0ms 10ms 20ms 30ms
|---------|---------|---------|---------|
|<-- p -->|<-- m -->| | |
| |<-- p -->|<-- m -->| |
| | |<-- p -->|<-- m -->|
Optional
options: { Optional
delay?: numberOptional
maxOptional
maxReturn a stream containing the first n
values. If options.atoms
is true
, then the
first n
atoms rather than values will be emitted.
Optional
options: { Optional
atoms?: booleanIf enabled, first n
atoms will be counted, otherwise values.
Operate on each item in the stream, reducing it into a single value. The resulting value is returned in its own stream.
Generated using TypeDoc
Type of the 'values' on the stream.