
Function: observeOn()

observeOn<T>(context): Operator<T, T>

Defined in: operators/observeOn.ts:42

Creates a stream operator that schedules the emission of each value from the source stream on a specified JavaScript task queue.

This operator acts as a scheduler: it decouples when a value is produced from when it is consumed, letting you control when values reach downstream operators. This helps keep long-running synchronous work from blocking the main thread and lets you prioritize different types of work.
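To illustrate the decoupling described above without depending on the library, here is a minimal sketch built on plain async generators. The names `deferEach` and `runDemo` are hypothetical and are not part of the library's API; the sketch always defers to a macrotask via `setTimeout(0)` and is not a description of how `observeOn` is implemented.

```typescript
// Hypothetical stand-in for a scheduling operator: re-emits each value
// from `source` only after yielding to the event loop via setTimeout(0).
async function* deferEach<T>(source: AsyncIterable<T>): AsyncGenerator<T> {
  for await (const value of source) {
    // Production (the source yielding) and consumption (our yield)
    // are separated by one macrotask.
    await new Promise<void>((resolve) => setTimeout(resolve, 0));
    yield value;
  }
}

// Records the order in which synchronous code and deferred values run.
async function runDemo(): Promise<string[]> {
  const order: string[] = [];

  async function* source(): AsyncGenerator<number> {
    yield 1;
    yield 2;
  }

  // Start consuming; this async function returns control at its first await.
  const consumed = (async () => {
    for await (const value of deferEach(source())) {
      order.push(`value-${value}`);
    }
    order.push("complete");
  })();

  // Synchronous code after "subscription" runs before any value is consumed.
  order.push("sync-after-subscribe");

  await consumed;
  return order;
}
```

Calling `runDemo()` resolves to an array in which `sync-after-subscribe` comes first and `complete` comes last, the same ordering the spec example on this page asserts.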

The operator supports three contexts:

  • "microtask": Emits the value via queueMicrotask, after the currently running synchronous code finishes and before the next task begins.
  • "macrotask": Emits the value in a later event loop task via setTimeout(0).
  • "idle": Emits the value when the browser is idle via requestIdleCallback.
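The mapping from context to scheduling primitive can be sketched as follows. `scheduleOn` and `demoOrder` are hypothetical helpers written for this illustration, not the library's code. The fallback to `setTimeout` when `requestIdleCallback` is unavailable is also an assumption; the source does not say how the operator behaves in that case.

```typescript
type ObserveContext = "microtask" | "macrotask" | "idle";

// Hypothetical helper: runs `fn` on the queue named by `context`.
function scheduleOn(context: ObserveContext, fn: () => void): void {
  switch (context) {
    case "microtask":
      queueMicrotask(fn);
      break;
    case "macrotask":
      setTimeout(fn, 0);
      break;
    case "idle": {
      const ric = (globalThis as any).requestIdleCallback;
      if (typeof ric === "function") {
        ric.call(globalThis, () => fn());
      } else {
        // Assumption: fall back to a macrotask where
        // requestIdleCallback does not exist (for example, Node.js).
        setTimeout(fn, 0);
      }
      break;
    }
  }
}

// Schedules a macrotask first and a microtask second, then records
// the order in which the callbacks actually run.
function demoOrder(): Promise<string[]> {
  const order: string[] = [];
  return new Promise((resolve) => {
    scheduleOn("macrotask", () => {
      order.push("macrotask");
      resolve(order);
    });
    scheduleOn("microtask", () => {
      order.push("microtask");
    });
    order.push("sync");
  });
}
```

Although the macrotask is scheduled first, `demoOrder()` resolves to `["sync", "microtask", "macrotask"]`: the microtask queue is drained as soon as the current synchronous code finishes, before any timer callback runs.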

Type Parameters

T

T = any

The type of the values in the source and output streams.

Parameters

context

The JavaScript task queue context to schedule emissions on.

"microtask" | "macrotask" | "idle"

Returns

Operator<T, T>

An Operator instance that can be used in a stream's pipe method.

Examples

From observeOn.spec.ts:23

typescript
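// The start of this setup is cut off in the source excerpt; only the mock's
// `return 1;` survives. The mock's signature below is an assumption.
const originalRequestIdleCallback = (globalThis as any).requestIdleCallback;
const mockRequestIdleCallback = (..._args: unknown[]) => {
  return 1;
};
(globalThis as any).requestIdleCallback = mockRequestIdleCallback;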

const values: number[] = [];
const emissionOrder: string[] = [];
const stream = createStream('test', async function* () {
  yield 1;
  yield 2;
  yield 3;
});
const observeOnStream = stream.pipe(observeOn('microtask'));

// Start consuming; the async function returns control at its first await.
const consumePromise = (async () => {
  for await (const value of eachValueFrom(observeOnStream)) {
    emissionOrder.push(`value-${value}`);
    values.push(value);
  }
  emissionOrder.push('complete');
})();

// Runs before any value is consumed, because consumption is asynchronous.
emissionOrder.push('sync-after-subscribe');
await consumePromise;

// All values arrive in order, after the synchronous marker and before completion.
expect(values).toEqual([1, 2, 3]);
expect(emissionOrder[0]).toBe('sync-after-subscribe');
expect(emissionOrder).toContain('value-1');
expect(emissionOrder[emissionOrder.length - 1]).toBe('complete');

(globalThis as any).requestIdleCallback = originalRequestIdleCallback;

Released under the MIT License.