Logo
⚠️ Unsaved
[M]:

Testing and Debugging RxJS Streams

Reactive programming with RxJS can be powerful, but debugging asynchronous data streams can get tricky. This notebook provides practical techniques for testing and debugging RxJS streams to help you troubleshoot issues and write reliable tests.

[1]:
// Install RxJS
!npm install rxjs
$ npm install rxjs

added 2 packages in 2s

28 packages are looking for funding
  run `npm fund` for details
[2]:
// Import RxJS
const rxjs = require('rxjs');
const operators = require('rxjs/operators');

console.log('RxJS loaded successfully\n');
RxJS loaded successfully
[M]:

1. Basic Debugging Techniques

Using tap() for Logging

The tap() operator is your best friend for debugging. It lets you peek into the stream without changing it.

[3]:
// Using tap() to debug a simple stream
const basicStream$ = rxjs.of(1, 2, 3, 4, 5);

console.log('Debugging with tap():\n');
basicStream$.pipe(
operators.tap(val => console.log(`Original value: ${val}\n`)),
operators.map(val => val * 10),
operators.tap(val => console.log(`After map: ${val}\n`)),
operators.filter(val => val > 20),
operators.tap(val => console.log(`After filter: ${val}\n`))
).subscribe({
next: val => console.log(`Final value: ${val}\n`),
complete: () => console.log('Stream completed\n')
});
Debugging with tap():
Original value: 1
After map: 10
Original value: 2
After map: 20
Original value: 3
After map: 30
After filter: 30
Final value: 30
Original value: 4
After map: 40
After filter: 40
Final value: 40
Original value: 5
After map: 50
After filter: 50
Final value: 50
Stream completed
[M]:

Creating a Custom Debug Operator

Let's create a more powerful debug operator to consistently log stream values with context.

[4]:
// Custom debug operator for better logs
function debug(tag, logValues = true) {
return source$ => source$.pipe(
operators.tap({
next: value => {
if (logValues) {
console.log(`[${tag}] Next: ${JSON.stringify(value)}\n`);
} else {
console.log(`[${tag}] Next value received\n`);
}
},
error: err => console.log(`[${tag}] Error: ${err}\n`),
complete: () => console.log(`[${tag}] Complete\n`)
})
);
}

// Using our custom debug operator
const complexObject$ = rxjs.of(
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' },
{ id: 3, name: 'Charlie' }
);

console.log('Using custom debug operator:\n');
complexObject$.pipe(
debug('Source'),
operators.map(user => ({ ...user, name: user.name.toUpperCase() })),
debug('After uppercase'),
operators.filter(user => user.id > 1),
debug('After filter', true)
).subscribe();
Using custom debug operator:
[Source] Next: {"id":1,"name":"Alice"}
[After uppercase] Next: {"id":1,"name":"ALICE"}
[Source] Next: {"id":2,"name":"Bob"}
[After uppercase] Next: {"id":2,"name":"BOB"}
[After filter] Next: {"id":2,"name":"BOB"}
[Source] Next: {"id":3,"name":"Charlie"}
[After uppercase] Next: {"id":3,"name":"CHARLIE"}
[After filter] Next: {"id":3,"name":"CHARLIE"}
[Source] Complete
[After uppercase] Complete
[After filter] Complete
[M]:

2. Tracking Subscription Lifecycle

Tracking when subscriptions are created and disposed is critical for debugging memory leaks.

[5]:
// Tracking subscription lifecycle
function trackSubscription(name) {
return new Promise(resolve => {
console.log(`Tracking subscription: ${name}\n`);
const timer$ = rxjs.timer(0, 1000).pipe(
operators.take(3),
operators.finalize(() => {
console.log(`Stream ${name} finalized\n`);
resolve();
})
);
console.log(`Subscribing to ${name}...\n`);
const subscription = timer$.subscribe(val => {
console.log(`${name} emitted: ${val}\n`);
});
});
}

await trackSubscription('Timer stream');
Tracking subscription: Timer stream
Subscribing to Timer stream...
Timer stream emitted: 0
Timer stream emitted: 1
Timer stream emitted: 2
Stream Timer stream finalized
[M]:

3. Debugging Errors in RxJS Streams

Error handling is a critical part of working with RxJS.

[6]:
// Common error handling and debugging patterns
function errorHandlingDemo() {
return new Promise(resolve => {
console.log('Error handling demo:\n');
// Create a stream that will error
const source$ = rxjs.concat(
rxjs.of(1, 2, 3),
rxjs.throwError(() => new Error('Simulated error!')),
rxjs.of(4, 5, 6) // These values never emit without error handling
);
// 1. Basic error handler in subscribe
console.log('1. Without error handling:\n');
source$.pipe(
debug('No error handling')
).subscribe({
next: val => console.log(`Received: ${val}\n`),
error: err => console.log(`Error caught in subscriber: ${err.message}\n`),
complete: () => console.log('Completed (won\'t reach here)\n')
});
// 2. Using catchError to recover from errors
setTimeout(() => {
console.log('\n2. With catchError for recovery:\n');
source$.pipe(
debug('Before catchError'),
operators.catchError(err => {
console.log(`Error caught by operator: ${err.message}\n`);
return rxjs.of('Fallback value');
}),
debug('After catchError')
).subscribe({
complete: () => {
console.log('Stream completed with recovery\n');
resolve();
Error handling demo:
1. Without error handling:
[No error handling] Next: 1
Received: 1
[No error handling] Next: 2
Received: 2
[No error handling] Next: 3
Received: 3
[No error handling] Error: Error: Simulated error!
Error caught in subscriber: Simulated error!

2. With catchError for recovery:
[Before catchError] Next: 1
[After catchError] Next: 1
[Before catchError] Next: 2
[After catchError] Next: 2
[Before catchError] Next: 3
[After catchError] Next: 3
[Before catchError] Error: Error: Simulated error!
Error caught by operator: Simulated error!
[After catchError] Next: "Fallback value"
[After catchError] Complete
Stream completed with recovery
[M]:

4. Marble Testing

Marble testing is a powerful technique for testing RxJS streams. It uses ASCII diagrams to represent stream events over time.

[7]:
// Import the TestScheduler
const { TestScheduler } = require('rxjs/testing');

// Basic marble test example
console.log('Basic marble testing example:\n');

// Create test scheduler
const testScheduler = new TestScheduler((actual, expected) => {
const actualValues = actual.map(x => ({ frame: x.frame, notification: { kind: x.notification.kind, value: x.notification.value }}));
const expectedValues = expected.map(x => ({ frame: x.frame, notification: { kind: x.notification.kind, value: x.notification.value }}));
console.log('Actual:\n', JSON.stringify(actualValues, null, 2), '\n');
console.log('Expected:\n', JSON.stringify(expectedValues, null, 2), '\n');
const equal = JSON.stringify(actualValues) === JSON.stringify(expectedValues);
console.log(`Test passed: ${equal}\n`);
});

// Run tests with the scheduler
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('--a--b--|', { a: 1, b: 2 });
const expected = '--x--y--|';
const values = { x: 10, y: 20 };
const result$ = source$.pipe(
operators.map(x => x * 10)
);
expectObservable(result$).toBe(expected, values);
});
Basic marble testing example:
Actual:
 [
  {
    "frame": 2,
    "notification": {
      "kind": "N",
      "value": 10
    }
  },
  {
    "frame": 5,
    "notification": {
      "kind": "N",
      "value": 20
    }
  },
  {
    "frame": 8,
    "notification": {
      "kind": "C"
    }
  }
] 
Expected:
 [
  {
    "frame": 2,
    "notification": {
      "kind": "N",
      "value": 10
    }
  },
  {
    "frame": 5,
    "notification": {
      "kind": "N",
      "value": 20
    }
  },
  {
    "frame": 8,
    "notification": {
      "kind": "C"
    }
  }
] 
Test passed: true
[8]:
// Testing async operators with marble diagrams
console.log('Testing async operators with marble diagrams:\n');

testScheduler.run(({ cold, expectObservable }) => {
// Setup test stream with values at specific virtual times
const input = cold( '-a-b-c----|', { a: 1, b: 2, c: 3 });
const expected = '---b-c----|';
// Apply debounceTime operator (normally async, but synchrnonous with TestScheduler)
const result = input.pipe(
operators.debounceTime(2) // 2 "frames" in virtual time
);
// Assert the results
expectObservable(result).toBe(expected, { b: 2, c: 3 });
});
Testing async operators with marble diagrams:
Actual:
 [
  {
    "frame": 15,
    "notification": {
      "kind": "N",
      "value": 3
    }
  },
  {
    "frame": 18,
    "notification": {
      "kind": "C"
    }
  }
] 
Expected:
 [
  {
    "frame": 3,
    "notification": {
      "kind": "N",
      "value": 2
    }
  },
  {
    "frame": 5,
    "notification": {
      "kind": "N",
      "value": 3
    }
  },
  {
    "frame": 10,
    "notification": {
      "kind": "C"
    }
  }
] 
Test passed: false
[M]:

5. Memory Leak Detection

Memory leaks are a common issue with RxJS. Let's look at how to detect and fix them.

[9]:
// Detecting unsubscribed Observables
function memoryLeakDemo() {
return new Promise(resolve => {
console.log('Memory leak detection:\n');
// Bad pattern: creating a subscription without cleanup
const leakyFunction = () => {
console.log('Creating a potential memory leak...\n');
const timer$ = rxjs.interval(1000);
// Problem: this subscription is never unsubscribed!
const subscription = timer$.subscribe(val => {
console.log(`Leak emitted: ${val}\n`);
});
// We should return the subscription for the caller to manage
return subscription;
};
// Better pattern: using takeUntil
const betterFunction = () => {
console.log('\nCreating a properly managed stream...\n');
const destroy$ = new rxjs.Subject();
rxjs.interval(1000).pipe(
operators.takeUntil(destroy$),
operators.finalize(() => console.log('Stream properly cleaned up\n'))
).subscribe(val => {
console.log(`Safe stream emitted: ${val}\n`);
});
// Return a function to clean up
return () => {
destroy$.next();
destroy$.complete();
Memory leak detection:
Creating a potential memory leak...

Creating a properly managed stream...
Leak emitted: 0
Safe stream emitted: 0
Leak emitted: 1
Safe stream emitted: 1

Cleaning up both streams...
Stream properly cleaned up
[M]:

6. Best Practices for Testing and Debugging

  1. Add debug operators strategically:

    • Place tap() calls before and after complex operators
    • Use custom debug operators for consistent logging
    • Remove or disable debug code in production
  2. Always handle errors:

    • Use catchError for recovery
    • Consider retry strategies for transient failures
    • Log errors with helpful context
  3. Prevent memory leaks:

    • Always unsubscribe from long-lived Observables
    • Use the takeUntil(destroy$) pattern in components
    • Use finalize() to verify cleanup
  4. Write testable streams:

    • Extract complex stream logic into functions
    • Use dependency injection for external services
    • Design with testing in mind
  5. Use TestScheduler for time-based testing:

    • TestScheduler uses virtual time, making tests fast and deterministic
    • Define input streams with marble diagrams like '--a--b--|'
    • Assert expected output with expectObservable(result$).toBe('--x--y--|')
  6. Avoid nested subscriptions:

    • Nested subscriptions are hard to track and often lead to memory leaks
    • Use flattening operators like switchMap, mergeMap instead
  7. Test edge cases:

    • Empty streams
    • Error cases
    • Race conditions with multiple sources
Sign in to save your work and access it from anywhere