Skip to content

Streamix Logo

A lightweight, high-performance alternative to RxJS
Built for modern apps that need reactive streams without the complexity

Build StatusNPM VersionNPM DownloadsBundle SizeCode CoverageAI-Powered


🚀 Why Streamix? ​

Streamix gives you all the power of reactive programming with 90% less complexity than RxJS. At just 11 KB zipped, it's perfect for modern applications that need performance without bloat.

âœĻ Key Benefits ​

  • ðŸŠķ Ultra Lightweight — Only 9 KB zipped (vs RxJS's ~40 KB)
  • ⚡ High Performance — Pull-based execution means values are only computed when needed
  • ðŸŽŊ Easy to Learn — Familiar API if you know RxJS, simpler if you don't
  • 🔄 Generator-Powered — Built on async generators for natural async flow
  • 🌐 HTTP Ready — Optional HTTP client (~3 KB) for seamless API integration
  • 🧠 Computation-Friendly — Perfect for heavy processing tasks
  • 👁ïļ Native DOM Observation — Built-in streams for intersection, resize, and mutation events

ðŸ“Ķ Installation ​

bash
# npm
npm install @actioncrew/streamix

# yarn
yarn add @actioncrew/streamix

# pnpm
pnpm add @actioncrew/streamix

🏃‍♂ïļ Quick Start ​

Basic Stream Operations ​

typescript
import { eachValueFrom, range, map, filter, take } from '@actioncrew/streamix';

// Create a stream of numbers, transform them, and consume
const stream = range(1, 100)
  .pipe(
    map(x => x * 2),           // Double each number
    filter(x => x % 3 === 0),  // Keep only multiples of 3
    take(5)                    // Take first 5 results
  );

// Consume the stream
for await (const value of eachValueFrom(stream)) {
  console.log(value); // 6, 12, 18, 24, 30
}

Handling User Events ​

typescript
import { eachValueFrom, fromEvent, debounce, map } from '@actioncrew/streamix';

// Debounced search as user types
const searchInput = document.getElementById('search');
const searchStream = fromEvent(searchInput, 'input')
  .pipe(
    map(event => event.target.value),
    debounce(300), // Wait 300ms after user stops typing
    filter(text => text.length > 2)
  );

for await (const searchTerm of eachValueFrom(searchStream)) {
  console.log('Searching for:', searchTerm);
  // Perform search...
}

🔧 Core Concepts ​

Streams ​

Streams are sequences of values over time, implemented as async generators:

typescript
// Creating a custom stream
async function* numberStream() {
  for (let i = 0; i < 10; i++) {
    yield i;
    await new Promise(resolve => setTimeout(resolve, 100));
  }
}

const stream = createStream('numberStream', numberStream);

Operators ​

Transform, filter, and combine streams with familiar operators:

typescript
import { map, filter, mergeMap, combineLatest } from '@actioncrew/streamix';

const processedStream = sourceStream
  .pipe(
    map(x => x * 2),
    filter(x => x > 10),
    mergeMap(x => fetchDataFor(x))
  );

Subjects ​

Manually control stream emissions:

typescript
import { eachValueFrom, Subject, createSubject } from '@actioncrew/streamix';

const subject = createSubject<string>();

// Subscribe to the subject
for await (const value of eachValueFrom(subject)) {
  console.log('Received:', value);
}

// Emit values
subject.next('Hello');
subject.next('World');
subject.complete();

🌐 HTTP Client ​

Streamix includes a powerful HTTP client perfect for reactive applications:

typescript
import { eachValueFrom, map, retry } from '@actioncrew/streamix';
import { 
  createHttpClient, 
  readJson, 
  useBase, 
  useLogger, 
  useTimeout 
} from '@actioncrew/streamix/http';

// Setup client with middleware
const client = createHttpClient().withDefaults(
  useBase("https://api.example.com"),
  useLogger(),
  useTimeout(5000)
);

// Make reactive HTTP requests
const dataStream = retry(() => client.get("/users", readJson), 3)
  .pipe(
    map(users => users.filter(user => user.active))
  );

for await (const activeUsers of eachValueFrom(dataStream)) {
  console.log('Active users:', activeUsers);
}

ðŸŽŊ Real-World Example ​

Here's how to build a live search with API calls and error handling:

typescript
import {
  eachValueFrom,
  fromEvent,
  fromPromise,
  debounce, 
  map, 
  filter, 
  switchMap, 
  catchError,
  startWith 
} from '@actioncrew/streamix';

const searchInput = document.getElementById('search');
const resultsDiv = document.getElementById('results');

const searchResults = fromEvent(searchInput, 'input')
  .pipe(
    map(e => e.target.value.trim()),
    debounce(300),
    filter(query => query.length > 2),
    switchMap(query => fromPromise(
      fetch(`/api/search?q=${query}`)
        .then(r => r.json())
        .catch(err => ({ error: 'Search failed', query })))
    ),
    startWith({ results: [], query: '' })
  );

for await (const result of eachValueFrom(searchResults)) {
  if (result.error) {
    resultsDiv.innerHTML = `<p class="error">${result.error}</p>`;
  } else {
    resultsDiv.innerHTML = result.results
      .map(item => `<div class="result">${item.title}</div>`)
      .join('');
  }
}

📚 Available Operators ​

Streamix includes all the operators you need:

Transformation ​

  • map - Transform each value
  • scan - Accumulate values over time
  • buffer - Collect values into arrays

Filtering ​

  • filter - Keep values that match criteria
  • take - Take first N values
  • takeWhile - Take while condition is true
  • skip - Skip first N values
  • distinct - Remove duplicates

Combination ​

  • mergeMap - Merge multiple streams
  • switchMap - Switch to latest stream
  • combineLatest - Combine latest values
  • concat - Connect streams sequentially

Utility ​

  • tap - Side effects without changing stream
  • delay - Add delays between emissions
  • retry - Retry failed operations
  • finalize - Cleanup when stream completes
  • debounce - Limit emission rate

ðŸŽŪ Live Demos ​

Try Streamix in action:


🔄 Generator-Based Architecture ​

Unlike RxJS's push-based approach, Streamix uses pull-based async generators:

typescript
// Values are only computed when requested
async function* expensiveStream() {
  for (let i = 0; i < 1000000; i++) {
    yield expensiveComputation(i); // Only runs when needed!
  }
}

// Memory efficient - processes one value at a time
const stream = createStream('expensiveStream', expensiveStream)
  .pipe(take(10)); // Only computes first 10 values

This means:

  • Better performance - No wasted computations
  • Lower memory usage - Process one value at a time
  • Natural backpressure - Consumer controls the flow

🆚 Streamix vs RxJS ​

FeatureStreamixRxJS
Bundle Size9 KB~40 KB
Learning CurveGentleSteep
PerformancePull-based (efficient)Push-based
API ComplexitySimpleComplex
Async/Await SupportNativeLimited
Memory UsageLowHigher

📖 Documentation & Resources ​


ðŸĪ Contributing ​

We'd love your help making Streamix even better! Whether it's:

  • 🐛 Bug reports - Found something broken?
  • ðŸ’Ą Feature requests - Have a great idea?
  • 📝 Documentation - Help others learn
  • 🔧 Code contributions - Submit a PR

📋 Share your feedback - Tell us how you're using Streamix!


📄 License ​

MIT License - use Streamix however you need!


Ready to stream? Get started with Streamix today! 🚀
Install from NPM â€Ē View on GitHub â€Ē Give Feedback

API Reference ​

Check the detailed API Reference here.

Released under the MIT License.