Function: expand()

expand<T>(project, options): Operator<T, T>

Defined in: operators/expand.ts:22

Creates a stream operator that recursively expands each emitted value.

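This operator takes each value from the source stream and applies the project function to it. The function returns the next values to process, as a stream or, per the signature, as a CallbackReturnType<T> or an array. The operator then applies the same logic to each of those values in turn, effectively flattening an arbitrarily deep, asynchronous data structure. Expansion along a branch ends when the project function returns nothing further to emit for a value.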

This is particularly useful for traversing graph or tree-like data, such as file directories or hierarchical API endpoints, where each item might lead to a new collection of items that also need to be processed.
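The recursion described above can be illustrated without the library. The following is a minimal, synchronous sketch under stated assumptions, not the operator's actual implementation: `expandSketch` and `TreeNode` are hypothetical names introduced here, plain iterables stand in for streams, and the depth-first order is a choice made for the sketch that may differ from the order the real operator emits in.

```typescript
// Hypothetical tree node type, used only for this illustration.
interface TreeNode {
  name: string;
  children: TreeNode[];
}

// Synchronous sketch of the expansion idea: emit each value, then
// recursively expand whatever `project` returns for it (depth-first).
// Recursion along a branch stops when `project` returns an empty iterable.
function* expandSketch<T>(
  source: Iterable<T>,
  project: (value: T) => Iterable<T>
): Generator<T> {
  for (const value of source) {
    yield value;
    yield* expandSketch(project(value), project);
  }
}

// A small tree: root -> (a -> a1), b
const root: TreeNode = {
  name: "root",
  children: [
    { name: "a", children: [{ name: "a1", children: [] }] },
    { name: "b", children: [] },
  ],
};

// Each node "leads to" its children, so projecting a node yields them.
const visited: string[] = Array.from(
  expandSketch([root], (node) => node.children)
).map((node) => node.name);

console.log(visited); // ["root", "a", "a1", "b"]
```

Because leaf nodes project to an empty array, the traversal terminates on its own. A project function that always returns at least one value would expand forever unless the consumer stops reading.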

Type Parameters

T

T = any

The type of the values in the source and output streams.

Parameters

project

(value) => Stream<T> | CallbackReturnType<T> | T[]

A function that takes a value and returns the new values to be expanded, as a stream, a CallbackReturnType<T>, or an array.

options

RecurseOptions = {}

An optional configuration object passed to the underlying recurse operator. Defaults to an empty object.

Returns

Operator<T, T>

An Operator instance that can be used in a stream's pipe method.

Examples

From expand.spec.ts:4

typescript
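// `from`, `eachValueFrom`, and `expand` are assumed to be imported from this
// library; `expect` comes from the test runner.
const error = new Error('Project function error');

// Each value expands to its successor; projecting 3 throws instead.
const project = (value: number) => {
  if (value === 3) throw error;
  return from([value + 1]);
};

const stream = from([1]);
const result: number[] = [];
let caughtError: Error | undefined;

// Consume the expanded stream, recording values until the error propagates.
try {
  for await (const value of eachValueFrom(stream.pipe(expand(project)))) {
    result.push(value);
  }
} catch (e) {
  caughtError = e as Error;
}

// Only 1 and 2 were collected before the project error reached the consumer.
expect(result).toEqual([1, 2]);
expect(caughtError).toEqual(error);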

Released under the MIT License.