Skip to content

The Ordeal of a Pull-Based Subject Implementation 🀯 ​

Implementing a pull-based hot subject (like RxJS’s Subject, but with pull semantics), is a deceptively complex task. While a simple observer pattern seems straightforward, building a robust, thread-safe, and predictable subject requires careful management of asynchronous operations, state, and multiple consumers. The core challenge is bridging the push-based nature of the next() method with the pull-based nature of the subscribe() loop, all while handling a variety of edge cases correctly.

1. The Async Queue: Managing Concurrency 🚦 ​

One of the first challenges is managing the concurrency of operations. The next(), complete(), error(), and subscribe() methods can all be called at any time, potentially in rapid succession. Without a mechanism to serialize these actions, you risk race conditions and an inconsistent state.

The code addresses this by using a createQueue primitive. All state-modifying operationsβ€”writing to the buffer, completing the stream, or detaching a readerβ€”are pushed into this single, sequential queue. This ensures that actions are processed one at a time, preventing multiple async calls from corrupting the subject's state.

2. The Multi-Reader Buffer: The Heart of the Hot Stream ❀️‍πŸ”₯ ​

A "hot" stream needs to broadcast the same values to all of its listeners. Unlike a "cold" stream that creates a new data source for each subscriber, a subject's single source of truth must be accessible by all.

The implementation uses a createSingleValueBuffer primitive. This special buffer allows multiple readers to "attach" to it. When a new value is written to the buffer, it notifies all attached readers, allowing them to pull the value. This design is the key to multicasting. The buffer holds the responsibility of notifying all readers, which is a significant part of the subject's complexity.

3. The Endless Subscription Loop: Pulling Values while (true) πŸ”„ ​

A pull-based subject's core functionality relies on a continuous pull loop. This loop, often implemented with while (true), repeatedly calls await buffer.read(readerId) to pause execution until a new value is available from the buffer. The await keyword pulls the subscriber into action when a value is ready.

To prevent blocking the application, the await operation utilizes the microtask queue. This ensures that even when a value is being pulled from a different asynchronous context (e.g., a web worker or database query), the subscriber's loop can resume without halting the main event loop, keeping the application responsive.

4. Subscription Lifecycle and Cleanup: Preventing Leaks πŸ—‘οΈ ​

A subject implementation must manage the lifecycle of each subscription to prevent memory leaks and ensure resources are properly released.

The subscribe() function returns a Subscription object. This object includes an unsubscribe() method that triggers the unsubscribing flag. This flag, in turn, queues a task to detach the reader from the buffer.

The use of readerId is critical here; it provides a unique identifier for each subscriber, allowing the buffer to correctly detach only the specified reader when unsubscribe() is called.

5. The "Snappy" and "Query" APIs: Bridging Imperative Gaps πŸŒ‰ ​

To be truly useful, a reactive stream needs to interoperate with existing imperative code. Providing synchronous and one-shot async access to the latest value adds another layer of complexity.

  • snappy: This getter is a simple, synchronous way to access the most recently pushed value of a subject. It's a pragmatic convenience that bridges the asynchronous, reactive world with the synchronous, imperative needs of an application.
  • query(): This method uses firstValueFrom to get the next value from the stream and resolve a promise with it. This is a great pattern for one-shot reads, but it relies on the underlying stream (this) and its subscription logic to work correctly.

Conclusion ​

In conclusion, a pull-based subject is a complex orchestration of asynchronous primitives. Its difficulty lies in building a robust system that can gracefully handle multiple, concurrent subscribers while maintaining state, ensuring correct cleanup, and providing a clean API that bridges reactive and imperative paradigms.


Ready to stream? Get started with Streamix today! πŸš€
Install from NPM β€’ View on GitHub β€’ Give Feedback


Remember: Choose your tools wisely, keep it simple, and may your reactive pipelines be pragmatic and interoperable with everything else. πŸš€βœ¨

Released under the MIT License.