Skip to content

🎭 From RxJS Maximalist to Generator Fan ​

I used to solve everything with RxJS β€” then I learned async generators. Generators are linear, debuggable, and (with Streamix) still reactive. Fewer marble diagrams, better sleep. And when the app only needs one snapshot, you can downgrade your pipeline to a single await stream.query() β€” pragmatic, simple, and safe. πŸš€


πŸ”₯ The Maximalist Era ​

I used to be that developer. You know the type:

  • switchMap for button clicks ✨
  • combineLatest for boolean logic 🀯
  • Every async operation became a marble-diagram masterpiece 🎨

My pipelines looked like this:

javascript
const userDashboard$ = userId$.pipe(
  switchMap(id => 
    combineLatest([ getUserProfile(id), getUserPosts(id) ])
  ),
  mergeMap(([profile, posts]) => 
    from(posts).pipe(
      concatMap(post => getPostComments(post.id)),
      scan((acc, comments) => [...acc, comments], [])
    )
  )
);

The moment of truth: When a teammate asked "What does this do?"... even I couldn't answer clearly. πŸ˜…


✨ Enter Generators: The Awakening ​

Then someone showed me async generators, and everything changed:

javascript
async function* fetchUserData(userId) {
  const profile = await getUserProfile(userId);
  yield profile;

  const posts = await getUserPosts(userId);
  for (const post of posts) {
    yield post;

    const comments = await getPostComments(post.id);
    yield { post, comments };
  }
}

Holy readability! 🀯

  • βœ… Linear β€” read top to bottom
  • βœ… Debuggable β€” step through like normal code
  • βœ… Simple β€” no marble diagrams required

πŸš€ Streamix: Reactive + Generators = ❀️ ​

I still loved operators, so Streamix was the perfect fit β€” Rx-style operators applied to generator streams:

bash
npm install @actioncrew/streamix
typescript
import { Stream, debounceTime, distinctUntilChanged, eachValueFrom } from '@actioncrew/streamix';

async function* searchFeature(searchInput: Stream<string>) {
  const processed = searchInput.pipe(
    debounceTime(300),
    distinctUntilChanged()
  );

  for await (const query of eachValueFrom(processed)) {
    if (query.trim()) {
      const results = await searchAPI(query);
      yield* results;
    }
  }
}

Readable, reactive, and still composed β€” without late-night marble-diagram angst. πŸŽ‰


🎯 Downgrade pipelines to one value (the practical trick) ​

Here’s the part I wish someone had told me earlier: you can keep your generator pipelines and expose a tiny, explicit bridge for imperative code that only needs one snapshot. That’s what query() is for.

  1. If a latest value exists, query() resolves immediately with it.
  2. Otherwise, query() waits for the next emission and resolves once it arrives.
  3. Multiple callers awaiting query() on the same subject all resolve on that same next emission.
  4. query() is a read-only convenience β€” it doesn’t destructively consume the latest snapshot.

Example β€” build pipeline, expose snapshot ​

Generator pipeline (readable and testable):

typescript
async function* buildUserDashboard(userId$: Stream<T>) {
  for await (const id of eachValueFrom(userId$)) {
    const profile = await getUserProfile(id);
    const posts = await getUserPosts(id);
    yield { id, profile, posts };
  }
}

Wire it to a subject that keeps the latest snapshot:

typescript
const dashboardSubject = createSubject();
dashboardSubject.pipe(buildUserDashboard(userId$));

Now callers who want a single snapshot can just do:

typescript
async function onOpenDashboard() {
  const snapshot = await dashboardSubject.query(); // downgrade to one value
  render(snapshot);
}

Why this matters

  • βœ… Interop: imperative handlers, startup code, and tests can consume streams simply.
  • βœ… Migration-friendly: adopt generator streams incrementally without refactoring every consumer.
  • βœ… Predictable: query() semantics are explicit and easy to document/test.

πŸŽͺ Subjects Without Overthinking ​

Hot Subjects are great for multicasting. They let pipelines broadcast a computed snapshot to multiple listeners. The query() escape hatch keeps things pragmatic β€” no need to force for await everywhere. Use Subjects for sharing and query() for one-shot reads.

typescript
const subject = createSubject<number>();
subject.next(42);
const latest = await subject.query(); // 42 (immediate if latest exists)

βš”οΈ Before vs After: Real Example ​

😡 The Old Me (RxJS Maximalist): ​

typescript
const searchResults$ = searchInput$.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => 
    query ? searchAPI(query).pipe(catchError(() => of([]))) : of([])
  )
);

😎 The New Me (Streamix + Generators): ​

typescript
async function* search(searchInput: Stream<T>) {
  const debounced = searchInput.pipe(
    debounceTime(300),
    distinctUntilChanged()
  );

  for await (const query of eachValueFrom(debounced)) {
    if (query) {
      try {
        yield* await searchAPI(query);
      } catch {
        yield* []; // keep calm and carry on
      }
    }
  }
}

Which would you rather debug at 2 AM? πŸ€·β€β™€οΈ


πŸ“š Lessons Learned ​

1. Admit Overengineering πŸ™‹β€β™€οΈ ​

If explaining your code needs a whiteboard, simplify it.

2. Embrace Simplicity 🌱 ​

Not every async operation needs another observable.

3. Pick the Right Tool πŸ› οΈ ​

Use CaseBest Choice
UI events, real-time dataPersonal preferences 🎨
Sequential data pipelines, paginationStreamix + Generators πŸ“Š
Single-value requests, testsPromises / query() πŸš€

πŸŽͺ Try It Yourself ​

javascript
import { pipe, map, filter, take, eachValueFrom } from '@actioncrew/streamix';

async function* processData() {
  const numbers = from([1,2,3,4,5,6,7,8,9,10]).pipe(
    filter(n => n % 2 === 0),
    map(n => n * 2),
    take(3)
  );
  yield* eachValueFrom(numbers); // 4, 8, 12
}

🌟 The Bottom Line ​

Reactive programming isn’t about using every operator in the toolbox. It’s about being readable, pragmatic, and interoperable. Build readable pipelines β€” and when the rest of your app only needs one value, downgrade them with query(). Practical, testable, and keeps everyone sleeping more soundly. πŸ˜„


πŸ’¬ Your Turn! ​

What’s your reactive confession? Ever converted a huge RxJS pipeline into a simple generator? Ready to try the query() trick in your codebase?


Ready to stream? Get started with Streamix today! πŸš€
Install from NPM β€’ View on GitHub β€’ Give Feedback


Remember: Choose your tools wisely, keep it simple, and may your streams be ever readable! πŸš€βœ¨

Released under the MIT License.