Logo
⚠️ Unsaved
[M]:

Building Custom Operators in RxJS

RxJS provides dozens of built-in operators, but sometimes you need specialized functionality for your specific use cases. Creating custom operators allows you to encapsulate complex logic, improve code reusability, and make your reactive streams more readable.

In this notebook, we'll explore different ways to build custom operators in RxJS, from simple composition to creating operators from scratch.

[1]:
// Install RxJS
!npm install rxjs
$ npm install rxjs

added 2 packages in 2s

28 packages are looking for funding
  run `npm fund` for details
[2]:
// Import RxJS
const rxjs = require('rxjs');
const operators = require('rxjs/operators');

console.log('RxJS loaded successfully\n');
RxJS loaded successfully
[M]:

1. Why Create Custom Operators?

Before diving into the how, let's understand why you might want to create custom operators:

  • Reusability: Package common sequences of operators for reuse across your application
  • Readability: Abstract complex transformations into meaningful, domain-specific names
  • Testability: Test complex stream logic in isolation
  • Encapsulation: Hide implementation details behind a clean interface
  • Domain-specific operations: Create operators that reflect your business domain
[M]:

2. Creating Custom Operators Through Composition

The simplest way to create a custom operator is by composing existing operators. This approach requires no special knowledge beyond basic pipe usage.

[3]:
// Simple custom operator using composition
// This operator multiplies each value by 10 and then rounds the result
function multiplyAndRound(multiplier = 10) {
return source$ => source$.pipe(
operators.map(value => value * multiplier),
operators.map(value => Math.round(value))
);
}

// Let's use our custom operator
const numbers$ = rxjs.of(1.2, 2.5, 3.7, 4.9);

console.log('Using our multiplyAndRound custom operator:\n');
numbers$.pipe(
multiplyAndRound()
).subscribe(value => console.log(`Result: ${value}\n`));
Using our multiplyAndRound custom operator:
Result: 12
Result: 25
Result: 37
Result: 49
[M]:

The pattern is straightforward:

  1. Create a function that returns a function (which can take configuration parameters)
  2. That returned function should accept a source Observable and return a new Observable
  3. Use pipe() to apply existing operators inside your custom operator

Let's look at a more practical example:

[4]:
// A more practical example: error retry with exponential backoff
function retryWithExponentialBackoff({
maxRetries = 3,
initialInterval = 1000,
maxInterval = 30000
} = {}) {
return source$ => source$.pipe(
operators.retryWhen(errors => errors.pipe(
operators.scan((attemptCount, error) => {
if (attemptCount >= maxRetries) {
throw error; // If we've reached max retries, rethrow the error
}
return attemptCount + 1;
}, 0),
operators.tap(attemptCount => {
const delay = Math.min(
initialInterval * Math.pow(2, attemptCount - 1),
maxInterval
);
console.log(`Retry attempt ${attemptCount} after ${delay}ms\n`);
}),
operators.delayWhen(attemptCount => {
const delay = Math.min(
initialInterval * Math.pow(2, attemptCount - 1),
maxInterval
);
return rxjs.timer(delay);
})
))
);
}

// For demonstration, we'll use shorter delays and fewer retries
function demonstrateBackoffRetry() {
return new Promise(resolve => {
// Simulate a flaky API that fails several times before succeeding
Testing retryWithExponentialBackoff:
API call attempt 1
Retry attempt 1 after 100ms
API call attempt 2
Retry attempt 2 after 200ms
API call attempt 3
API call successful!
Operation completed successfully
[M]:

This exponential backoff retry operator encapsulates a fairly complex error handling strategy, but now we can reuse it easily across our application with just one line of code.

[M]:

3. Custom Operators for Debugging

Let's create a simple but useful debugging operator that helps trace the flow of events through your RxJS streams.

[5]:
// A debug operator to help trace emissions, errors, and completion
function debug(tag, options = {}) {
const {
showValue = true,
showTime = true,
showType = true
} = options;
return source$ => source$.pipe(
operators.tap({
next: value => {
const timestamp = showTime ? `[${new Date().toISOString()}]` : '';
const type = showType ? `(${typeof value})` : '';
const display = showValue ?
(typeof value === 'object' ? JSON.stringify(value) : value) :
'';
console.log(`${timestamp} [${tag}] NEXT ${type}: ${display}\n`);
},
error: error => {
console.log(`[${tag}] ERROR: ${error?.message || error}\n`);
},
complete: () => {
console.log(`[${tag}] COMPLETE\n`);
}
})
);
}

// Let's use our debug operator to track a stream
const source$ = rxjs.of(1, 2, 3, { name: 'John' }).pipe(
debug('SOURCE'),
operators.map(x => typeof x === 'number' ? x * 10 : x),
debug('AFTER MAP'),
operators.filter(x => typeof x === 'number'),
debug('AFTER FILTER')
Testing our debug operator:
[2025-03-14T13:53:41.366Z] [SOURCE] NEXT (number): 1
[2025-03-14T13:53:41.366Z] [AFTER MAP] NEXT (number): 10
[2025-03-14T13:53:41.366Z] [AFTER FILTER] NEXT (number): 10
[2025-03-14T13:53:41.366Z] [SOURCE] NEXT (number): 2
[2025-03-14T13:53:41.366Z] [AFTER MAP] NEXT (number): 20
[2025-03-14T13:53:41.366Z] [AFTER FILTER] NEXT (number): 20
[2025-03-14T13:53:41.366Z] [SOURCE] NEXT (number): 3
[2025-03-14T13:53:41.366Z] [AFTER MAP] NEXT (number): 30
[2025-03-14T13:53:41.366Z] [AFTER FILTER] NEXT (number): 30
[2025-03-14T13:53:41.366Z] [SOURCE] NEXT (object): {"name":"John"}
[2025-03-14T13:53:41.366Z] [AFTER MAP] NEXT (object): {"name":"John"}
[SOURCE] COMPLETE
[AFTER MAP] COMPLETE
[AFTER FILTER] COMPLETE
[M]:

This debug operator is immensely helpful during development to understand how data flows through your streams and identify where issues might be occurring.

[M]:

4. Creating Domain-Specific Operators

Let's create some operators that are specific to a common domain problem: handling API responses.

[6]:
// Custom operators for handling API responses

// Extract data from a standard API response
function extractData() {
return source$ => source$.pipe(
operators.map(response => {
if (response.status === 'error') {
throw new Error(response.message || 'API Error');
}
return response.data;
})
);
}

// Handle API loading state with timeout notification
function withLoading(timeoutMs = 10000) {
return source$ => {
// Create an Observable that emits loading states
const loading$ = rxjs.concat(
// Emit 'loading' immediately
rxjs.of({ loading: true, data: null, error: null }),
// Use the source Observable for data
source$.pipe(
operators.map(data => ({ loading: false, data, error: null })),
// Handle errors from the source
operators.catchError(error => rxjs.of({
loading: false,
data: null,
error: error?.message || 'Unknown error'
}))
),
// Handle the case where neither error nor success happens (timeout)
rxjs.timer(timeoutMs).pipe(
operators.takeUntil(source$),
Testing API custom operators:
Successful API call:
State: {"loading":true,"data":null,"error":null}
State: {"loading":false,"data":{"id":123,"name":"Example data"},"error":null}
Success case completed

Failed API call:
State: {"loading":true,"data":null,"error":null}
State: {"loading":false,"data":null,"error":"Failed to fetch data"}
Error case completed
[M]:

These domain-specific operators help manage API interaction patterns that are common in web applications. By encapsulating the loading state and error handling logic into reusable operators, we make our application code more declarative and consistent.

[M]:

5. Creating Operators from Scratch

Sometimes you need complete control over how your custom operator works. In this case, you'll need to create an operator from scratch by working directly with the Observable.

[7]:
// Create an operator from scratch using direct Observable manipulation
function customBufferToggle(openings, closingSelector) {
return source$ => new rxjs.Observable(observer => {
let buffer = [];
let buffering = false;
// Subscribe to the source
const sourceSubscription = source$.subscribe({
next: value => {
if (buffering) {
buffer.push(value);
}
},
error: err => observer.error(err),
complete: () => {
if (buffer.length > 0 && buffering) {
observer.next(buffer);
buffer = [];
}
observer.complete();
}
});
// Subscribe to the openings Observable
const openingsSubscription = openings.subscribe({
next: opening => {
// Start a new buffer
if (buffering) {
// Emit the previous buffer if we were already buffering
observer.next(buffer);
buffer = [];
} else {
buffering = true;
buffer = [];
}
Testing custom buffer toggle operator:
Source emitted: 0
Source emitted: 1
Source emitted: 2
Source emitted: 3
Source emitted: 4
Source emitted: 5
Source emitted: 6
Buffer emitted: [4, 5, 6]
Source emitted: 7
Source emitted: 8
Source emitted: 9
Source emitted: 10
Source emitted: 11
Buffer emitted: [9, 10, 11]
Source emitted: 12
Source emitted: 13
Source emitted: 14
Source emitted: 15
Source emitted: 16
Buffer emitted: [14, 15, 16]
Source emitted: 17
Source emitted: 18
Source emitted: 19
Custom buffer toggle example completed
[M]:

This example creates a custom version of the bufferToggle operator from scratch. While this approach gives you complete control, it's also more complex and error-prone. Use it only when you can't achieve your goals by composing existing operators.

[M]:

6. Using the operate Function (RxJS 7+)

In newer versions of RxJS, there's a more convenient way to create custom operators from scratch using the operate function, which simplifies the process.

[8]:
// In RxJS 7+, you can use operate (uncomment if using RxJS 7)
// RxJS 6 version for compatibility with this notebook
// Simple implementation of operate for RxJS 6
function operate(init) {
return source$ => new rxjs.Observable(subscriber => {
const sink = {
next: (value) => {
try {
subscriber.next(value);
} catch (err) {
subscriber.error(err);
}
},
error: (err) => subscriber.error(err),
complete: () => subscriber.complete(),
};
let subscription;
try {
subscription = init(source$, sink);
} catch (err) {
subscriber.error(err);
return undefined;
}
return subscription;
});
}

// Create a custom distinctUntilKeyChanged using operate
function distinctUntilKeyChanged(key) {
return operate((source, sink) => {
let hasValue = false;
let lastValue;
return source.subscribe({
Testing our custom distinctUntilKeyChanged operator:
User: {"id":1,"name":"Alice","status":"online"}
User: {"id":2,"name":"Bob","status":"offline"}
User: {"id":1,"name":"Alice","status":"offline"}
User: {"id":3,"name":"Charlie","status":"online"}
[M]:

7. When to Create vs. Compose Custom Operators

Here are some guidelines for deciding when to create custom operators and which approach to use:

  • Use composition (first approach) when:

    • You're just combining existing operators in a specific way
    • The logic is straightforward
    • You want maximum maintainability
  • Create from scratch when:

    • You need fine-grained control over subscription logic
    • You're implementing behavior not possible with existing operators
    • You're optimizing for performance
  • Use operate (RxJS 7+) when:

    • You need the control of creating from scratch but with less boilerplate
    • You're working with newer versions of RxJS
[M]:

8. Best Practices for Custom Operators

Here are some best practices to follow when creating custom operators:

[9]:
// Example showing best practices

/**
* Filters items by a property value
* @param {string} key - The property to check
* @param {any} value - The value to compare against
* @returns {Function} - An RxJS operator function
*/
function whereEqual(key, value) {
// 1. Validate inputs
if (typeof key !== 'string') {
throw new TypeError('key must be a string');
}
// 2. Return a properly typed operator function
return source$ => source$.pipe(
// 3. Use existing operators when possible
operators.filter(item => {
// 4. Handle edge cases
if (item === null || item === undefined) {
return false;
}
// 5. Keep the implementation simple and focused
return item[key] === value;
})
);
}

// Test our operator that follows best practices
const people = [
{ name: 'Alice', age: 25, role: 'developer' },
{ name: 'Bob', age: 30, role: 'manager' },
{ name: 'Charlie', age: 35, role: 'developer' },
{ name: 'Dave', age: 40, role: 'designer' },
{ name: 'Eve', age: 45, role: 'developer' }
Testing well-designed whereEqual operator:
Developer: Alice, age: 25
Developer: Charlie, age: 35
Developer: Eve, age: 45
[M]:

Best Practices Summary:

  1. Document your operators with JSDoc comments explaining purpose and parameters
  2. Validate inputs to catch errors early
  3. Keep operators focused on one concern (single responsibility principle)
  4. Handle edge cases like null or undefined values
  5. Use proper typing if using TypeScript
  6. Test your operators thoroughly, including edge cases
  7. Provide sensible defaults for parameters
  8. Follow RxJS naming conventions for consistency
  9. Clean up subscriptions to prevent memory leaks
  10. Preserve error and completion semantics - don't swallow errors or completions
[M]:

9. Real-world Custom Operator Examples

Let's create a few more practical custom operators that you might use in real applications.

[10]:
// Custom operator: executeOnce - ensures a source executes only once, no matter how many subscribers
function executeOnce() {
return source$ => {
// Create a ReplaySubject to cache and share the execution
const subject = new rxjs.ReplaySubject();
let connected = false;
return new rxjs.Observable(observer => {
// Connect to the source only once
if (!connected) {
connected = true;
source$.subscribe(subject);
}
// Return the subscription to the subject
return subject.subscribe(observer);
});
};
}

// Demonstrate executeOnce
function demonstrateExecuteOnce() {
return new Promise(resolve => {
console.log('Testing executeOnce operator:\n');
let executionCount = 0;
// Create an Observable that logs when it executes
const expensive$ = new rxjs.Observable(subscriber => {
executionCount++;
console.log(`Expensive operation executed (count: ${executionCount})\n`);
setTimeout(() => {
subscriber.next('Expensive result');
subscriber.complete();
}, 1000);
Testing executeOnce operator:
Without executeOnce (will execute twice):
Expensive operation executed (count: 1)
Expensive operation executed (count: 2)
First subscriber: Expensive result
Expensive operation cleanup
Second subscriber: Expensive result
Expensive operation cleanup

With executeOnce (will execute only once):
Expensive operation executed (count: 1)
First subscriber: Expensive result
Second subscriber: Expensive result
Expensive operation cleanup
[11]:
// Demonstrate filterByLatestFrom
function demonstrateFilterByLatestFrom() {
return new Promise(resolve => {
console.log('Testing filterByLatestFrom operator:\n');
// Create some streams to work with
const clicks$ = rxjs.interval(300).pipe(
operators.take(10),
operators.tap(n => console.log(`Click ${n}\n`))
);
const toggle$ = rxjs.concat(
rxjs.of(false),
rxjs.timer(1000).pipe(operators.mapTo(true)),
rxjs.timer(1000).pipe(operators.mapTo(false)),
rxjs.timer(1000).pipe(operators.mapTo(true))
).pipe(
operators.tap(state => console.log(`Toggle state: ${state}\n`))
);
// Only allow clicks when toggle is true
clicks$.pipe(
filterByLatestFrom(toggle$, (click, toggleState) => toggleState === true)
).subscribe({
next: click => console.log(`Allowed click: ${click}\n`),
complete: () => {
console.log('filterByLatestFrom example complete\n');
resolve();
}
});
});
}

await demonstrateFilterByLatestFrom();
Error during execution: filterByLatestFrom is not a functionTesting filterByLatestFrom operator:
[12]:
// Custom operator: debugWithLabel - adds labeled debugging without affecting the stream
function debugWithLabel(label) {
return source$ => source$.pipe(
operators.tap({
next: value => console.log(`[${label}] Next: ${JSON.stringify(value)}\n`),
error: err => console.log(`[${label}] Error: ${err.message}\n`),
complete: () => console.log(`[${label}] Complete\n`)
})
);
}

// Demonstrate debugWithLabel
const namesStream$ = rxjs.from(['Alice', 'Bob', 'Charlie']);

console.log('Testing debugWithLabel operator:\n');
namesStream$.pipe(
debugWithLabel('Raw names'),
operators.map(name => name.toUpperCase()),
debugWithLabel('Uppercase names'),
operators.filter(name => name.length > 4),
debugWithLabel('Long names')
).subscribe();
Testing debugWithLabel operator:
[Raw names] Next: "Alice"
[Uppercase names] Next: "ALICE"
[Long names] Next: "ALICE"
[Raw names] Next: "Bob"
[Uppercase names] Next: "BOB"
[Raw names] Next: "Charlie"
[Uppercase names] Next: "CHARLIE"
[Long names] Next: "CHARLIE"
[Raw names] Complete
[Uppercase names] Complete
[Long names] Complete
[13]:
// Custom operator: bufferUntilIdle - buffers values until no new values for a specified time
function bufferUntilIdle(idleTime = 1000) {
return source$ => {
return source$.pipe(
operators.buffer(
source$.pipe(
operators.debounceTime(idleTime)
)
),
operators.filter(buffer => buffer.length > 0)
);
};
}

// Demonstrate bufferUntilIdle with typing simulation
function demonstrateBufferUntilIdle() {
return new Promise(resolve => {
console.log('Testing bufferUntilIdle operator:\n');
// Simulate typing with bursts of keystrokes
const keyStrokes$ = rxjs.concat(
// First burst of typing
rxjs.from(['H', 'e', 'l', 'l', 'o']).pipe(
operators.concatMap(x => rxjs.of(x).pipe(operators.delay(150)))
),
// Pause
rxjs.timer(1500).pipe(operators.mapTo(' ')),
// Second burst of typing
rxjs.from(['w', 'o', 'r', 'l', 'd']).pipe(
operators.concatMap(x => rxjs.of(x).pipe(operators.delay(150)))
)
).pipe(
operators.tap(char => console.log(`Typed: ${char}\n`))
);
// Buffer keystrokes until 500ms of inactivity
Testing bufferUntilIdle operator:
Typed: H
Typed: H
Typed: e
Typed: e
Typed: l
Typed: l
Typed: l
Typed: l
Typed: o
Typed: o
Buffered typing: [Hello]
Typed:  
Typed:  
Typed: w
Typed: w
Typed: o
Typed: o
Typed: r
Typed: r
Typed: l
Typed: l
Typed: d
Buffered typing: [ world]
bufferUntilIdle example complete
[M]:

10. Testing Custom Operators

A key advantage of custom operators is that they can be tested in isolation. RxJS provides marble testing utilities to make this easier.

[14]:
// Let's test our multiplyAndRound operator with marble testing
const { TestScheduler } = require('rxjs/testing');

// Create a test scheduler
const testScheduler = new TestScheduler((actual, expected) => {
// Simple equality check
const actualValues = actual.map(({ frame, notification }) => ({
frame,
value: notification.kind === 'N' ? notification.value : notification.kind
}));
const expectedValues = expected.map(({ frame, notification }) => ({
frame,
value: notification.kind === 'N' ? notification.value : notification.kind
}));
console.log('Actual:', JSON.stringify(actualValues, null, 2), '\n');
console.log('Expected:', JSON.stringify(expectedValues, null, 2), '\n');
const equal = JSON.stringify(actualValues) === JSON.stringify(expectedValues);
console.log('Test passed:', equal, '\n');
});

// Run marble test
testScheduler.run(({ cold, expectObservable }) => {
console.log('Testing multiplyAndRound with marble testing:\n');
const input = cold('a-b-c-|', { a: 1.2, b: 2.7, c: 3.5 });
const expected = ' a-b-c-|';
const expectedValues = { a: 12, b: 27, c: 35 };
const result = input.pipe(multiplyAndRound());
expectObservable(result).toBe(expected, expectedValues);
});
Testing multiplyAndRound with marble testing:
Actual: [
  {
    "frame": 0,
    "value": 12
  },
  {
    "frame": 2,
    "value": 27
  },
  {
    "frame": 4,
    "value": 35
  },
  {
    "frame": 6,
    "value": "C"
  }
] 
Expected: [
  {
    "frame": 0,
    "value": 12
  },
  {
    "frame": 2,
    "value": 27
  },
  {
    "frame": 4,
    "value": 35
  },
  {
    "frame": 6,
    "value": "C"
  }
] 
Test passed: true 
[15]:
// Testing a more complex operator with async behavior
function testRetryOperator() {
return new Promise(resolve => {
console.log('Testing custom retry operator with marble testing:\n');
// Configure the scheduler to use virtual time
testScheduler.run(({ cold, expectObservable }) => {
// Create an observable that fails twice then succeeds
const source = cold('#-#-a-|', undefined, new Error('Test error'));
// Apply our custom retry operator with minimal timeouts for testing
const result = source.pipe(
retryWithExponentialBackoff({
maxRetries: 2,
initialInterval: 10,
maxInterval: 100
})
);
// We expect to get the 'a' after the retries
// The exact timing depends on the retry delays
expectObservable(result).toBe('------a-|');
});
resolve();
});
}

await testRetryOperator();
Testing custom retry operator with marble testing:
Retry attempt 1 after 10ms
Retry attempt 2 after 20ms
Actual: [
  {
    "frame": 36,
    "value": "E"
  }
] 
Expected: [
  {
    "frame": 6,
    "value": "a"
  },
  {
    "frame": 8,
    "value": "C"
  }
] 
Test passed: false 
[M]:

Best Practices for Custom Operators

When creating custom operators, follow these best practices to ensure they're reliable and maintainable:

  1. Keep operators pure and focused: Each operator should do one thing well.

  2. Handle all Observable lifecycle events: Properly handle next, error, and complete notifications.

  3. Document with input/output marble diagrams: Visual documentation makes operators easier to understand.

  4. Use existing operators when possible: Compose from built-in operators before creating from scratch.

  5. Write thorough tests: Test both normal operation and edge cases.

  6. Follow naming conventions:

    • Transformation operators: map, pluck, etc.
    • Filtering operators: filter, take, etc.
    • Combination: merge, concat, etc.
  7. Share your operators: If you've created something useful, consider sharing it with the community.

  8. Beware of memory leaks: Ensure all subscriptions are properly cleaned up.

Sign in to save your work and access it from anywhere