[M]:

RxJS Cheatsheet: Essential Operators and Quick Tips

This notebook provides a quick reference guide to RxJS operators and patterns that you'll commonly use in reactive programming. Whether you're building Angular applications, React with RxJS, or Node.js event-driven systems, this cheatsheet will help you find the right operator for your use case.

We'll cover creation operators, transformation techniques, filtering methods, error handling, multicasting with Subjects, and practical patterns - all with runnable examples.

[1]:
// Install RxJS
!npm install rxjs
$ npm install rxjs

added 2 packages in 2s

28 packages are looking for funding
  run `npm fund` for details
[2]:
// Import RxJS
const rxjs = require('rxjs');
const operators = require('rxjs/operators');

console.log(`RxJS loaded successfully\n`);
RxJS loaded successfully
[M]:

1. Creation Operators

Creation operators create Observables from various sources.

[3]:
// of - creates Observable from values
const ofExample = rxjs.of('apple', 'banana', 'cherry');

console.log('of operator example:\n');
ofExample.subscribe(fruit => console.log(`Fruit: ${fruit}\n`));
of operator example:
Fruit: apple
Fruit: banana
Fruit: cherry
[4]:
// from - creates Observable from array, promise, or iterable
const arraySource = rxjs.from([1, 2, 3]);
const promiseSource = rxjs.from(Promise.resolve('Promise resolved'));

console.log('from operator with array:\n');
arraySource.subscribe(num => console.log(`Number: ${num}\n`));

console.log('from operator with promise:\n');
promiseSource.subscribe(result => console.log(`${result}\n`));
from operator with array:
Number: 1
Number: 2
Number: 3
from operator with promise:
Promise resolved
[5]:
// fromEvent - creates Observable from event (simulated for Node)
function fromEventExample() {
return new Promise(resolve => {
// Simulate events using a Subject
const eventTarget = new rxjs.Subject();
const eventStream = eventTarget.asObservable();
console.log('fromEvent example (simulated):\n');
eventStream.subscribe(event => console.log(`Event received: ${JSON.stringify(event)}\n`));
// Simulate events
eventTarget.next({ type: 'click', x: 100, y: 200 });
setTimeout(() => {
eventTarget.next({ type: 'click', x: 120, y: 240 });
resolve();
}, 500);
});
}

await fromEventExample();
fromEvent example (simulated):
Event received: {"type":"click","x":100,"y":200}
Event received: {"type":"click","x":120,"y":240}
[6]:
// interval and timer - emit values periodically
function timeBasedExample() {
return new Promise(resolve => {
console.log('interval and timer examples:\n');
// Interval: emit sequential numbers every 500ms
const interval$ = rxjs.interval(500).pipe(operators.take(3));
const intervalSub = interval$.subscribe(val => console.log(`Interval: ${val}\n`));
// Timer: wait 1000ms, then emit every 500ms
console.log('Timer starts after 1 second...\n');
const timer$ = rxjs.timer(1000, 500).pipe(operators.take(3));
const timerSub = timer$.subscribe(val => console.log(`Timer: ${val}\n`));
// Ensure demo finishes
setTimeout(() => {
intervalSub.unsubscribe();
timerSub.unsubscribe();
resolve();
}, 3000);
});
}

await timeBasedExample();
[7]:
// range and iif - emit range of values or choose between Observables
const rangeExample = rxjs.range(5, 3); // start at 5, emit 3 values

// Condition for iif demo
const condition = Math.random() > 0.5;
const iifExample = rxjs.iif(
() => condition,
rxjs.of('Condition is true'),
rxjs.of('Condition is false')
);

console.log('range operator example:\n');
rangeExample.subscribe(val => console.log(`Range emitted: ${val}\n`));

console.log('iif operator example:\n');
iifExample.subscribe(val => console.log(`${val}\n`));
console.log(`(Random condition was ${condition ? 'true' : 'false'})\n`);
range operator example:
Range emitted: 5
Range emitted: 6
Range emitted: 7
iif operator example:
Condition is true
(Random condition was true)
[M]:

2. Transformation Operators

These operators transform emitted values.

[8]:
// map - transforms each value
const numbers = rxjs.of(1, 2, 3, 4, 5);
const doubled = numbers.pipe(
operators.map(x => x * 2)
);

console.log('map operator example:\n');
doubled.subscribe(x => console.log(`Doubled: ${x}\n`));
map operator example:
Doubled: 2
Doubled: 4
Doubled: 6
Doubled: 8
Doubled: 10
[9]:
// pluck - extracts a property
const users = rxjs.of(
{ name: 'Alice', age: 25, job: { title: 'Developer', level: 'Senior' } },
{ name: 'Bob', age: 30, job: { title: 'Designer', level: 'Mid' } }
);

const names = users.pipe(operators.pluck('name'));
const jobTitles = users.pipe(operators.pluck('job', 'title'));

console.log('pluck operator example:\n');
names.subscribe(name => console.log(`Name: ${name}\n`));
jobTitles.subscribe(title => console.log(`Job Title: ${title}\n`));
pluck operator example:
Name: Alice
Name: Bob
Job Title: Developer
Job Title: Designer
[10]:
// scan - accumulates values (like reduce, but emits each accumulation)
const clickEvents = rxjs.of('click', 'click', 'click');
const clickCounter = clickEvents.pipe(
operators.scan((count, event) => count + 1, 0)
);

console.log('scan operator example:\n');
clickCounter.subscribe(count => console.log(`Click count: ${count}\n`));
scan operator example:
Click count: 1
Click count: 2
Click count: 3
[11]:
// mergeMap, switchMap, concatMap, exhaustMap comparison
function flatteningOperatorsDemo() {
return new Promise(resolve => {
// Simulated API call
function getDataById(id) {
return rxjs.of(`Data for ID ${id}`).pipe(
operators.delay(400) // Simulate network request
);
}
console.log('Flattening operators comparison:\n');
// Setup for demo: click stream of IDs
const clicks = new rxjs.Subject();
// mergeMap - process all in parallel
console.log('mergeMap example:\n');
const mergeMapped = clicks.pipe(
operators.mergeMap(id => {
console.log(`mergeMap processing ID ${id}\n`);
return getDataById(id);
})
);
mergeMapped.subscribe(data => console.log(`mergeMap result: ${data}\n`));
// Simulate rapid clicks
clicks.next(1);
setTimeout(() => clicks.next(2), 100);
setTimeout(() => clicks.next(3), 200);
// Ensure demo completes
setTimeout(resolve, 1500);
});
}

await flatteningOperatorsDemo();
[12]:
// switchMap - cancels previous inner Observables
function switchMapDemo() {
return new Promise(resolve => {
console.log('switchMap example:\n');
// Simulate a search function
function search(term) {
console.log(`Searching for: ${term}\n`);
return rxjs.of(`Results for "${term}"`).pipe(
operators.delay(300)
);
}
// Simulate rapid typing
const searchTerms = new rxjs.Subject();
const results = searchTerms.pipe(
operators.switchMap(term => search(term))
);
results.subscribe(result => console.log(`${result}\n`));
// User typing "react", then quickly corrects to "redux"
searchTerms.next('r');
setTimeout(() => searchTerms.next('re'), 50);
setTimeout(() => searchTerms.next('rea'), 100);
setTimeout(() => searchTerms.next('reac'), 150);
setTimeout(() => searchTerms.next('react'), 200);
setTimeout(() => searchTerms.next('redux'), 250); // switchMap cancels previous
// Ensure demo completes
setTimeout(resolve, 1000);
});
}

await switchMapDemo();
switchMap example:
Searching for: r
Searching for: re
Searching for: rea
Searching for: reac
Searching for: react
Searching for: redux
Results for "redux"
[13]:
// concatMap - processes in order, one at a time
function concatMapDemo() {
return new Promise(resolve => {
console.log('concatMap example:\n');
function saveItem(item) {
console.log(`Starting to save ${item}\n`);
return rxjs.of(`Saved ${item}`).pipe(
operators.delay(400) // Simulate API call
);
}
const items = rxjs.of('item1', 'item2', 'item3');
const savedItems = items.pipe(
operators.concatMap(item => saveItem(item))
);
console.log('Saving items in sequence:\n');
savedItems.subscribe(result => console.log(`${result}\n`));
// Ensure demo completes
setTimeout(resolve, 1500);
});
}

await concatMapDemo();
[14]:
// exhaustMap - ignores new values while processing
function exhaustMapDemo() {
return new Promise(resolve => {
console.log('exhaustMap example:\n');
function processClick(id) {
console.log(`Processing click ${id}\n`);
return rxjs.of(`Processed ${id}`).pipe(
operators.delay(500) // Long operation
);
}
const clicks = new rxjs.Subject();
const results = clicks.pipe(
operators.exhaustMap(id => processClick(id))
);
results.subscribe(result => console.log(`${result}\n`));
// First click is processed
clicks.next('click-1');
// These clicks during processing are ignored with exhaustMap
setTimeout(() => {
console.log('User clicked again during processing (ignored by exhaustMap)\n');
clicks.next('click-2');
}, 100);
setTimeout(() => {
console.log('User clicked again (ignored by exhaustMap)\n');
clicks.next('click-3');
}, 300);
// This click after processing completes will be processed
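setTimeout(() => {
console.log('User clicked after processing finished (processed by exhaustMap)\n');
clicks.next('click-4');
}, 700); // assumed timing: any delay past the 500ms operation works
// Ensure demo completes
setTimeout(resolve, 1500);
});
}

await exhaustMapDemo();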
[M]:

3. Filtering Operators

These operators filter values from the stream.

[15]:
// filter, take, first, last
const numbers$ = rxjs.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

const evenNumbers = numbers$.pipe(
operators.filter(n => n % 2 === 0)
);

const firstThreeNumbers = numbers$.pipe(
operators.take(3)
);

const firstEvenNumber = numbers$.pipe(
operators.first(n => n % 2 === 0)
);

const lastNumber = numbers$.pipe(
operators.last()
);

console.log('Filtering operators examples:\n');

console.log('filter (even numbers):\n');
evenNumbers.subscribe(n => console.log(`${n}\n`));

console.log('take(3):\n');
firstThreeNumbers.subscribe(n => console.log(`${n}\n`));

console.log('first(even):\n');
firstEvenNumber.subscribe(n => console.log(`${n}\n`));

console.log('last():\n');
lastNumber.subscribe(n => console.log(`${n}\n`));
Filtering operators examples:
filter (even numbers):
2
4
6
8
10
take(3):
1
2
3
first(even):
2
last():
10
[16]:
// debounce, throttle, distinct operators
function timeBasedFilteringDemo() {
return new Promise(resolve => {
console.log('Time-based filtering operators:\n');
// Simulate user typing
const userInput = new rxjs.Subject();
// Apply different filters
const debouncedInput = userInput.pipe(
operators.debounceTime(300),
operators.tap(term => console.log(`Debounced input: ${term}\n`))
);
const throttledInput = userInput.pipe(
operators.throttleTime(300),
operators.tap(term => console.log(`Throttled input: ${term}\n`))
);
const distinctInput = userInput.pipe(
operators.distinctUntilChanged(),
operators.tap(term => console.log(`Distinct input: ${term}\n`))
);
// Subscribe to see the results
debouncedInput.subscribe();
throttledInput.subscribe();
distinctInput.subscribe();
// Simulate typing with rapid inputs
console.log('User typing: "hello"\n');
userInput.next('h');
setTimeout(() => userInput.next('he'), 100);
setTimeout(() => userInput.next('hel'), 200);
setTimeout(() => userInput.next('hell'), 300);
setTimeout(() => userInput.next('hello'), 400);
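// Second burst with repeated characters (timings below are assumed)
setTimeout(() => {
console.log('\nUser typing with repeats: "aabbc"\n');
userInput.next('a');
setTimeout(() => userInput.next('aa'), 100);
setTimeout(() => userInput.next('aab'), 200);
setTimeout(() => userInput.next('aabb'), 300);
setTimeout(() => userInput.next('aabbc'), 400);
}, 1000);
// Ensure demo completes
setTimeout(resolve, 2000);
});
}

await timeBasedFilteringDemo();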

User typing with repeats: "aabbc"
Throttled input: a
Distinct input: a
Distinct input: aa
Distinct input: aab
Throttled input: aabb
Distinct input: aabb
Distinct input: aabbc
Debounced input: aabbc
[M]:

4. Combination Operators

These operators combine multiple Observables.

[17]:
// merge and concat
const stream1$ = rxjs.of('A', 'B', 'C');
const stream2$ = rxjs.of('X', 'Y', 'Z');

const merged$ = rxjs.merge(stream1$, stream2$);
const concatenated$ = rxjs.concat(stream1$, stream2$);

console.log('Combination operators:\n');

console.log('merge operator: (interleaves both streams)\n');
merged$.subscribe(value => console.log(`${value}\n`));

console.log('concat operator: (stream1 then stream2)\n');
concatenated$.subscribe(value => console.log(`${value}\n`));
Combination operators:
merge operator: (interleaves both streams)
A
B
C
X
Y
Z
concat operator: (stream1 then stream2)
A
B
C
X
Y
Z
[18]:
// combineLatest, withLatestFrom, zip
function combiningValuesDemo() {
return new Promise(resolve => {
console.log('Combining latest values from streams:\n');
// Create some streams with different timing
const color$ = rxjs.concat(
rxjs.of('red').pipe(operators.delay(100)),
rxjs.of('green').pipe(operators.delay(200)),
rxjs.of('blue').pipe(operators.delay(300))
);
const shape$ = rxjs.concat(
rxjs.of('circle').pipe(operators.delay(50)),
rxjs.of('square').pipe(operators.delay(400))
);
// combineLatest - combines the latest values
const combined$ = rxjs.combineLatest([color$, shape$]);
// zip - combines by index
const zipped$ = rxjs.zip(color$, shape$);
console.log('combineLatest example:\n');
combined$.subscribe(([color, shape]) =>
console.log(`Combined: ${color} ${shape}\n`)
);
console.log('zip example:\n');
zipped$.subscribe(([color, shape]) =>
console.log(`Zipped: ${color} ${shape}\n`)
);
// Ensure demo completes
setTimeout(resolve, 1000);
});
}

await combiningValuesDemo();
Combining latest values from streams:
combineLatest example:
zip example:
Combined: red circle
Zipped: red circle
Combined: green circle
Combined: green square
Zipped: green square
Combined: blue square
[19]:
// forkJoin - parallel API calls example
function forkJoinDemo() {
return new Promise(resolve => {
console.log('forkJoin for parallel API calls:\n');
function getUserData() {
return rxjs.of({ id: 1, name: 'John Doe' }).pipe(
operators.delay(300),
operators.tap(() => console.log('User data loaded\n'))
);
}
function getUserPosts() {
return rxjs.of([{ id: 1, title: 'Post 1' }, { id: 2, title: 'Post 2' }]).pipe(
operators.delay(500),
operators.tap(() => console.log('User posts loaded\n'))
);
}
function getUserPreferences() {
return rxjs.of({ theme: 'dark', notificationsEnabled: true }).pipe(
operators.delay(200),
operators.tap(() => console.log('User preferences loaded\n'))
);
}
console.log('Loading user profile data...\n');
rxjs.forkJoin({
user: getUserData(),
posts: getUserPosts(),
preferences: getUserPreferences()
}).subscribe(result => {
console.log('All data loaded, ready to display profile:\n');
console.log(JSON.stringify(result, null, 2) + '\n');
resolve();
});
});
}

await forkJoinDemo();
forkJoin for parallel API calls:
Loading user profile data...
User preferences loaded
User data loaded
User posts loaded
All data loaded, ready to display profile:
{
  "user": {
    "id": 1,
    "name": "John Doe"
  },
  "posts": [
    {
      "id": 1,
      "title": "Post 1"
    },
    {
      "id": 2,
      "title": "Post 2"
    }
  ],
  "preferences": {
    "theme": "dark",
    "notificationsEnabled": true
  }
}
[M]:

5. Error Handling

These operators help with handling errors in reactive streams.

[20]:
// catchError - handles errors by returning a fallback Observable
function catchErrorDemo() {
// Simulate API call that may fail
function getDataFromApi(shouldFail = false) {
return new rxjs.Observable(subscriber => {
if (shouldFail) {
subscriber.error(new Error('API request failed'));
} else {
subscriber.next('API data');
subscriber.complete();
}
});
}
// Success case
console.log('catchError example (success case):\n');
getDataFromApi(false).pipe(
operators.catchError(error => {
console.log(`Caught error: ${error.message}\n`);
return rxjs.of('Fallback data');
})
).subscribe(data => console.log(`Received: ${data}\n`));
// Error case
console.log('catchError example (error case):\n');
getDataFromApi(true).pipe(
operators.catchError(error => {
console.log(`Caught error: ${error.message}\n`);
return rxjs.of('Fallback data');
})
).subscribe(data => console.log(`Received: ${data}\n`));
}

catchErrorDemo();
catchError example (success case):
Received: API data
catchError example (error case):
Caught error: API request failed
Received: Fallback data
[21]:
// retry and retryWhen
function retryDemo() {
return new Promise(resolve => {
console.log('retry and retryWhen examples:\n');
// Simulate flaky API that fails twice, then succeeds on the third attempt
let attempts = 0;
const flakyApi = new rxjs.Observable(subscriber => {
attempts++;
console.log(`API attempt ${attempts}\n`);
if (attempts < 3) {
subscriber.error(new Error(`Server error on attempt ${attempts}`));
} else {
subscriber.next('API data successfully retrieved');
subscriber.complete();
}
});
console.log('Simple retry (3 attempts):\n');
flakyApi.pipe(
operators.retry(3),
operators.tap({
complete: () => {
// Reset for next demo
attempts = 0;
resolve();
}
})
).subscribe({
next: data => console.log(`${data}\n`),
error: err => console.log(`Gave up after retries: ${err.message}\n`)
});
});
}

await retryDemo();
retry and retryWhen examples:
Simple retry (3 attempts):
API attempt 1
API attempt 2
API attempt 3
API data successfully retrieved
[22]:
// Implementing exponential backoff with retryWhen
function retryWithBackoffDemo() {
return new Promise(resolve => {
let attempts = 0;
const flakyApi = new rxjs.Observable(subscriber => {
attempts++;
console.log(`API attempt ${attempts}\n`);
if (attempts < 4) {
subscriber.error(new Error(`Server error on attempt ${attempts}`));
} else {
subscriber.next('API data with backoff retry success');
subscriber.complete();
}
});
console.log('Exponential backoff retry:\n');
flakyApi.pipe(
operators.retryWhen(errors =>
errors.pipe(
operators.scan((attempts, error) => {
attempts += 1;
if (attempts >= 5) {
throw error; // Max retries reached
}
// Using shorter delays for demo purposes
const delay = Math.pow(2, attempts) * 100;
console.log(`Retrying after ${delay}ms (attempt ${attempts})\n`);
return attempts;
}, 0),
operators.delay(400) // Using fixed delay for demo, normally would use calculated delay
)
)
).subscribe({
next: data => console.log(`${data}\n`),
error: err => console.log(`All retries failed: ${err.message}\n`),
complete: () => resolve()
});
});
}

await retryWithBackoffDemo();
API attempt 3
Retrying after 800ms (attempt 3)
API attempt 4
API data with backoff retry success
[M]:

6. Subjects and Multicasting

Subjects are both Observables and Observers, useful for multicasting.

[23]:
// Basic Subject example
console.log('Basic Subject example:\n');

const subject = new rxjs.Subject();

// Add subscribers
subject.subscribe(value => console.log(`Subscriber 1 received: ${value}\n`));
subject.subscribe(value => console.log(`Subscriber 2 received: ${value}\n`));

// Emit values
console.log('Emitting values to subject:\n');
subject.next('First value');
subject.next('Second value');
subject.complete();
Basic Subject example:
Emitting values to subject:
Subscriber 1 received: First value
Subscriber 2 received: First value
Subscriber 1 received: Second value
Subscriber 2 received: Second value
[24]:
// BehaviorSubject - has initial value and replays latest to new subscribers
console.log('BehaviorSubject example:\n');

const behaviorSubject = new rxjs.BehaviorSubject('Initial value');

// First subscriber gets initial value
behaviorSubject.subscribe(value => console.log(`First subscriber: ${value}\n`));

// Emit new value
behaviorSubject.next('Updated value');

// Second subscriber gets latest value
behaviorSubject.subscribe(value => console.log(`Second subscriber: ${value}\n`));

// Current value can be accessed directly
console.log(`Current value: ${behaviorSubject.getValue()}\n`);
BehaviorSubject example:
First subscriber: Initial value
First subscriber: Updated value
Second subscriber: Updated value
Current value: Updated value
[25]:
// ReplaySubject - replays specified number of values to new subscribers
function replaySubjectExample() {
return new Promise(resolve => {
console.log('ReplaySubject example:\n');
// Buffer last 2 values for new subscribers
const replaySubject = new rxjs.ReplaySubject(2);
// Emit some values
replaySubject.next('First value');
replaySubject.next('Second value');
replaySubject.next('Third value');
console.log('New subscriber will get last 2 values:\n');
replaySubject.subscribe(value => console.log(`Replayed: ${value}\n`));
// Emit one more and complete
setTimeout(() => {
replaySubject.next('Fourth value');
replaySubject.complete();
resolve();
}, 300);
});
}

await replaySubjectExample();
ReplaySubject example:
New subscriber will get last 2 values:
Replayed: Second value
Replayed: Third value
Replayed: Fourth value
[26]:
// AsyncSubject - only emits last value, and only when complete
function asyncSubjectExample() {
return new Promise(resolve => {
console.log('AsyncSubject example:\n');
const asyncSubject = new rxjs.AsyncSubject();
// Subscribe before values
asyncSubject.subscribe(value => console.log(`First subscriber: ${value}\n`));
// Emit values
asyncSubject.next('Value 1 - not emitted');
asyncSubject.next('Value 2 - not emitted');
asyncSubject.next('Final value - will be emitted');
// Subscribe after values but before complete
asyncSubject.subscribe(value => console.log(`Second subscriber: ${value}\n`));
setTimeout(() => {
// Complete subject - now values are emitted
console.log('Completing AsyncSubject...\n');
asyncSubject.complete();
// Subscribe after complete - still gets final value
asyncSubject.subscribe(value => console.log(`Late subscriber: ${value}\n`));
resolve();
}, 300);
});
}

await asyncSubjectExample();
AsyncSubject example:
Completing AsyncSubject...
First subscriber: Final value - will be emitted
Second subscriber: Final value - will be emitted
Late subscriber: Final value - will be emitted
[27]:
// Using share() for multicasting
function shareExample() {
return new Promise(resolve => {
console.log('share() operator example:\n');
// Without share - each subscriber triggers separate execution
const time$ = rxjs.timer(0, 1000).pipe(
operators.take(3),
operators.tap(x => console.log(`Source Observable execution: ${x}\n`))
);
console.log('Without sharing - two separate executions:\n');
const sub1 = time$.subscribe(x => console.log(`Observer 1: ${x}\n`));
const sub2 = time$.subscribe(x => console.log(`Observer 2: ${x}\n`));
// Clean up after first demo
setTimeout(() => {
sub1.unsubscribe();
sub2.unsubscribe();
// With share - only one execution shared between subscribers
console.log('\nWith share() - only one execution:\n');
const sharedTime$ = rxjs.timer(0, 1000).pipe(
operators.take(3),
operators.tap(x => console.log(`Shared Observable execution: ${x}\n`)),
operators.share()
);
const sharedSub1 = sharedTime$.subscribe(x => console.log(`Shared Observer 1: ${x}\n`));
const sharedSub2 = sharedTime$.subscribe(x => console.log(`Shared Observer 2: ${x}\n`));
// Resolve after shared demo completes
setTimeout(() => {
sharedSub1.unsubscribe();
sharedSub2.unsubscribe();
resolve();
}, 2500); // assumed delay: long enough for the shared take(3) to finish
}, 2500); // assumed delay: long enough for the unshared demo to finish
});
}

await shareExample();

With share() - only one execution:
Shared Observable execution: 0
Shared Observer 1: 0
Shared Observer 2: 0
Shared Observable execution: 1
Shared Observer 1: 1
Shared Observer 2: 1
Shared Observable execution: 2
Shared Observer 1: 2
Shared Observer 2: 2
[M]:

7. Practical Patterns

Common patterns you'll use in reactive applications.

[28]:
// Pattern: Typeahead/Search with debounce
function typeaheadPattern() {
return new Promise(resolve => {
console.log('Typeahead search pattern:\n');
// Simulate user typing in search box
const searchTerms = [
'r', 're', 'rea', 'reac', 'react'
];
const userInput$ = rxjs.from(searchTerms).pipe(
operators.concatMap(term => rxjs.of(term).pipe(operators.delay(300)))
);
// Function to simulate API search
function searchApi(term) {
console.log(`🔍 Searching API for: "${term}"\n`);
return rxjs.of(`Results for "${term}"`).pipe(operators.delay(300));
}
// Implement typeahead pattern
userInput$.pipe(
operators.tap(term => console.log(`User typed: ${term}\n`)),
operators.debounceTime(500), // Wait for user to stop typing
operators.filter(term => term.length > 1), // Only search for terms with 2+ chars
operators.distinctUntilChanged(), // Don't search for same term twice
operators.switchMap(term => searchApi(term)) // Cancel previous searches
).subscribe({
next: results => console.log(`📋 ${results}\n`),
complete: () => {
console.log('Typeahead demo completed\n');
resolve();
}
});
});
}

await typeaheadPattern();
Typeahead search pattern:
User typed: r
User typed: re
User typed: rea
User typed: reac
User typed: react
🔍 Searching API for: "react"
📋 Results for "react"
Typeahead demo completed
[29]:
// Pattern: Caching HTTP results
function cachingPattern() {
return new Promise(resolve => {
console.log('HTTP request caching pattern:\n');
// Cache storage
const cache = new Map();
// Function to simulate HTTP request
function httpGet(url) {
console.log(`🌐 HTTP GET: ${url}\n`);
return rxjs.of(`Response from ${url}`).pipe(
operators.delay(1000) // Simulate network latency
);
}
// Create a cached HTTP function
function cachedHttpGet(url, maxAge = 5000) {
// Check if we have a valid cached response
if (cache.has(url)) {
const cachedResponse = cache.get(url);
const age = Date.now() - cachedResponse.timestamp;
if (age < maxAge) {
console.log(`🏎️ Cache hit for ${url} (age: ${age}ms)\n`);
return rxjs.of(cachedResponse.data);
} else {
console.log(`⏱️ Cache expired for ${url} (age: ${age}ms)\n`);
}
} else {
console.log(`❓ Cache miss for ${url}\n`);
}
// If no valid cache, make actual request and cache result
return httpGet(url).pipe(
operators.tap(response => {
cache.set(url, { data: response, timestamp: Date.now() });
console.log(`💾 Cached response for ${url}\n`);
})
);
}
// Driver reconstructed from the logged output; maxAge of 2000ms is inferred
// from the "expired" message reporting an age of 2010ms
console.log('First request to /api/data - should be a cache miss:\n');
cachedHttpGet('/api/data', 2000).subscribe(response => {
console.log(`📄 ${response}\n`);
console.log('\nSecond request to /api/data after 1s - should be a cache hit:\n');
cachedHttpGet('/api/data', 2000).subscribe(response => {
console.log(`📄 ${response}\n`);
setTimeout(() => {
console.log('\nThird request to /api/data after 3s - should be a cache miss (expired):\n');
cachedHttpGet('/api/data', 2000).subscribe(response => {
console.log(`📄 ${response}\n`);
resolve();
});
}, 2000);
});
});
});
}

await cachingPattern();
HTTP request caching pattern:
First request to /api/data - should be a cache miss:
❓ Cache miss for /api/data
🌐 HTTP GET: /api/data
💾 Cached response for /api/data
📄 Response from /api/data

Second request to /api/data after 1s - should be a cache hit:
🏎️ Cache hit for /api/data (age: 0ms)
📄 Response from /api/data

Third request to /api/data after 3s - should be a cache miss (expired):
⏱️ Cache expired for /api/data (age: 2010ms)
🌐 HTTP GET: /api/data
💾 Cached response for /api/data
📄 Response from /api/data
[30]:
// Pattern: Load balancing / Rate limiting
function rateLimitingPattern() {
return new Promise(resolve => {
console.log('Rate limiting pattern:\n');
// Function to simulate an API call
function processItem(id) {
return new rxjs.Observable(subscriber => {
console.log(`⚙️ Processing item ${id}...\n`);
setTimeout(() => {
subscriber.next(`✅ Processed item ${id}`);
subscriber.complete();
}, 1000);
});
}
// Create rate limiter with concurrency control
function createRateLimiter(maxConcurrent) {
const queue = new rxjs.Subject();
// Process queue with concurrency limit
queue.pipe(
operators.mergeMap(id => processItem(id), maxConcurrent)
).subscribe(result => console.log(`${result}\n`));
return (id) => queue.next(id);
}
// Create a rate limiter with max 2 concurrent operations
console.log('Creating rate limiter with max 2 concurrent operations:\n');
const processWithLimit = createRateLimiter(2);
// Submit 6 items quickly
console.log('Submitting 6 items rapidly:\n');
for (let i = 1; i <= 6; i++) {
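processWithLimit(i);
}
// Ensure demo completes (assumed: 6 items, 2 at a time, 1s each, plus a margin)
setTimeout(resolve, 3500);
});
}

await rateLimitingPattern();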
Rate limiting pattern:
Creating rate limiter with max 2 concurrent operations:
Submitting 6 items rapidly:
⚙️ Processing item 1...
⚙️ Processing item 2...
✅ Processed item 1
⚙️ Processing item 3...
✅ Processed item 2
⚙️ Processing item 4...
✅ Processed item 3
⚙️ Processing item 5...
✅ Processed item 4
⚙️ Processing item 6...
✅ Processed item 5
✅ Processed item 6
[M]:

8. Quick Tips and Best Practices

  1. Always unsubscribe from long-lived Observables to prevent memory leaks:

    const subscription = observable.subscribe(...);
    // Later when done
    subscription.unsubscribe();
    
  2. Use takeUntil for declarative unsubscription:

    const destroy$ = new Subject();
    observable.pipe(
      takeUntil(destroy$)
    ).subscribe(...);
    // When done
    destroy$.next();
    destroy$.complete();
    
  3. Choose the right flattening operator:

    • switchMap: Cancel previous and switch to new (search, latest value)
    • mergeMap: Process all in parallel (independent operations)
    • concatMap: Process in sequence (order matters)
    • exhaustMap: Ignore new until current completes (prevent duplicates)
  4. Avoid nested subscribes - use flattening operators instead:

    // BAD
    observable1.subscribe(value1 => {
      observable2.subscribe(value2 => { /* ... */ });
    });
    
    // GOOD
    observable1.pipe(
      switchMap(value1 => observable2)
    ).subscribe(value2 => { /* ... */ });
    
  5. Use share() or shareReplay() for multicasting:

    const shared$ = observable.pipe(share());
    const cached$ = observable.pipe(shareReplay({bufferSize: 1, refCount: true}));
    
  6. Handle errors properly to prevent breaking your stream:

    observable.pipe(
      catchError(err => {
        console.error('Error caught:', err);
        return of(fallbackValue); // Return fallback Observable
      })
    )
    
  7. Use finalize() for cleanup operations:

    observable.pipe(
      finalize(() => console.log('Done - cleaning up resources'))
    )
    
  8. Implement cancellation for long operations with takeUntil:

    const cancel$ = new Subject();
    
    longOperation$.pipe(
      takeUntil(cancel$)
    ).subscribe(...);
    
    // Call this to cancel
    cancel$.next();
    
  9. Use BehaviorSubject for state management:

    const state$ = new BehaviorSubject(initialState);
    
    // Update state
    state$.next({...state$.getValue(), property: newValue});
    
    // React to state changes
    state$.subscribe(state => updateUI(state));
    
  10. Prefer pipeable operators over prototype operators for better tree-shaking

[M]:

Summary

RxJS provides powerful tools for handling asynchronous operations in a functional, reactive way. This cheatsheet covered:

  • Creation operators for starting Observable streams
  • Transformation operators to map, merge, and process data
  • Filtering operators to select what data passes through
  • Combination operators to work with multiple streams
  • Error handling techniques for resilient streams
  • Subjects for multicasting to multiple subscribers
  • Practical patterns for common application scenarios

RxJS has a learning curve, but mastering these core operators and patterns will help you write more maintainable, declarative, and robust asynchronous code.
