From RxJS Maximalist to Generator Fan
I used to solve everything with RxJS, then I learned async generators. Generators are linear, debuggable, and (with Streamix) still reactive. Fewer marble diagrams, better sleep. And when the app only needs one snapshot, you can downgrade your pipeline to a single `await stream.query()`: pragmatic, simple, and safe.
The Maximalist Era
I used to be that developer. You know the type:
- `switchMap` for button clicks
- `combineLatest` for boolean logic
- Every async operation became a marble-diagram masterpiece
My pipelines looked like this:
const userDashboard$ = userId$.pipe(
  switchMap(id =>
    combineLatest([ getUserProfile(id), getUserPosts(id) ])
  ),
  mergeMap(([profile, posts]) =>
    from(posts).pipe(
      concatMap(post => getPostComments(post.id)),
      scan((acc, comments) => [...acc, comments], [])
    )
  )
);
The moment of truth: when a teammate asked "What does this do?", even I couldn't answer clearly.
Enter Generators: The Awakening
Then someone showed me async generators, and everything changed:
async function* fetchUserData(userId) {
  const profile = await getUserProfile(userId);
  yield profile;
  const posts = await getUserPosts(userId);
  for (const post of posts) {
    yield post;
    const comments = await getPostComments(post.id);
    yield { post, comments };
  }
}
Holy readability!
- Linear: read top to bottom
- Debuggable: step through like normal code
- Simple: no marble diagrams required
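To make the "linear" claim concrete, here is a minimal sketch of a consumer driving a generator like the one above with `for await`. The three API functions are hypothetical stand-ins that return canned data, and the generator is a simplified variant that tags each item with a `kind` field, so treat it as an illustration rather than the real thing.

```typescript
// Hypothetical stand-ins for the article's API calls; real ones would hit a backend.
type Profile = { id: string; name: string };
type Post = { id: number; title: string };

async function getUserProfile(userId: string): Promise<Profile> {
  return { id: userId, name: "Ada" };
}

async function getUserPosts(userId: string): Promise<Post[]> {
  return [
    { id: 1, title: `First post by ${userId}` },
    { id: 2, title: `Second post by ${userId}` },
  ];
}

async function getPostComments(postId: number): Promise<string[]> {
  return [`comment on post ${postId}`];
}

// Simplified variant of fetchUserData: yields the profile, then one item per post.
async function* fetchUserData(userId: string) {
  yield { kind: "profile" as const, profile: await getUserProfile(userId) };
  for (const post of await getUserPosts(userId)) {
    yield { kind: "post" as const, post, comments: await getPostComments(post.id) };
  }
}

// The consumer pulls values one at a time; the generator body only
// advances when the loop asks for the next item.
async function collectKinds(userId: string): Promise<string[]> {
  const kinds: string[] = [];
  for await (const item of fetchUserData(userId)) {
    kinds.push(item.kind);
  }
  return kinds;
}
```

Because the consumer is an ordinary loop, a breakpoint inside it (or inside the generator) stops exactly where you expect.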
Streamix: Reactive + Generators = ❤️
I still loved operators, so Streamix was the perfect fit: Rx-style operators applied to generator streams.
npm install @actioncrew/streamix
import { Stream, debounceTime, distinctUntilChanged, eachValueFrom } from '@actioncrew/streamix';
async function* searchFeature(searchInput: Stream<string>) {
  const processed = searchInput.pipe(
    debounceTime(300),
    distinctUntilChanged()
  );
  for await (const query of eachValueFrom(processed)) {
    if (query.trim()) {
      const results = await searchAPI(query);
      yield* results;
    }
  }
}
Readable, reactive, and still composable, without late-night marble-diagram angst.
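If "operators applied to generator streams" sounds abstract, the idea can be sketched without any library: an operator is just an async generator that wraps another async iterable. The helpers below (`dedupeConsecutive`, `nonBlank`, `keystrokes`, `collect`) are hypothetical names invented for this illustration and are not Streamix's implementation.

```typescript
// Emits a value only when it differs from the previously emitted one.
async function* dedupeConsecutive<T>(source: AsyncIterable<T>): AsyncGenerator<T> {
  let first = true;
  let previous: T | undefined;
  for await (const value of source) {
    if (first || value !== previous) {
      first = false;
      previous = value;
      yield value;
    }
  }
}

// Trims each string and drops the ones that end up empty.
async function* nonBlank(source: AsyncIterable<string>): AsyncGenerator<string> {
  for await (const value of source) {
    const trimmed = value.trim();
    if (trimmed) yield trimmed;
  }
}

// Hypothetical input: a fixed list standing in for search-box keystrokes.
async function* keystrokes(): AsyncGenerator<string> {
  yield* ["rx", "rx", "  ", "gen", "gen", "rx"];
}

// Drains an async iterable into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const value of source) out.push(value);
  return out;
}
```

Composition is plain function nesting: `collect(dedupeConsecutive(nonBlank(keystrokes())))` resolves to `["rx", "gen", "rx"]`. A library adds the time-based operators (such as debouncing) that are harder to write by hand.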
Downgrade pipelines to one value (the practical trick)
Here's the part I wish someone had told me earlier: you can keep your generator pipelines and expose a tiny, explicit bridge for imperative code that only needs one snapshot. That's what `query()` is for.
What `query()` should do (recommended semantics)
- If a latest value exists, `query()` resolves immediately with it.
- Otherwise, `query()` waits for the next emission and resolves once it arrives.
- Multiple callers awaiting `query()` on the same subject all resolve on that same next emission.
- `query()` is a read-only convenience: it doesn't destructively consume the latest snapshot.
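The four rules above fit in a few lines of code. What follows is a minimal sketch of those semantics only, using a hypothetical `SnapshotSubject` class written for this post; it is not how Streamix implements `query()`, and it ignores completion and errors.

```typescript
// A sketch of the recommended query() semantics; not Streamix's implementation.
class SnapshotSubject<T> {
  private hasLatest = false;
  private latest: T | undefined;
  private waiters: Array<(value: T) => void> = [];

  // Store the value and release every caller currently waiting in query().
  next(value: T): void {
    this.latest = value;
    this.hasLatest = true;
    const pending = this.waiters;
    this.waiters = [];
    for (const resolve of pending) resolve(value);
  }

  // Resolve with the latest value if one exists, otherwise with the next emission.
  // Reading never clears the stored snapshot.
  query(): Promise<T> {
    if (this.hasLatest) return Promise.resolve(this.latest as T);
    return new Promise<T>((resolve) => {
      this.waiters.push(resolve);
    });
  }
}
```

The separate `hasLatest` flag is a deliberate choice: it lets `undefined` or `null` count as a legitimate snapshot instead of being mistaken for "nothing emitted yet".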
Example: build pipeline, expose snapshot
Generator pipeline (readable and testable):
async function* buildUserDashboard(userId$: Stream<string>) {
  // One snapshot per incoming user id.
  for await (const id of eachValueFrom(userId$)) {
    const profile = await getUserProfile(id);
    const posts = await getUserPosts(id);
    yield { id, profile, posts };
  }
}
Wire it to a subject that keeps the latest snapshot:
const dashboardSubject = createSubject();
dashboardSubject.pipe(buildUserDashboard(userId$));
Now callers who want a single snapshot can just do:
async function onOpenDashboard() {
  const snapshot = await dashboardSubject.query(); // downgrade to one value
  render(snapshot);
}
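For comparison, a plain async generator can also be downgraded to one value without a subject at all. The sketch below uses a hypothetical `firstValue` helper and a hypothetical endless `dashboards` generator. Unlike `query()`, it starts a fresh iteration and stops it after the first item instead of reading a cached snapshot.

```typescript
// Resolves with the first value an async iterable produces, then stops iterating.
async function firstValue<T>(source: AsyncIterable<T>): Promise<T> {
  for await (const value of source) {
    // Returning from inside the loop calls the iterator's return(),
    // which lets the generator run its cleanup code.
    return value;
  }
  throw new Error("source completed without emitting a value");
}

// Set by the generator's finally block so cleanup can be observed.
let cleanedUp = false;

// Hypothetical dashboard stream that would keep emitting if left running.
async function* dashboards(): AsyncGenerator<{ version: number }> {
  try {
    let version = 1;
    while (true) {
      yield { version };
      version += 1;
    }
  } finally {
    cleanedUp = true;
  }
}
```

Calling `await firstValue(dashboards())` resolves to `{ version: 1 }` and leaves `cleanedUp` set to `true`, so nothing keeps running in the background.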
Why this matters
- Interop: imperative handlers, startup code, and tests can consume streams simply.
- Migration-friendly: adopt generator streams incrementally without refactoring every consumer.
- Predictable: `query()` semantics are explicit and easy to document and test.
Subjects Without Overthinking
Hot Subjects are great for multicasting. They let pipelines broadcast a computed snapshot to multiple listeners. The `query()` escape hatch keeps things pragmatic: no need to force `for await` everywhere. Use Subjects for sharing and `query()` for one-shot reads.
const subject = createSubject<number>();
subject.next(42);
const latest = await subject.query(); // 42 (immediate if latest exists)
Before vs After: Real Example
The Old Me (RxJS Maximalist):
const searchResults$ = searchInput$.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query =>
    query ? searchAPI(query).pipe(catchError(() => of([]))) : of([])
  )
);
The New Me (Streamix + Generators):
async function* search(searchInput: Stream<string>) {
  const debounced = searchInput.pipe(
    debounceTime(300),
    distinctUntilChanged()
  );
  for await (const query of eachValueFrom(debounced)) {
    if (query) {
      try {
        yield* await searchAPI(query);
      } catch {
        yield* []; // keep calm and carry on
      }
    }
  }
}
Which would you rather debug at 2 AM?
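The `try`/`catch` inside the loop is the generator-side counterpart of `catchError(() => of([]))`: a failed query yields nothing and the loop moves on. Here is a self-contained sketch of that behavior, assuming a hypothetical `searchAPI` that fails for the query `"boom"` and leaving out the debouncing.

```typescript
// Hypothetical search backend: fails for the query "boom", succeeds otherwise.
async function searchAPI(query: string): Promise<string[]> {
  if (query === "boom") throw new Error("search failed");
  return [`${query}-1`, `${query}-2`];
}

// Same shape as the search generator, minus debouncing.
async function* search(
  queries: AsyncIterable<string> | Iterable<string>
): AsyncGenerator<string> {
  for await (const query of queries) {
    if (!query) continue; // skip empty queries
    try {
      yield* await searchAPI(query);
    } catch {
      // Swallow the failure so later queries are still processed.
    }
  }
}

// Drains an async iterable into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const value of source) out.push(value);
  return out;
}
```

With the input `["a", "boom", "", "b"]`, `collect(search(...))` resolves to `["a-1", "a-2", "b-1", "b-2"]`: the failing query and the empty one contribute nothing, and the stream keeps going.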
Lessons Learned
1. Admit Overengineering
If explaining your code needs a whiteboard, simplify it.
2. Embrace Simplicity
Not every async operation needs another observable.
3. Pick the Right Tool
Use Case | Best Choice |
---|---|
UI events, real-time data | Personal preference |
Sequential data pipelines, pagination | Streamix + Generators |
Single-value requests, tests | Promises / `query()` |
Try It Yourself
import { from, map, filter, take, eachValueFrom } from '@actioncrew/streamix';
async function* processData() {
  // Keep even numbers, double them, and stop after three values.
  const numbers = from([1,2,3,4,5,6,7,8,9,10]).pipe(
    filter(n => n % 2 === 0),
    map(n => n * 2),
    take(3)
  );
  yield* eachValueFrom(numbers); // 4, 8, 12
}
The Bottom Line
Reactive programming isn't about using every operator in the toolbox. It's about being readable, pragmatic, and interoperable. Build readable pipelines, and when the rest of your app only needs one value, downgrade them with `query()`. Practical, testable, and it keeps everyone sleeping more soundly.
Your Turn!
What's your reactive confession? Ever converted a huge RxJS pipeline into a simple generator? Ready to try the `query()` trick in your codebase?
Ready to stream? Get started with Streamix today!
Remember: choose your tools wisely, keep it simple, and may your streams be ever readable!