Class Stream<T, E>

Type Parameters

  • T

    Type of the 'values' on the stream.

  • E

    Type of the 'errors' on the stream.

Hierarchy

  • HigherOrderStream<T, E>
    • Stream

Constructors

  • Type Parameters

    • T

    • E

    Parameters

    • stream: Readable

    Returns Stream<T, E>

Properties

stackTrace: string[] = []
stream: Readable
traceComplete: boolean = false
StreamEnd: symbol = ...

Marker for the end of a stream.

Methods

  • Batch items together on the stream and emit them as an array. The batch can be over some size, or can be over a period of time.

    Parameters

    • options: {
          byBucket?: ((value) => string | number);
          n?: number;
          timeout?: number;
          yieldEmpty?: boolean;
          yieldRemaining?: boolean;
      }
      • Optional byBucket?: ((value) => string | number)
          • (value): string | number
          • Parameters

            • value: T

            Returns string | number

      • Optional n?: number

        Batch up to n items (activates batching by count)

      • Optional timeout?: number

        Batch for timeout milliseconds (activates batching by timeout)

      • Optional yieldEmpty?: boolean

        If timeout is reached and no items have been emitted on the stream, still emit an empty array.

      • Optional yieldRemaining?: boolean

    Returns Stream<T[], E>
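
    As a rough illustration of batching by count, the sketch below groups a plain array into arrays of up to n items. It is not the library's implementation: batchByCount is a hypothetical helper, a synchronous array stands in for the stream, and the handling of a trailing partial batch under yieldRemaining is an assumption, since that option is not described above.

```typescript
// Hypothetical helper (not part of the library): groups items into arrays of up to `n` items.
function batchByCount<T>(items: T[], n: number, yieldRemaining = false): T[][] {
    if (!Number.isInteger(n) || n < 1) {
        throw new RangeError("n must be a positive integer");
    }

    const batches: T[][] = [];
    let current: T[] = [];

    for (const item of items) {
        current.push(item);

        // A full batch is emitted as soon as it reaches `n` items.
        if (current.length === n) {
            batches.push(current);
            current = [];
        }
    }

    // Assumption: a trailing partial batch is only emitted when `yieldRemaining` is set.
    if (yieldRemaining && current.length > 0) {
        batches.push(current);
    }

    return batches;
}

console.log(JSON.stringify(batchByCount([1, 2, 3, 4, 5], 2))); // [[1,2],[3,4]]
console.log(JSON.stringify(batchByCount([1, 2, 3, 4, 5], 2, true))); // [[1,2],[3,4],[5]]
```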

  • Completely exhaust the stream, driving it to completion. This is particularly useful when side effects of the stream are desired, but the actual values of the stream are not needed.

    Returns Promise<void>

  • Capture the current trace. Creates a clone of the trace to prevent it being modified.

    Returns string[]

  • Run the provided callback after the first atom passes through the stream.

    Parameters

    • callback: (() => void)

      Callback to run.

        • (): void
        • Returns void

    • Optional options: {
          atom?: boolean;
      } = {}
      • Optional atom?: boolean

        Run on any atom (true by default). Will only run after first ok atom if false.

    Returns Stream<T, E>

  • Run the provided callback after the last atom passes through the stream.

    Parameters

    • callback: (() => void)

      Callback to run.

        • (): void
        • Returns void

    • Optional options: {
          atom?: boolean;
      } = {}
      • Optional atom?: boolean

        Run on any atom (true by default). Will only run after last ok atom if false.

    Returns Stream<T, E>

  • Add a layer to the trace object. Returns a copy of the current trace.

    Parameters

    • trace: string

    Returns string[]

  • Create a stream and corresponding writable Node stream, where any writes to the writable Node stream will be emitted on the returned stream.

    Type Parameters

    • T

    • E

    Returns {
        stream: Stream<T, E>;
        writable: Writable;
    }

Atom

  • Create an exception atom with the provided value.

    Type Parameters

    • T

    • E

    Parameters

    • value: unknown
    • trace: string[]

    Returns Atom<T, E>

  • Type Parameters

    • T

    • E

    Parameters

    • value: unknown
    • trace: string[]

    Returns Atom<T, E>

    Deprecated

    use exception instead

Consumption

  • Create an iterator that will emit each atom in the stream.

    Returns AsyncIterator<Atom<T, E>, any, undefined>

  • Serialise the stream, and produce a Node stream with the serialised result.

    Parameters

    • Optional options: {
          atoms?: boolean;
          single?: boolean;
      }
      • Optional atoms?: boolean

        By default, only ok values are serialised; enabling this will serialise all values.

      • Optional single?: boolean

        Whether to emit an array for multiple items, or only a single item.

    Returns Readable

    See

    Stream#toReadable if serialisation is not required

  • Iterate through each atom in the stream, and return them as a single array.

    Parameters

    • Optional options: {
          atoms?: false;
          reject?: boolean;
      }
      • Optional atoms?: false

        Return every atom on the stream.

      • Optional reject?: boolean

        If an error or exception is encountered, reject the promise with it.

    Returns Promise<T[]>

  • Parameters

    • Optional options: {
          atoms: true;
      }
      • atoms: true

    Returns Promise<Atom<T, E>[]>

  • Produce a readable node stream with the values from the stream.

    Parameters

    • kind: "object" | "raw"

      What kind of readable stream to produce. When "raw", only strings and buffers can be emitted on the stream. Use "object" to preserve objects in the readable stream. Note that object values are not serialised; they are emitted as objects.

    • Optional options: {
          atoms?: boolean;
      }

      Options for configuring how atoms are output on the stream

      • Optional atoms?: boolean

    Returns Readable

    See

    Stream#serialize if the stream values should be serialized to JSON

  • Produce a readable node stream with the raw values from the stream.

    Parameters

    • kind: "raw"

    Returns Readable

    Note

    The stream must only contain atoms of type string or Buffer. If not, a stream error will be emitted.

    See

    Stream#serialize if the stream values should be serialized to JSON

  • Produce a readable node stream in object mode with the values from the stream.

    Parameters

    • kind: "object"
    • Optional options: {
          atoms?: boolean;
      }
      • Optional atoms?: boolean

        By default, only ok values are emitted; enabling this will emit all values.

    Returns Readable

    Note

    When not using options.atoms, any null atom values will be skipped when piping to the readable stream.

    See

    Stream#serialize if the stream values should be serialized to JSON

  • Create an async iterator that will emit each value in the stream.

    Returns AsyncIterableIterator<T>

Creation

  • Create a stream from an array.

    Type Parameters

    • T

    • E

    Parameters

    • array: MaybeAtom<T, E>[]

      The array that values will be emitted from.

    Returns Stream<T, E>

    Note

    The array will be shallow cloned internally, so that the original array won't be impacted.

  • Create a stream from a node-style callback. A node-compatible callback function will be passed as the first parameter to the callback of this function.

    The first parameter provided to the callback (the error) will be emitted as an Error atom, whilst the second parameter (the value) will be emitted as an Ok atom.

    Type Parameters

    • T

    • E

    Parameters

    • cb: ((next) => void)
        • (next): void
        • Parameters

          • next: ((error, value) => unknown)
              • (error, value): unknown
              • Parameters

                • error: E
                • value: T

                Returns unknown

          Returns void

    Returns Stream<T, E>

    Example

    $.fromCallback((next) => someAsyncMethod(paramA, paramB, next));
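
    The mapping from a node-style (error, value) pair to atoms described above can be sketched without the library. Everything in this block is hypothetical: the Atom shape, toAtom, atomFromCallback and divide are illustrative names, and treating a null or undefined error as success is an assumption based on the usual Node convention.

```typescript
// Hypothetical atom shape for illustration; the library's own Atom type may differ.
type Atom<T, E> = { type: "ok"; value: T } | { type: "error"; value: E };

// A node-style callback receives an error first and a value second.
type NodeCallback<T, E> = (error: E | null | undefined, value?: T) => void;

// Hypothetical helper: converts the arguments of a node-style callback into an atom.
function toAtom<T, E>(error: E | null | undefined, value?: T): Atom<T, E> {
    // Assumption: following Node convention, a null or undefined error means success.
    if (error !== null && error !== undefined) {
        return { type: "error", value: error };
    }
    return { type: "ok", value: value as T };
}

// Hypothetical helper: runs `cb` and resolves with the atom produced by the first `next` call.
function atomFromCallback<T, E>(cb: (next: NodeCallback<T, E>) => void): Promise<Atom<T, E>> {
    return new Promise((resolve) => {
        cb((error, value) => resolve(toAtom<T, E>(error, value)));
    });
}

// Stand-in for a node-style function that reports its result through `next`.
function divide(a: number, b: number, next: NodeCallback<number, string>): void {
    if (b === 0) {
        next("division by zero");
    } else {
        next(null, a / b);
    }
}

atomFromCallback<number, string>((next) => divide(6, 3, next))
    .then((atom) => console.log(JSON.stringify(atom))); // {"type":"ok","value":2}
```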
    
  • Create a stream from an iterable.

    Type Parameters

    • T

    • E

    Parameters

    • iterable: Iterable<MaybeAtom<T, E>> | AsyncIterable<MaybeAtom<T, E>>

      The iterable that will produce an iterator, which may be an async iterator.

    Returns Stream<T, E>

  • Create a stream from an iterator.

    Type Parameters

    • T

    • E

    Parameters

    • iterator: Iterator<MaybeAtom<T, E>, any, undefined> | AsyncIterator<MaybeAtom<T, E>, any, undefined>

      The iterator that will produce values, which may be an async iterator.

    Returns Stream<T, E>

  • Create a new stream with the provided atom producer.

    Type Parameters

    • T

    • E

    Parameters

    • next: (() => Promise<symbol | MaybeAtom<T, E>>)

      A callback method to produce the next atom. If no atom is available, then StreamEnd must be returned.

    Returns Stream<T, E>
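
    The contract for next (return a value while one is available, then return StreamEnd) can be sketched as follows. This is a standalone illustration under stated assumptions: the StreamEnd symbol here is a local stand-in for the class property, and countTo and drain are hypothetical helpers rather than library functions.

```typescript
// Local stand-in for the StreamEnd marker; the real marker is a property of the Stream class.
const StreamEnd = Symbol("StreamEnd");

type Producer<T> = () => Promise<T | typeof StreamEnd>;

// Hypothetical producer: yields 1, 2, ..., limit and then signals the end of the stream.
function countTo(limit: number): Producer<number> {
    let current = 0;

    return async (): Promise<number | typeof StreamEnd> => {
        if (current >= limit) {
            return StreamEnd;
        }
        current += 1;
        return current;
    };
}

// Hypothetical consumer: keeps calling `next` until the end marker is returned.
async function drain<T>(next: Producer<T>): Promise<T[]> {
    const values: T[] = [];

    for (;;) {
        const result = await next();
        if (result === StreamEnd) {
            return values;
        }
        values.push(result as T);
    }
}

drain(countTo(3)).then((values) => console.log(JSON.stringify(values))); // [1,2,3]
```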

  • Create a stream from a promise. The promise will be awaited, and the resulting value only ever emitted once.

    Type Parameters

    • T

    • E

    Parameters

    • promise: Promise<MaybeAtom<T, E>>

      The promise to create the stream from.

    Returns Stream<T, E>

  • Create a new stream, and use the provided push and done methods to add values to it, and complete the stream.

    • push: Adds the provided value to the stream.
    • done: Indicates that the stream is done, meaning that any future calls to push or done will be ignored.

    Type Parameters

    • T

    • E

    Returns {
        done: (() => void);
        push: ((value) => void);
        stream: Stream<T, E>;
    }

    • done: (() => void)
        • (): void
        • Returns void

    • push: ((value) => void)
        • (value): void
        • Parameters

          Returns void

    • stream: Stream<T, E>
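
    The push and done behaviour described above, in particular that calls made after done are ignored, can be sketched with a plain array in place of the stream. createChannel and snapshot are hypothetical names used only for this illustration.

```typescript
// Hypothetical stand-in for the returned object: collects pushed values in an array
// instead of emitting them on a stream.
function createChannel<T>() {
    const values: T[] = [];
    let finished = false;

    return {
        // Adds a value, unless `done` has already been called.
        push(value: T): void {
            if (!finished) {
                values.push(value);
            }
        },
        // Marks the channel as done; later calls to `push` or `done` have no effect.
        done(): void {
            finished = true;
        },
        // Returns a copy of everything accepted so far.
        snapshot(): T[] {
            return [...values];
        },
    };
}

const channel = createChannel<number>();
channel.push(1);
channel.push(2);
channel.done();
channel.push(3); // ignored, because the channel is already done

console.log(JSON.stringify(channel.snapshot())); // [1,2]
```
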
  • Create a new stream containing a single value. Unless an atom is provided, it will be converted to an ok atom.

    Type Parameters

    • T

    • E

    Parameters

    Returns Stream<T, E>

  • Create a new stream containing a single error atom.

    Type Parameters

    • T

    • E

    Parameters

    • value: E

    Returns Stream<T, E>

  • Create a new stream containing a single exception atom.

    Type Parameters

    • T

    • E

    Parameters

    • value: unknown

    Returns Stream<T, E>

  • Type Parameters

    • T

    • E

    Parameters

    • value: unknown

    Returns Stream<T, E>

    Deprecated

    use ofException instead

Higher Order

  • Map over each value in the stream, produce a stream from it, cache the resultant stream and flatten all the value streams together.

    Type Parameters

    • U

    Parameters

    Returns Stream<U, E>

  • Produce a new stream from the stream that has any nested streams flattened.

    Returns T extends Stream<U, E>
        ? Stream<U, E>
        : Stream<T, E>

    Note

    Any atoms that are not nested streams are emitted as-is

  • Emit items from provided stream if this stream is completely empty.

    Parameters

    Returns Stream<T, E>

    Note

    If there are any errors or exceptions on the stream, then the new stream won't be consumed.
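
    The rule above (the fallback is used only when nothing at all was emitted, so a lone error still counts as content) can be sketched over plain arrays. The Atom shape and the otherwise helper below are hypothetical stand-ins, and arrays replace the streams for brevity.

```typescript
// Hypothetical atom shape for illustration; the library's own Atom type may differ.
type Atom<T, E> = { type: "ok"; value: T } | { type: "error"; value: E };

// Hypothetical helper: `fallback` is a function so that it is only evaluated when needed.
function otherwise<T, E>(primary: Atom<T, E>[], fallback: () => Atom<T, E>[]): Atom<T, E>[] {
    // Any atom at all, including an error, means the primary is not empty.
    return primary.length === 0 ? fallback() : primary;
}

let fallbackUsed = 0;
const fallback = (): Atom<number, string>[] => {
    fallbackUsed += 1;
    return [{ type: "ok", value: 0 }];
};

// An empty primary falls back to the alternative.
console.log(JSON.stringify(otherwise<number, string>([], fallback)));
// [{"type":"ok","value":0}]

// A primary holding only an error is not empty, so the fallback is never evaluated.
console.log(JSON.stringify(otherwise<number, string>([{ type: "error", value: "boom" }], fallback)));
// [{"type":"error","value":"boom"}]
```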

  • Consume the entire stream, and completely replace it with a new stream. This will remove any errors and exceptions currently on the stream.

    Equivalent to:

    stream
    .filter(() => false)
    .otherwise(newStream);


    Type Parameters

    • U

    • F

    Parameters

    Returns Stream<U, F>

Transform

  • Map over each value in the stream, and apply some callback to it. This differs from map as the upstream is continually pulled, regardless of whether the downstream is ready for it and whether the current iteration has completed.

    Example

    Approximate timelines are shown below. They are not an exact account of what happens, but they give the right idea.

    • p: producer (upstream)
    • m: map operation (asynchronous)

    Map

     0ms       10ms      20ms      30ms
    |---------|---------|---------|---------|
    |<-- p -->|<-- m -->|         |         |
    |         |         |<-- p -->|<-- m -->|

    Buffered Map

     0ms       10ms      20ms      30ms
    |---------|---------|---------|---------|
    |<-- p -->|<-- m -->|         |         |
    |         |<-- p -->|<-- m -->|         |
    |         |         |<-- p -->|<-- m -->|

    Type Parameters

    • U

    Parameters

    Returns Stream<U, E>

  • Collect the values of the stream atoms into an array, then return a stream which emits that array.

    Returns Stream<T[], E>

    Note

    non-ok atoms are emitted as-is, the collected array is always emitted last

    Note

    empty streams will emit an empty array
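
    The two notes above can be illustrated with a small sketch over plain arrays: non-ok atoms pass through in order, and the collected array arrives last, even when it is empty. The Atom shape and the collect helper are hypothetical and stand in for the stream-based method.

```typescript
// Hypothetical atom shape for illustration; the library's own Atom type may differ.
type Atom<T, E> = { type: "ok"; value: T } | { type: "error"; value: E };

// Hypothetical helper: gathers ok values into one array, passing other atoms through.
function collect<T, E>(atoms: Atom<T, E>[]): Atom<T[], E>[] {
    const values: T[] = [];
    const output: Atom<T[], E>[] = [];

    for (const atom of atoms) {
        if (atom.type === "ok") {
            values.push(atom.value);
        } else {
            // Non-ok atoms are emitted as-is, in the order they were seen.
            output.push(atom);
        }
    }

    // The collected array is always emitted last, even if it is empty.
    output.push({ type: "ok", value: values });
    return output;
}

const input: Atom<number, string>[] = [
    { type: "ok", value: 1 },
    { type: "error", value: "boom" },
    { type: "ok", value: 2 },
];

console.log(JSON.stringify(collect(input)));
// [{"type":"error","value":"boom"},{"type":"ok","value":[1,2]}]
```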

  • Consume the stream atoms, emitting new atoms from the generator.

    Type Parameters

    • U

    • F

    Parameters

    • generator: ((it) => AsyncGenerator<Atom<U, F>, any, unknown>)
        • (it): AsyncGenerator<Atom<U, F>, any, unknown>
        • Parameters

          Returns AsyncGenerator<Atom<U, F>, any, unknown>

    Returns Stream<U, F>

  • Drop the first n items from the stream.

    Parameters

    • n: number
    • Optional options: {
          atoms?: boolean;
      }
      • Optional atoms?: boolean

    Returns Stream<T, E>

  • Filter over each value in the stream.

    Parameters

    • condition: ((value) => unknown)
        • (value): unknown
        • Parameters

          • value: T

          Returns unknown

    Returns Stream<T, E>

  • Return a stream containing the first n values. If options.atoms is true, then the first n atoms rather than values will be emitted.

    Parameters

    • n: number
    • Optional options: {
          atoms?: boolean;
      }
      • Optional atoms?: boolean

        If enabled, first n atoms will be counted, otherwise values.

    Returns Stream<T, E>
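
    The difference between counting values and counting atoms can be sketched over a plain array. The Atom shape and take helper are hypothetical. One behaviour here is an assumption not stated above: when counting values, non-ok atoms seen before the n-th value are passed through without being counted.

```typescript
// Hypothetical atom shape for illustration; the library's own Atom type may differ.
type Atom<T, E> = { type: "ok"; value: T } | { type: "error"; value: E };

// Hypothetical helper: keeps atoms until `n` values (or `n` atoms) have been counted.
function take<T, E>(atoms: Atom<T, E>[], n: number, options: { atoms?: boolean } = {}): Atom<T, E>[] {
    const output: Atom<T, E>[] = [];
    let counted = 0;

    for (const atom of atoms) {
        if (counted >= n) {
            break;
        }

        output.push(atom);

        // With `atoms` enabled every atom counts; otherwise only ok values do.
        // Assumption: uncounted non-ok atoms still pass through.
        if (options.atoms || atom.type === "ok") {
            counted += 1;
        }
    }

    return output;
}

const input: Atom<number, string>[] = [
    { type: "ok", value: 1 },
    { type: "error", value: "boom" },
    { type: "ok", value: 2 },
    { type: "ok", value: 3 },
];

console.log(take(input, 2).length); // 3
console.log(take(input, 2, { atoms: true }).length); // 2
```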

  • Run a callback for each value in the stream, ideal for side effects on stream items.

    Parameters

    • cb: ((value) => unknown)
        • (value): unknown
        • Parameters

          • value: T

          Returns unknown

    Returns Stream<T, E>

