[M]:

RxJS Basics: Getting Started with Observables

Hey there! If you've been working with JavaScript and heard about reactive programming, you've probably come across RxJS. It's a library for composing asynchronous and event-based code using observables.

In this notebook, we'll explore the basics of RxJS, focusing on observables - the core building blocks of reactive programming. We'll cover creating observables, subscribing to them, using operators, and handling common patterns. By the end, you'll have a solid foundation to start using RxJS in your projects.

[1]:
// Install RxJS
!npm install rxjs
$ npm install rxjs

added 2 packages in 2s

28 packages are looking for funding
  run `npm fund` for details
[2]:
// Import core RxJS functionality
const rxjs = require('rxjs');
const operators = require('rxjs/operators');

console.log("RxJS loaded successfully!\n");
RxJS loaded successfully!
[M]:

What is an Observable?

An Observable is the core concept in RxJS. Think of it as a stream that can emit multiple values over time and then either complete or fail with an error. Unlike a Promise, which settles once and starts its work as soon as it is created, an Observable is lazy: it doesn't produce any values until you subscribe to it.

[3]:
// Creating a simple Observable
const simpleObservable = new rxjs.Observable(subscriber => {
  console.log('Observable executed!\n');
  subscriber.next('First value');
  subscriber.next('Second value');
  // We can emit values asynchronously too
  setTimeout(() => {
    subscriber.next('Async value after 1 second');
    subscriber.complete(); // Signal that we're done emitting values
  }, 1000);
});

// Nothing happens until we subscribe
console.log("Before subscribing - nothing happens yet\n");
Before subscribing - nothing happens yet
[4]:
// Subscribe to start receiving values
console.log("Subscribing to the observable:\n");
const subscription = simpleObservable.subscribe({
  next: value => console.log(`Received: ${value}\n`),
  error: err => console.error(`Error: ${err}\n`),
  complete: () => console.log("Observable completed!\n")
});

// We can unsubscribe if needed
setTimeout(() => {
  console.log("Unsubscribing (but our observable already completed)\n");
  subscription.unsubscribe();
}, 2000);
Subscribing to the observable:
Observable executed!
Received: First value
Received: Second value
[M]:

Creating Observables

RxJS provides several ways to create observables from different data sources.

[5]:
// of() - Create an observable from individual values
const ofObservable = rxjs.of(1, 2, 3, 4, 5);

console.log("Observable from 'of':\n");
ofObservable.subscribe({
  next: value => console.log(`of emitted: ${value}\n`),
  complete: () => console.log("of completed\n")
});
Observable from 'of':
of emitted: 1
of emitted: 2
of emitted: 3
of emitted: 4
of emitted: 5
of completed
[6]:
// from() - Create an observable from an array, promise, or iterable
const arrayObservable = rxjs.from([10, 20, 30, 40, 50]);

console.log("Observable from array:\n");
arrayObservable.subscribe({
  next: value => console.log(`Array emitted: ${value}\n`),
  complete: () => console.log("Array observable completed\n")
});
Observable from array:
Array emitted: 10
Array emitted: 20
Array emitted: 30
Array emitted: 40
Array emitted: 50
Array observable completed
[7]:
// from() with a Promise
const promiseObservable = rxjs.from(Promise.resolve('Resolved promise value'));

console.log("Observable from promise:\n");
promiseObservable.subscribe({
  next: value => console.log(`Promise emitted: ${value}\n`),
  complete: () => console.log("Promise observable completed\n")
});
Observable from promise:
Promise emitted: Resolved promise value
Promise observable completed
[8]:
// interval() - Emit sequential numbers at specified intervals
const intervalObservable = rxjs.interval(500); // Emit every 500ms

console.log("Observable from interval (taking only 5 values):\n");
const intervalSubscription = intervalObservable
  .pipe(operators.take(5)) // Take only 5 values then complete
  .subscribe({
    next: value => console.log(`Interval emitted: ${value}\n`),
    complete: () => console.log("Interval observable completed\n")
  });
Observable from interval (taking only 5 values):
[M]:

Using Operators

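Operators are functions that let you transform, filter, and combine observables. Each one takes an observable and returns a new one, leaving the source untouched, and you chain them together with pipe(). They're one of the most powerful features of RxJS.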

[9]:
// map operator - transform each value
const numbers = rxjs.of(1, 2, 3, 4, 5);

console.log("Using map operator to double each value:\n");
numbers.pipe(
  operators.map(x => x * 2)
).subscribe(value => console.log(`Doubled: ${value}\n`));
Using map operator to double each value:
Doubled: 2
Doubled: 4
Doubled: 6
Doubled: 8
Doubled: 10
[10]:
// filter operator - only emit values that pass a condition
console.log("Using filter operator to get only even numbers:\n");
numbers.pipe(
  operators.filter(x => x % 2 === 0)
).subscribe(value => console.log(`Even number: ${value}\n`));
Using filter operator to get only even numbers:
Even number: 2
Even number: 4
[11]:
// Chaining multiple operators
console.log("Chaining operators - filter even numbers then multiply by 10:\n");
numbers.pipe(
  operators.filter(x => x % 2 === 0),
  operators.map(x => x * 10),
  operators.tap(x => console.log(`Processing: ${x}\n`)) // tap for side effects
).subscribe(value => console.log(`Final value: ${value}\n`));
Chaining operators - filter even numbers then multiply by 10:
Processing: 20
Final value: 20
Processing: 40
Final value: 40
[M]:

Error Handling

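Observables have built-in error handling. A source signals failure through the subscriber's error callback; once that happens, no further values are delivered and complete is never called.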

[12]:
// Observable that will emit an error
const errorObservable = new rxjs.Observable(subscriber => {
  subscriber.next('Value before error');
  subscriber.error(new Error('Something went wrong!'));
  // This won't be emitted because of the error
  subscriber.next('Value after error');
});

console.log("Observable with error:\n");
errorObservable.subscribe({
  next: value => console.log(`Received: ${value}\n`),
  error: err => console.error(`Error caught in subscriber: ${err.message}\n`),
  complete: () => console.log("This won't run because of the error\n")
});
Observable with error:
Received: Value before error
[13]:
// Using catchError operator
console.log("Using catchError to handle errors gracefully:\n");
errorObservable.pipe(
  operators.catchError(error => {
    console.log(`Error caught by operator: ${error.message}\n`);
    // Return a new observable as a fallback
    return rxjs.of('Fallback value 1', 'Fallback value 2');
  })
).subscribe({
  next: value => console.log(`Received: ${value}\n`),
  complete: () => console.log("Observable completed successfully\n")
});
Using catchError to handle errors gracefully:
Received: Value before error
Error caught by operator: Something went wrong!
Received: Fallback value 1
Received: Fallback value 2
Observable completed successfully
[M]:

Subjects

Subjects are a special type of Observable that multicasts values to many Observers at once. A Subject acts as both an Observable (you can subscribe to it) and an Observer (you can call next, error, and complete on it).

[14]:
// Create a Subject
const subject = new rxjs.Subject();

// Add two subscribers
console.log("Creating two subscribers to a Subject:\n");

subject.subscribe({
  next: value => console.log(`Subscriber 1 received: ${value}\n`),
  complete: () => console.log("Subscriber 1 completed\n")
});

subject.subscribe({
  next: value => console.log(`Subscriber 2 received: ${value}\n`),
  complete: () => console.log("Subscriber 2 completed\n")
});
Creating two subscribers to a Subject:
[15]:
// Now emit values to all subscribers
console.log("Emitting values to all subscribers:\n");
subject.next('First value from subject');
subject.next('Second value from subject');
subject.complete();
Emitting values to all subscribers:
Subscriber 1 received: First value from subject
Subscriber 2 received: First value from subject
Subscriber 1 received: Second value from subject
Subscriber 2 received: Second value from subject
Subscriber 1 completed
Subscriber 2 completed
[M]:

Practical Example: Data Fetching

Let's see a practical example of using RxJS for data fetching with error handling.

[16]:
// Simulate a fetch function that might fail
function fetchData(id) {
  return new rxjs.Observable(subscriber => {
    console.log(`Fetching data for id: ${id}\n`);
    // Simulate network request
    const timer = setTimeout(() => {
      // Randomly succeed (about 70% of the time) or fail
      if (Math.random() > 0.3) {
        subscriber.next({ id, name: `Item ${id}`, value: Math.floor(Math.random() * 100) });
        subscriber.complete();
      } else {
        subscriber.error(new Error(`Failed to fetch data for id: ${id}`));
      }
    }, 500);
    // Teardown function: runs on unsubscribe, and also after complete or error
    return () => {
      clearTimeout(timer);
      console.log(`Cleaned up fetch for id: ${id}\n`);
    };
  });
}
[17]:
// Fetch data for multiple IDs
const ids = [1, 2, 3, 4, 5];

console.log("Fetching data for multiple IDs with error handling:\n");
rxjs.from(ids).pipe(
  operators.mergeMap(id => fetchData(id).pipe(
    // Handle errors for individual fetches
    operators.catchError(error => {
      console.error(`${error.message} - returning fallback\n`);
      return rxjs.of({ id, name: `Fallback for ${id}`, value: 0 });
    })
  )),
  // Process the results
  operators.map(item => ({ ...item, processed: true, value: item.value * 2 }))
).subscribe({
  next: result => console.log(`Processed result: ${JSON.stringify(result)}\n`),
  error: err => console.error(`Unexpected error: ${err.message}\n`),
  complete: () => console.log("All data fetching completed!\n")
});
Fetching data for multiple IDs with error handling:
Fetching data for id: 1
Fetching data for id: 2
Fetching data for id: 3
Fetching data for id: 4
Fetching data for id: 5
[M]:

Key Takeaways

Here's what we've learned about RxJS and Observables:

  1. Observables are lazy collections of multiple values over time
  2. Subscription is needed to start receiving values from an Observable
  3. Operators allow transforming, filtering, and combining Observables
  4. Error handling is built into the Observable pattern
  5. Subjects allow multicasting values to multiple subscribers

RxJS is incredibly powerful for handling asynchronous operations, events, and data streams. While there's a learning curve, mastering Observables can greatly simplify complex async code.
