[M]:

Core Concepts of Reactive Programming with RxJS

Reactive programming is a declarative programming paradigm focused on data streams and the propagation of change. RxJS (Reactive Extensions for JavaScript) is a library that makes reactive programming in JavaScript possible and practical.

In this notebook, we'll explore the fundamental concepts that make reactive programming powerful for handling asynchronous operations, event-based systems, and complex data flows.

[1]:
// Install RxJS
!npm install rxjs
$ npm install rxjs

added 2 packages in 2s

28 packages are looking for funding
  run `npm fund` for details
[2]:
// Import RxJS
const rxjs = require('rxjs');
const operators = require('rxjs/operators');

console.log('RxJS loaded successfully\n');
RxJS loaded successfully
[M]:

1. Observables: The Foundation of Reactive Programming

Observables are the core building block in RxJS. They represent a stream of values over time that you can observe and react to. Think of an Observable as a sequence of data that can be emitted either synchronously or asynchronously.

[3]:
// Creating a basic Observable
const simpleObservable = new rxjs.Observable(subscriber => {
console.log('Observable execution started\n');
// Emit values
subscriber.next('First value');
subscriber.next('Second value');
// We can emit values asynchronously too
setTimeout(() => {
subscriber.next('Async value');
subscriber.complete(); // Signal we're done
}, 1000);
// Return cleanup function (runs on unsubscribe, completion, or error)
return () => {
console.log('Cleanup: Observable execution terminated\n');
};
});

// Nothing happens until we subscribe!
console.log('Before subscribe\n');

// Subscribe to start receiving values
const subscription = simpleObservable.subscribe({
next: value => console.log(`Received: ${value}\n`),
error: err => console.log(`Error: ${err}\n`),
complete: () => console.log('Completed!\n')
});

console.log('After subscribe\n');

// Unsubscribing manually would cancel this execution early
// subscription.unsubscribe();
Before subscribe
Observable execution started
Received: First value
Received: Second value
After subscribe
[M]:

Key Observable Concepts:

  1. Lazy Execution: Observables are lazy - they don't emit values until someone subscribes
  2. Push-based: Observables push data to consumers (vs pull-based systems)
  3. Multiple Values: Unlike Promises that resolve once, Observables can emit multiple values
  4. Cancellable: You can stop receiving values by unsubscribing
  5. Producer/Consumer: The Observable is the producer, the subscriber is the consumer
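
To make laziness, multiple values, and cancellation concrete without any library, here is a minimal sketch of a lazy, cancellable push source. `TinyObservable` is a hypothetical teaching class, not RxJS's implementation, and it deliberately ignores errors and completion.

```javascript
// Toy model of an Observable (hypothetical; not RxJS's implementation).
class TinyObservable {
  constructor(producer) {
    // Store the producer; nothing runs yet (lazy execution).
    this.producer = producer;
  }

  subscribe(next) {
    let active = true;
    // The producer runs now, once per subscriber.
    const cleanup = this.producer(value => {
      if (active) next(value); // push values only while subscribed
    });
    return {
      unsubscribe() {
        if (!active) return;
        active = false;
        if (typeof cleanup === 'function') cleanup();
      }
    };
  }
}

const log = [];
const source = new TinyObservable(emit => {
  log.push('producer started');
  emit(1);
  emit(2);
  return () => log.push('cleanup');
});

log.push('before subscribe');
const subscription = source.subscribe(value => log.push(`received ${value}`));
subscription.unsubscribe();

console.log(log);
// [ 'before subscribe', 'producer started', 'received 1', 'received 2', 'cleanup' ]
```

Creating `source` runs nothing; the producer only starts inside `subscribe`, which is why 'before subscribe' is logged first.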
[M]:

2. Creating Observables

RxJS provides many ways to create Observables from different sources.

[4]:
// from - creates Observable from array, promise, iterable
console.log('from array example:\n');
rxjs.from([1, 2, 3]).subscribe(value => console.log(`Array item: ${value}\n`));

console.log('from promise example:\n');
rxjs.from(Promise.resolve('Promise resolved')).subscribe(value => console.log(`Promise result: ${value}\n`));
from array example:
Array item: 1
Array item: 2
Array item: 3
from promise example:
Promise result: Promise resolved
[5]:
// of - creates Observable from individual arguments
console.log('of operator example:\n');
rxjs.of('Apple', 'Banana', 'Cherry').subscribe(fruit => console.log(`Fruit: ${fruit}\n`));
of operator example:
Fruit: Apple
Fruit: Banana
Fruit: Cherry
[6]:
// interval - emits incremental numbers at regular intervals
function intervalExample() {
return new Promise(resolve => {
console.log('interval example:\n');
const counter = rxjs.interval(500).pipe(
operators.take(5) // Take only 5 values
);
const subscription = counter.subscribe({
next: value => console.log(`Interval: ${value}\n`),
complete: () => {
console.log('Interval complete\n');
resolve();
}
});
// If we needed to cancel early:
// setTimeout(() => subscription.unsubscribe(), 1200);
});
}

await intervalExample();
interval example:
Interval: 0
Interval: 1
Interval: 2
Interval: 3
Interval: 4
Interval complete
[7]:
// Create an Observable from an event (simulated in Node.js)
function fromEventExample() {
return new Promise(resolve => {
console.log('fromEvent example (simulated):\n');
// Simulate an event emitter with a Subject
const eventEmitter = new rxjs.Subject();
const clicks$ = eventEmitter.asObservable();
clicks$.pipe(
operators.map(event => ({ x: event.x, y: event.y })),
operators.take(3)
).subscribe({
next: pos => console.log(`Click at: ${JSON.stringify(pos)}\n`),
complete: () => {
console.log('Click stream completed\n');
resolve();
}
});
// Simulate clicks
setTimeout(() => eventEmitter.next({ x: 100, y: 200, type: 'click' }), 300);
setTimeout(() => eventEmitter.next({ x: 150, y: 250, type: 'click' }), 800);
setTimeout(() => eventEmitter.next({ x: 200, y: 300, type: 'click' }), 1200);
});
}

await fromEventExample();
fromEvent example (simulated):
Click at: {"x":100,"y":200}
Click at: {"x":150,"y":250}
Click at: {"x":200,"y":300}
Click stream completed
[M]:

3. Marble Diagrams: Visualizing Reactive Streams

Marble diagrams are a way to visualize Observable sequences and operator behavior. They represent time flowing from left to right, with marbles representing emitted values.

For example, here's what a marble diagram for a simple Observable might look like:

input:  --1--2--3--4--|
               map(x => x * 10)
output: --10-20-30-40--|
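
One way to read the diagram is as a list of timed events. The sketch below uses two hypothetical helpers, `parseMarbles` and `mapEvents`, and assumes one character per time frame with single-digit values. It shows that `map` changes each value but not when it arrives.

```javascript
// Parse a marble string into timed events (hypothetical helper).
// Each character is one time frame: '-' is idle, '|' is completion,
// anything else is treated as a single-digit value.
function parseMarbles(diagram) {
  const events = [];
  let completeAt = null;
  [...diagram].forEach((char, frame) => {
    if (char === '|') completeAt = frame;
    else if (char !== '-') events.push({ frame, value: Number(char) });
  });
  return { events, completeAt };
}

// map changes each value but leaves its frame (its timing) untouched.
function mapEvents(stream, fn) {
  return {
    events: stream.events.map(e => ({ frame: e.frame, value: fn(e.value) })),
    completeAt: stream.completeAt
  };
}

const input = parseMarbles('--1--2--3--4--|');
const output = mapEvents(input, x => x * 10);

console.log(output.events.map(e => `${e.value}@${e.frame}`));
// [ '10@2', '20@5', '30@8', '40@11' ]
console.log(output.completeAt); // 14
```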

Let's see this in action:

[8]:
// Simulating a marble diagram with delayed emissions
function marbleDiagramExample() {
return new Promise(resolve => {
console.log('Marble diagram demonstration:\n');
console.log('input: --1--2--3--4--|\n');
console.log(' map(x => x * 10)\n');
console.log('output: --10-20-30-40--|\n');
// Create a timed sequence to match the diagram
const input$ = rxjs.concat(
rxjs.of(1).pipe(operators.delay(200)),
rxjs.of(2).pipe(operators.delay(200)),
rxjs.of(3).pipe(operators.delay(200)),
rxjs.of(4).pipe(operators.delay(200))
);
const output$ = input$.pipe(
operators.map(x => x * 10)
);
// Log the input and output with timing
const start = Date.now();
input$.subscribe({
next: x => console.log(`[${Date.now() - start}ms] Input: ${x}\n`),
complete: () => console.log(`[${Date.now() - start}ms] Input complete\n`)
});
output$.subscribe({
next: x => console.log(`[${Date.now() - start}ms] Output: ${x}\n`),
complete: () => {
console.log(`[${Date.now() - start}ms] Output complete\n`);
resolve();
}
});
});
}

await marbleDiagramExample();
Marble diagram demonstration:
input:  --1--2--3--4--|
         map(x => x * 10)
output: --10-20-30-40--|
[202ms] Input: 1
[204ms] Output: 10
[452ms] Input: 2
[452ms] Output: 20
[665ms] Input: 3
[667ms] Output: 30
[871ms] Input: 4
[871ms] Input complete
[871ms] Output: 40
[872ms] Output complete
[M]:

4. Operators: Transforming Streams

Operators are pure functions that transform, filter, or combine observables. They are the primary way to manipulate data streams in reactive programming.
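
To see why operators compose so easily, it can help to model a stream as a plain function. The sketch below is a toy model, not RxJS's implementation: `fromArray`, `map`, `filter`, and `pipe` are simplified stand-ins that ignore errors, completion, and unsubscription.

```javascript
// Toy model: a stream is a function that takes a `next` callback.
const fromArray = values => next => values.forEach(v => next(v));

// An operator takes a source stream and returns a new stream;
// the source itself is left untouched.
const map = fn => source => next => source(value => next(fn(value)));
const filter = predicate => source => next =>
  source(value => { if (predicate(value)) next(value); });

// pipe applies operators left to right.
const pipe = (source, ...ops) => ops.reduce((stream, op) => op(stream), source);

const numbers = fromArray([1, 2, 3, 4, 5]);
const squaredOdds = pipe(numbers, filter(x => x % 2 === 1), map(x => x * x));

const result = [];
squaredOdds(value => result.push(value));
console.log(result); // [ 1, 9, 25 ]
```

Because each operator returns a new stream, `numbers` can still be subscribed to on its own and emits all five values.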

[9]:
// Basic operators: map and filter
function basicOperatorsExample() {
const numbers$ = rxjs.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
console.log('Basic transformation operators:\n');
// map - transform each value
numbers$.pipe(
operators.map(x => x * 2)
).subscribe(x => console.log(`Doubled: ${x}\n`));
// filter - keep only values that satisfy a condition
numbers$.pipe(
operators.filter(x => x % 2 === 0)
).subscribe(x => console.log(`Even number: ${x}\n`));
// Combining operators
numbers$.pipe(
operators.filter(x => x % 2 === 1), // Keep odd numbers
operators.map(x => x * x) // Square them
).subscribe(x => console.log(`Squared odd: ${x}\n`));
}

basicOperatorsExample();
Basic transformation operators:
Doubled: 2
Doubled: 4
Doubled: 6
Doubled: 8
Doubled: 10
Doubled: 12
Doubled: 14
Doubled: 16
Doubled: 18
Doubled: 20
Even number: 2
Even number: 4
Even number: 6
Even number: 8
Even number: 10
Squared odd: 1
Squared odd: 9
Squared odd: 25
Squared odd: 49
Squared odd: 81
[10]:
// Pipe syntax allows chaining operators
function pipeSyntaxExample() {
const users = [
{ id: 1, name: 'Alice', role: 'admin', active: true },
{ id: 2, name: 'Bob', role: 'user', active: false },
{ id: 3, name: 'Charlie', role: 'user', active: true },
{ id: 4, name: 'Diana', role: 'admin', active: true },
{ id: 5, name: 'Eve', role: 'user', active: false }
];
console.log('Complex data transformation example:\n');
rxjs.from(users).pipe(
operators.filter(user => user.active), // Only active users
operators.map(user => ({ // Extract relevant info
id: user.id,
displayName: user.name,
isAdmin: user.role === 'admin'
})),
operators.toArray() // Collect into array
).subscribe(result => {
console.log('Transformed users:\n');
console.log(JSON.stringify(result, null, 2) + '\n');
});
}

pipeSyntaxExample();
Complex data transformation example:
Transformed users:
[
  {
    "id": 1,
    "displayName": "Alice",
    "isAdmin": true
  },
  {
    "id": 3,
    "displayName": "Charlie",
    "isAdmin": false
  },
  {
    "id": 4,
    "displayName": "Diana",
    "isAdmin": true
  }
]
[M]:

5. Flattening Operators: Handling Nested Observables

Flattening operators are crucial for working with nested Observables, which are common when dealing with HTTP requests, user interactions, and other asynchronous operations.

[11]:
// The problem with nested observables
function nestedObservableProblem() {
return new Promise(resolve => {
console.log('Problem with nested observables:\n');
// Simulate a search input
const searchTerms$ = rxjs.of('rx', 'rxjs');
// Naive approach (problematic)
searchTerms$.subscribe(term => {
console.log(`Searching for: ${term}\n`);
// Simulate an API call
simulateApiCall(term).subscribe(results => {
console.log(`Results for "${term}": ${results}\n`);
});
});
// Helper function to simulate API call
function simulateApiCall(term) {
return rxjs.of(`[Results for ${term}]`).pipe(
operators.delay(500)
);
}
// This approach leads to callback hell and harder error handling
setTimeout(resolve, 1500);
});
}

await nestedObservableProblem();
Problem with nested observables:
Searching for: rx
Searching for: rxjs
Results for "rx": [Results for rx]
Results for "rxjs": [Results for rxjs]
[12]:
// Solution: Using flattening operators
function flatteningOperatorsExample() {
return new Promise(resolve => {
console.log('Flattening operators example:\n');
// Simulate a search input
const searchTerms$ = rxjs.of('react', 'rxjs');
// Helper function - simulated API call
function simulateApiCall(term) {
console.log(`API call for: ${term}\n`);
// Simulate different response times
const delay = term === 'react' ? 800 : 400;
return rxjs.of(`[Results for ${term}]`).pipe(
operators.delay(delay)
);
}
// Using mergeMap - processes all in parallel
console.log('1. mergeMap - parallel processing:\n');
searchTerms$.pipe(
operators.mergeMap(term => {
console.log(`Searching for: ${term}\n`);
return simulateApiCall(term);
})
).subscribe(results => console.log(`mergeMap results: ${results}\n`));
// Using switchMap - cancels previous requests
console.log('\n2. switchMap - cancels previous:\n');
searchTerms$.pipe(
operators.switchMap(term => {
console.log(`Searching for: ${term}\n`);
return simulateApiCall(term);
})
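).subscribe(results => console.log(`switchMap results: ${results}\n`));
// Using concatMap - processes one request at a time, in order
console.log('\n3. concatMap - sequential processing:\n');
searchTerms$.pipe(
operators.concatMap(term => {
console.log(`Searching for: ${term}\n`);
return simulateApiCall(term);
})
).subscribe({
next: results => console.log(`concatMap results: ${results}\n`),
complete: () => resolve() // concatMap finishes last, so resolve here
});
});
}

await flatteningOperatorsExample();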
Flattening operators example:
1. mergeMap - parallel processing:
Searching for: react
API call for: react
Searching for: rxjs
API call for: rxjs

2. switchMap - cancels previous:
Searching for: react
API call for: react
Searching for: rxjs
API call for: rxjs

3. concatMap - sequential processing:
Searching for: react
API call for: react
mergeMap results: [Results for rxjs]
switchMap results: [Results for rxjs]
mergeMap results: [Results for react]
concatMap results: [Results for react]
Searching for: rxjs
API call for: rxjs
concatMap results: [Results for rxjs]
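
The interleaved output above is easier to follow with the timing written out. The sketch below is a virtual-time model of the three strategies, not RxJS code: `flatten` is a hypothetical helper that takes requests sorted by start time and computes which responses arrive and when.

```javascript
// Virtual-time model of the three flattening strategies (a sketch, not RxJS code).
// Each request starts when its term arrives and would respond `duration` ms later.
// Requests are assumed to be sorted by `start`.
function flatten(strategy, requests) {
  const responses = [];
  let busyUntil = 0; // used by 'concat': when the previous request finishes
  requests.forEach((req, i) => {
    if (strategy === 'merge') {
      // All requests run concurrently.
      responses.push({ term: req.term, at: req.start + req.duration });
    } else if (strategy === 'switch') {
      // A request is dropped if a newer term arrives before it responds.
      const next = requests[i + 1];
      const respondsAt = req.start + req.duration;
      if (!next || next.start >= respondsAt) {
        responses.push({ term: req.term, at: respondsAt });
      }
    } else if (strategy === 'concat') {
      // A request starts only after the previous one has finished.
      const begin = Math.max(req.start, busyUntil);
      busyUntil = begin + req.duration;
      responses.push({ term: req.term, at: busyUntil });
    }
  });
  return responses.sort((a, b) => a.at - b.at).map(r => `${r.term}@${r.at}`);
}

// Same delays as simulateApiCall above; both terms arrive at t=0.
const requests = [
  { term: 'react', start: 0, duration: 800 },
  { term: 'rxjs', start: 0, duration: 400 }
];

console.log(flatten('merge', requests));  // [ 'rxjs@400', 'react@800' ]
console.log(flatten('switch', requests)); // [ 'rxjs@400' ]
console.log(flatten('concat', requests)); // [ 'react@800', 'rxjs@1200' ]
```

This matches the run above: mergeMap delivers the faster "rxjs" response first, switchMap never delivers "react", and concatMap delivers both in source order.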
[M]:

6. Hot vs Cold Observables

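Understanding the difference between hot and cold Observables is fundamental in reactive programming. A cold Observable creates a new producer for each subscriber, so every subscription gets its own independent execution. A hot Observable shares one producer among all subscribers, so a late subscriber misses any values emitted before it joined.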

[13]:
// Cold Observable example
function coldObservableExample() {
console.log('Cold Observable example:\n');
// Cold Observable - creates new producer for each subscriber
const cold$ = new rxjs.Observable(subscriber => {
const random = Math.round(Math.random() * 100);
console.log(`Cold Observable producer created with value: ${random}\n`);
subscriber.next(random);
subscriber.complete();
});
// Each subscriber gets a different execution
console.log('First subscriber:\n');
cold$.subscribe(value => console.log(`Subscriber 1 received: ${value}\n`));
console.log('Second subscriber:\n');
cold$.subscribe(value => console.log(`Subscriber 2 received: ${value}\n`));
}

coldObservableExample();
Cold Observable example:
First subscriber:
Cold Observable producer created with value: 44
Subscriber 1 received: 44
Second subscriber:
Cold Observable producer created with value: 36
Subscriber 2 received: 36
[14]:
// Hot Observable example
function hotObservableExample() {
console.log('Hot Observable example:\n');
// Subjects are hot Observables
const hot$ = new rxjs.Subject();
// First subscriber
hot$.subscribe(value => console.log(`Hot subscriber 1 received: ${value}\n`));
// Producer generates value - shared between all subscribers
const random = Math.round(Math.random() * 100);
console.log(`Hot Observable producer generated value: ${random}\n`);
hot$.next(random);
// Second subscriber - joins after first value
hot$.subscribe(value => console.log(`Hot subscriber 2 received: ${value}\n`));
// Another value for both subscribers
const random2 = Math.round(Math.random() * 100);
console.log(`Hot Observable producer generated value: ${random2}\n`);
hot$.next(random2);
// Complete the Subject
hot$.complete();
}

hotObservableExample();
Hot Observable example:
Hot Observable producer generated value: 69
Hot subscriber 1 received: 69
Hot Observable producer generated value: 95
Hot subscriber 1 received: 95
Hot subscriber 2 received: 95
[15]:
// Converting cold to hot with share()
function shareExample() {
return new Promise(resolve => {
console.log('Converting cold to hot with share():\n');
// A cold Observable with side effect
const cold$ = rxjs.interval(500).pipe(
operators.take(3),
operators.tap(x => console.log(`Source emitted: ${x}\n`))
);
// Without sharing - each subscriber gets own execution
console.log('Without sharing (two separate intervals):\n');
const sub1 = cold$.subscribe(x => console.log(`First subscriber: ${x}\n`));
// Second subscriber joins later, gets own producer
setTimeout(() => {
const sub2 = cold$.subscribe(x => console.log(`Second subscriber: ${x}\n`));
// Clean up after 3 seconds
setTimeout(() => {
sub1.unsubscribe();
sub2.unsubscribe();
// Now try with sharing
shareDemoWithDelay();
}, 3000);
}, 1000);
// Helper for shared demo with delay
function shareDemoWithDelay() {
console.log('\nWith sharing (multicasting):\n');
// Make it hot with share()
const hot$ = rxjs.interval(500).pipe(
operators.take(3),
Converting cold to hot with share():
Without sharing (two separate intervals):
Source emitted: 0
First subscriber: 0
Source emitted: 1
First subscriber: 1
Source emitted: 0
Second subscriber: 0
Source emitted: 2
First subscriber: 2
Source emitted: 1
Second subscriber: 1
Source emitted: 2
Second subscriber: 2

With sharing (multicasting):
Shared source emitted: 0
Shared first: 0
Shared source emitted: 1
Shared first: 1
Shared second: 1
Shared source emitted: 2
Shared first: 2
Shared second: 2
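
The shared run above has a single "Shared source emitted" line per value because only one producer is running. A rough sketch of that idea follows, using a toy model in which a stream is a function that takes a `next` callback and returns an unsubscribe function. This `share` is a simplified stand-in, not RxJS's implementation, and the hand-driven `cold` source is hypothetical.

```javascript
// Toy model: a stream is a function that takes a `next` callback and
// returns an unsubscribe function. Not RxJS's implementation.
function share(source) {
  const listeners = new Set();
  let stopSource = null;
  return next => {
    listeners.add(next);
    if (listeners.size === 1) {
      // First subscriber: start the single shared execution.
      stopSource = source(value => listeners.forEach(listener => listener(value)));
    }
    return () => {
      listeners.delete(next);
      if (listeners.size === 0 && stopSource) {
        stopSource(); // last subscriber left: stop the producer
        stopSource = null;
      }
    };
  };
}

// A cold source driven by hand so the example stays synchronous.
let executions = 0;
let emit = () => {};
const cold = next => {
  executions += 1;
  emit = next;
  return () => { emit = () => {}; };
};

const shared = share(cold);
const first = [];
const second = [];

shared(value => first.push(value));
emit(0);                              // only the first subscriber is listening
shared(value => second.push(value)); // joins late, reuses the same execution
emit(1);
emit(2);

console.log(executions, first, second); // 1 [ 0, 1, 2 ] [ 1, 2 ]
```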
[M]:

7. Subjects: Observable and Observer Combined

A Subject is both an Observable and an Observer: you can subscribe to it, and you can push values into it with next(). Every value it receives is multicast to all current subscribers.

[16]:
// Basic Subject example
function basicSubjectExample() {
console.log('Basic Subject example:\n');
const subject = new rxjs.Subject();
// Subscribers
subject.subscribe(value => console.log(`Subscriber A: ${value}\n`));
subject.subscribe(value => console.log(`Subscriber B: ${value}\n`));
// Use next() to push values
subject.next('Hello');
subject.next('World');
// Late subscriber - misses previous values
subject.subscribe(value => console.log(`Late subscriber: ${value}\n`));
subject.next('Still listening?');
// Complete all subscribers
subject.complete();
}

basicSubjectExample();
Basic Subject example:
Subscriber A: Hello
Subscriber B: Hello
Subscriber A: World
Subscriber B: World
Subscriber A: Still listening?
Subscriber B: Still listening?
Late subscriber: Still listening?
[17]:
// Specialized Subjects: BehaviorSubject, ReplaySubject, AsyncSubject
function specializedSubjectsExample() {
console.log('Specialized Subjects:\n');
// 1. BehaviorSubject - has initial value, new subscribers get latest value
console.log('1. BehaviorSubject Example:\n');
const behaviorSubject = new rxjs.BehaviorSubject('Initial state');
behaviorSubject.subscribe(value => console.log(`Behavior subscriber 1: ${value}\n`));
behaviorSubject.next('Updated state');
// New subscriber gets latest value
behaviorSubject.subscribe(value => console.log(`Behavior subscriber 2: ${value}\n`));
// Current value accessible directly
console.log(`Current value: ${behaviorSubject.getValue()}\n`);
// 2. ReplaySubject - buffers specified number of values
console.log('\n2. ReplaySubject Example:\n');
const replaySubject = new rxjs.ReplaySubject(2); // Buffer size of 2
replaySubject.next('First');
replaySubject.next('Second');
replaySubject.next('Third');
// New subscriber gets last 2 values
replaySubject.subscribe(value => console.log(`Replay subscriber: ${value}\n`));
// 3. AsyncSubject - only emits last value upon completion
console.log('\n3. AsyncSubject Example:\n');
const asyncSubject = new rxjs.AsyncSubject();
asyncSubject.subscribe(value => console.log(`Async subscriber 1: ${value}\n`));
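asyncSubject.next('Will not see this');
asyncSubject.next('Will see only this');
asyncSubject.subscribe(value => console.log(`Async subscriber 2: ${value}\n`));
// Completing releases the last value to every subscriber
asyncSubject.complete();
// A subscriber that arrives after completion still receives the last value
asyncSubject.subscribe(value => console.log(`Async late subscriber: ${value}\n`));
}

specializedSubjectsExample();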
Specialized Subjects:
1. BehaviorSubject Example:
Behavior subscriber 1: Initial state
Behavior subscriber 1: Updated state
Behavior subscriber 2: Updated state
Current value: Updated state

2. ReplaySubject Example:
Replay subscriber: Second
Replay subscriber: Third

3. AsyncSubject Example:
Async subscriber 1: Will see only this
Async subscriber 2: Will see only this
Async late subscriber: Will see only this
[M]:

8. Error Handling in Reactive Programming

Proper error handling is crucial in reactive programming to ensure your application remains resilient.

[18]:
// Error handling strategies
function errorHandlingExample() {
return new Promise(resolve => {
console.log('Error handling strategies:\n');
// Create an Observable that will error
const source$ = rxjs.concat(
rxjs.of(1, 2, 3),
rxjs.throwError(() => new Error('Something went wrong!')),
rxjs.of(4, 5, 6) // Never emitted: the error terminates the stream first
);
// 1. Basic error handling in subscription
console.log('1. Without error handling:\n');
source$.subscribe({
next: value => console.log(`Value: ${value}\n`),
error: err => console.log(`Error caught in subscriber: ${err.message}\n`),
complete: () => console.log('Completed! (But won\'t reach here)\n')
});
// 2. Using catchError operator to recover
console.log('\n2. Using catchError to recover:\n');
source$.pipe(
operators.catchError(err => {
console.log(`Error caught by operator: ${err.message}\n`);
return rxjs.of('Recovered value');
})
).subscribe({
next: value => console.log(`Value: ${value}\n`),
complete: () => console.log('Completed successfully with recovery\n')
});
// 3. Using retry operator
setTimeout(() => {
console.log('\n3. Using retry operator:\n');
Error handling strategies:
1. Without error handling:
Value: 1
Value: 2
Value: 3
Error caught in subscriber: Something went wrong!

2. Using catchError to recover:
Value: 1
Value: 2
Value: 3
Error caught by operator: Something went wrong!
Value: Recovered value
Completed successfully with recovery

3. Using retry operator:
Attempt 1
Attempt 2
Attempt 3
Final value: Success after retries!
Completed with retry
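
The retry run above shows three attempts before a value arrives. A minimal sketch of the mechanism, assuming a toy model in which a stream is a function that takes an observer object: this `retry` is a simplified stand-in for the RxJS operator, and `flaky` is a hypothetical source that fails on its first two subscriptions.

```javascript
// Toy model of retry (not RxJS's implementation): a stream is a function
// that takes an observer with `next`, `error`, and `complete` callbacks.
function retry(source, maxRetries) {
  return observer => {
    let retriesLeft = maxRetries;
    const attempt = () => {
      source({
        next: value => observer.next(value),
        complete: () => observer.complete(),
        error: err => {
          if (retriesLeft > 0) {
            retriesLeft -= 1;
            attempt(); // resubscribe: the source runs again from the start
          } else {
            observer.error(err); // out of retries: pass the error on
          }
        }
      });
    };
    attempt();
  };
}

// Hypothetical flaky source: fails on the first two subscriptions.
let attempts = 0;
const flaky = observer => {
  attempts += 1;
  if (attempts < 3) {
    observer.error(new Error(`attempt ${attempts} failed`));
  } else {
    observer.next('Success after retries!');
    observer.complete();
  }
};

const events = [];
retry(flaky, 2)({
  next: value => events.push(`value: ${value}`),
  error: err => events.push(`error: ${err.message}`),
  complete: () => events.push('complete')
});

console.log(attempts, events);
// 3 [ 'value: Success after retries!', 'complete' ]
```

Retrying means resubscribing, so two retries allow three attempts in total, and the subscriber never sees the first two errors.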
[M]:

9. Backpressure: Handling Fast Producers with Slow Consumers

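Backpressure occurs when a producer emits values faster than a consumer can process them. RxJS handles this mainly with rate-limiting operators such as throttleTime, debounceTime, and sample, which drop or delay values rather than slowing the producer down.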

[19]:
// Handling backpressure with debounce, throttle, and sample
function backpressureExample() {
return new Promise(resolve => {
console.log('Backpressure handling techniques:\n');
// Create a stream that emits quickly (simulating rapid events)
function createFastStream() {
return rxjs.interval(100).pipe(
operators.take(20),
operators.map(i => `Event ${i}`)
);
}
// 1. throttleTime - emit first value, then ignore for specified time
console.log('1. Using throttleTime (first-pass):\n');
const throttled$ = createFastStream().pipe(
operators.tap(val => console.log(`Original: ${val}\n`)),
operators.throttleTime(300),
operators.tap(val => console.log(`Throttled: ${val}\n`))
);
const throttleSub = throttled$.subscribe({
complete: () => {
console.log('Throttle example complete\n');
secondExample();
}
});
// Cancel after a few emissions to keep example brief
setTimeout(() => {
throttleSub.unsubscribe();
console.log('Throttle example stopped\n');
secondExample();
}, 1200);
function secondExample() {
Backpressure handling techniques:
1. Using throttleTime (first-pass):
Original: Event 0
Throttled: Event 0
Original: Event 1
Original: Event 2
Original: Event 3
Throttled: Event 3
Original: Event 4
Original: Event 5
Original: Event 6
Throttled: Event 6
Original: Event 7
Original: Event 8
Original: Event 9
Throttled: Event 9
Original: Event 10
Throttle example stopped

2. Using debounceTime (wait for pause):
Typing: a
Typing: ap
Typing: app
Debounced result: app
Typing: apple
Typing: apple p
Typing: apple pi
Typing: apple pie
Debounced result: apple pie
Debounce example complete

3. Using sample (periodic sampling):
Fast source: 0
Fast source: 1
Sampled value: 1
Fast source: 2
Fast source: 3
Fast source: 4
Sampled value: 4
Fast source: 5
Fast source: 6
Fast source: 7
Sampled value: 7
Fast source: 8
Fast source: 9
Fast source: 10
Sampled value: 10
Fast source: 11
Fast source: 12
Fast source: 13
Sampled value: 13
Fast source: 14
Sample example complete
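
The three strategies differ only in which values they keep. The sketch below models them in virtual time over arrays of timestamped events. `throttle`, `debounce`, and `sample` are hypothetical pure functions that mimic the behaviour shown above under simple assumptions (events sorted by time, ticks at exact multiples of the period); they are not the RxJS implementations.

```javascript
// Virtual-time sketches of three rate-limiting strategies. Events are
// { t, value } records sorted by time t (ms). These model the behaviour
// only; they are not RxJS's implementations.

// throttle: pass a value, then ignore everything for `ms`.
function throttle(events, ms) {
  const out = [];
  let lastPass = -Infinity;
  for (const e of events) {
    if (e.t - lastPass >= ms) {
      out.push(e.value);
      lastPass = e.t;
    }
  }
  return out;
}

// debounce: keep a value only if the source then stays quiet for longer
// than `ms` (the final value always passes).
function debounce(events, ms) {
  return events
    .filter((e, i) => i === events.length - 1 || events[i + 1].t - e.t > ms)
    .map(e => e.value);
}

// sample: at every multiple of `period`, emit the latest value seen since
// the previous tick.
function sample(events, period) {
  const out = [];
  const lastTime = events.length ? events[events.length - 1].t : 0;
  for (let tick = period; tick <= lastTime; tick += period) {
    const inWindow = events.filter(e => e.t > tick - period && e.t <= tick);
    if (inWindow.length) out.push(inWindow[inWindow.length - 1].value);
  }
  return out;
}

// Ten events, one every 100 ms, like the fast stream above.
const fast = Array.from({ length: 10 }, (_, i) => ({ t: (i + 1) * 100, value: `Event ${i}` }));
console.log(throttle(fast, 300)); // [ 'Event 0', 'Event 3', 'Event 6', 'Event 9' ]
console.log(sample(fast, 300));   // [ 'Event 2', 'Event 5', 'Event 8' ]

// Typing with a pause after "app".
const typing = [
  { t: 0, value: 'a' }, { t: 100, value: 'ap' }, { t: 200, value: 'app' },
  { t: 900, value: 'apple' }, { t: 1000, value: 'apple pie' }
];
console.log(debounce(typing, 300)); // [ 'app', 'apple pie' ]
```

Throttling keeps the first value of each window, sampling keeps the last, and debouncing keeps only values followed by a pause.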
[M]:

10. Composing Streams: Building Complex Logic from Simple Parts

The true power of reactive programming comes from composing streams. Let's explore how to build complex data flows by combining simple streams.

[20]:
// Building complex streams through composition
function compositionExample() {
return new Promise(resolve => {
console.log('Stream composition example:\n');
// 1. User input stream (simulated)
const searchTerms$ = rxjs.from(['re', 'rea', 'reac', 'react', 'reacti', 'reactiv']).pipe(
operators.concatMap(term => rxjs.of(term).pipe(operators.delay(300))),
operators.tap(term => console.log(`User typed: ${term}\n`))
);
// 2. API call function (simulated)
function searchApi(term) {
console.log(`Making API call for: ${term}\n`);
return rxjs.of({
results: [
`${term} result 1`,
`${term} result 2`
],
term,
timestamp: new Date().toISOString()
}).pipe(
operators.delay(term.length * 100) // Longer terms take longer
);
}
// 3. Compose the streams to create a search feature
const search$ = searchTerms$.pipe(
// Only search when term is long enough
operators.filter(term => term.length > 2),
// Wait for user to pause typing
operators.debounceTime(200),
// Don't search for the same term twice
operators.distinctUntilChanged(),
Stream composition example:
User typed: re
User typed: rea
Making API call for: rea
User typed: reac
Making API call for: reac
User typed: react
Making API call for: react
User typed: reacti
Making API call for: reacti
User typed: reactiv
Making API call for: reactiv
Results for "reactiv": ["reactiv result 1","reactiv result 2"]
Search history size: 0
Search stream completed
[M]:

11. Reactive Patterns in Real-World Applications

Let's look at some common reactive patterns used in real-world applications.

[21]:
// State management with BehaviorSubject
function stateManagementExample() {
console.log('State management pattern:\n');
// Create a simple store with BehaviorSubject
class Store {
constructor(initialState) {
this.state$ = new rxjs.BehaviorSubject(initialState);
}
get state() {
return this.state$.getValue();
}
updateState(partialState) {
this.state$.next({ ...this.state, ...partialState });
}
select(selector) {
return this.state$.pipe(
operators.map(selector),
operators.distinctUntilChanged()
);
}
}
// Create a todo app store
const todoStore = new Store({
todos: [],
filter: 'all',
loading: false,
error: null
});
// UI subscribes to specific parts of state
const todoList$ = todoStore.select(state => state.todos);
State management pattern:
Todo list updated: []
Filter changed to: all
Adding todos...
Todo list updated: [{"id":1,"text":"Learn RxJS","completed":false},{"id":2,"text":"Build reactive app","completed":false}]
Completing todo...
Todo list updated: [{"id":1,"text":"Learn RxJS","completed":true},{"id":2,"text":"Build reactive app","completed":false}]
Changing filter...
Filter changed to: completed
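
Notice in the output above that changing the filter does not trigger another "Todo list updated" line. A dependency-free sketch of why follows, assuming slices are compared by reference. `TinyStore` is a hypothetical stand-in for a BehaviorSubject-backed store, not RxJS code.

```javascript
// Toy BehaviorSubject-style store (hypothetical; not RxJS's implementation).
class TinyStore {
  constructor(initialState) {
    this.state = initialState;
    this.listeners = new Set();
  }

  updateState(partialState) {
    // Replace the state object instead of mutating it.
    this.state = { ...this.state, ...partialState };
    this.listeners.forEach(listener => listener(this.state));
  }

  // Notify `onChange` with the selected slice: once immediately, then only
  // when the slice changes by reference (like map + distinctUntilChanged).
  select(selector, onChange) {
    let previous = selector(this.state);
    onChange(previous);
    const listener = state => {
      const current = selector(state);
      if (current !== previous) {
        previous = current;
        onChange(current);
      }
    };
    this.listeners.add(listener);
    return () => this.listeners.delete(listener);
  }
}

const store = new TinyStore({ todos: [], filter: 'all' });
const log = [];
store.select(state => state.todos, todos => log.push(`todos: ${todos.length}`));
store.select(state => state.filter, filter => log.push(`filter: ${filter}`));

store.updateState({ todos: [...store.state.todos, { id: 1, text: 'Learn RxJS' }] });
store.updateState({ filter: 'completed' }); // todos slice unchanged: no todo notification

console.log(log);
// [ 'todos: 0', 'filter: all', 'todos: 1', 'filter: completed' ]
```

The reference comparison is also why updates should build a new array rather than push into the existing one: a mutated array would look unchanged to the selector.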
[22]:
// Caching with shareReplay
function cachingExample() {
return new Promise(resolve => {
console.log('Caching pattern with shareReplay:\n');
// Simulate expensive data fetch
function fetchData() {
console.log('Making expensive API call...\n');
return rxjs.of({ data: 'Valuable data', timestamp: Date.now() })
.pipe(operators.delay(1000));
}
// Cache the result with shareReplay
const cachedData$ = fetchData().pipe(
operators.tap(data => console.log(`Data fetched: ${JSON.stringify(data)}\n`)),
operators.shareReplay(1) // Cache 1 value for new subscribers
);
// First subscriber triggers the API call
console.log('First subscriber subscribes\n');
cachedData$.subscribe(data => console.log(`First subscriber got data (time=${Date.now()})\n`));
// Second subscriber gets cached value
setTimeout(() => {
console.log('Second subscriber subscribes (should use cache)\n');
cachedData$.subscribe(data => console.log(`Second subscriber got data (time=${Date.now()})\n`));
}, 1500);
// Third subscriber also gets cached value
setTimeout(() => {
console.log('Third subscriber subscribes (should use cache)\n');
cachedData$.subscribe({
next: data => console.log(`Third subscriber got data (time=${Date.now()})\n`),
complete: () => {
console.log('Caching example complete\n');
resolve();
}
});
}, 2000);
});
}

await cachingExample();
Caching pattern with shareReplay:
Making expensive API call...
First subscriber subscribes
Data fetched: {"data":"Valuable data","timestamp":1741960097244}
First subscriber got data (time=1741960098252)
Second subscriber subscribes (should use cache)
Second subscriber got data (time=1741960098753)
Third subscriber subscribes (should use cache)
Third subscriber got data (time=1741960099250)
Caching example complete
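
In the run above, "Making expensive API call..." appears once even though three subscribers receive data. The sketch below shows the replay idea in a toy model where a stream is a function that takes a `next` callback. `cacheLatest` is a hypothetical stand-in for a replay-1 cache, not the RxJS `shareReplay` implementation, and it ignores errors and completion.

```javascript
// Toy model of a replay-1 cache (not RxJS's implementation). A stream is a
// function that takes a `next` callback.
function cacheLatest(source) {
  const listeners = new Set();
  let started = false;
  let hasValue = false;
  let latest;
  return next => {
    if (hasValue) next(latest); // late subscriber: replay the cached value
    listeners.add(next);
    if (!started) {
      started = true; // the source runs once, on the first subscription
      source(value => {
        hasValue = true;
        latest = value;
        listeners.forEach(listener => listener(value));
      });
    }
  };
}

let calls = 0;
const fetchData = next => {
  calls += 1; // stands in for an expensive request
  next({ data: 'Valuable data' });
};

const cached = cacheLatest(fetchData);
const received = [];
cached(value => received.push(`first: ${value.data}`));
cached(value => received.push(`second: ${value.data}`));

console.log(calls, received);
// 1 [ 'first: Valuable data', 'second: Valuable data' ]
```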
[M]:

Reactive Programming Principles

Let's finish by summarizing the core principles of reactive programming:

  1. Declarative: Describe what should happen, not how it happens
  2. Data-driven: Data changes drive application flow
  3. Composable: Complex behaviors built from simple operators
  4. Resilient: Better error recovery with isolated error handling
  5. Responsive: Backpressure handling keeps UI responsive
  6. Lazy: Processing only occurs when needed (subscription-based)
  7. Asynchronous: Naturally handles sync and async operations in the same way

Reactive programming with RxJS gives you powerful tools to handle complex async operations, UI events, and data management in a consistent, maintainable way. While it has a learning curve, mastering these core concepts will significantly improve how you handle asynchronous programming in JavaScript applications.
