RxJS Operators for Angular
Purpose: Essential RxJS operators every Angular developer should master
Level: Intermediate to Advanced
Version: RxJS 7+
Core Concepts
Operators are functions that enable composing asynchronous operations with observables. They transform, filter, combine, and manage observable streams.
Key Principles:
- Operators are pure functions
- They don't modify the source observable
- They return a new observable
- They can be chained together
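As a minimal sketch of these principles, the snippet below pipes a source through two chained operators and then subscribes to both the source and the result. The names `source$` and `scaled$` are illustrative only.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Illustrative names; not part of any Angular or RxJS API.
const source$ = of(1, 2, 3);

// pipe() returns a new observable; source$ itself is left untouched.
const scaled$ = source$.pipe(
  filter(x => x !== 2), // operators run in the order they are listed
  map(x => x * 10)
);

const fromSource: number[] = [];
const fromScaled: number[] = [];
source$.subscribe(x => fromSource.push(x));
scaled$.subscribe(x => fromScaled.push(x));

console.log(fromSource); // [1, 2, 3]
console.log(fromScaled); // [10, 30]
```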
Category 1: Transformation Operators
map
Purpose: Transform each value emitted
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
// Example 1: Simple transformation
of(1, 2, 3).pipe(
map(x => x * 10)
).subscribe(console.log);
// Output: 10, 20, 30
// Example 2: Object transformation
interface User { id: number; name: string; }
interface UserDisplay { id: number; displayName: string; }
this.users$.pipe(
map((users: User[]) => users.map(u => ({
id: u.id,
displayName: u.name.toUpperCase()
})))
);
switchMap
Purpose: Switch to a new inner observable, canceling the previous one
Use when: Making HTTP requests based on user input
import { debounceTime, switchMap } from 'rxjs/operators';
// Search as user types
this.searchTerm$.pipe(
debounceTime(300),
switchMap(term => this.http.get(`/api/search?q=${encodeURIComponent(term)}`)) // encode user input before putting it in a URL
).subscribe(results => console.log(results));
// Load user details when ID changes
this.userId$.pipe(
switchMap(id => this.http.get(`/api/users/${id}`))
).subscribe(user => this.user.set(user));
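Why switchMap? When a new search term arrives, it unsubscribes from the previous inner observable, which cancels the in-flight HTTP request, so stale results cannot overwrite newer ones.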
mergeMap (flatMap)
Purpose: Subscribe to every inner observable and merge their outputs as they arrive (flatMap is a deprecated alias for mergeMap)
Use when: You want all requests to complete, not cancel previous
import { mergeMap } from 'rxjs/operators';
// Send analytics for each click (don't cancel)
this.clicks$.pipe(
mergeMap(event => this.analytics.track(event))
).subscribe();
// Process multiple files in parallel
this.files$.pipe(
mergeMap(file => this.uploadFile(file))
).subscribe(result => console.log('Uploaded:', result));
concatMap
Purpose: Process observables in order, wait for each to complete
Use when: Order matters (e.g., sequential API calls)
import { concatMap } from 'rxjs/operators';
// Process queue in order
this.queue$.pipe(
concatMap(task => this.processTask(task))
).subscribe(result => console.log('Processed:', result));
// Sequential API calls
this.users$.pipe(
concatMap(user => this.http.post('/api/users', user))
).subscribe();
exhaustMap
Purpose: Ignore new source values while the current inner observable is still running
Use when: Prevent duplicate submissions
import { exhaustMap } from 'rxjs/operators';
// Prevent double-click on submit button
this.submitClick$.pipe(
exhaustMap(() => this.http.post('/api/form', this.formData))
).subscribe();
// Login button (ignore clicks while logging in)
this.loginAttempt$.pipe(
exhaustMap(credentials => this.auth.login(credentials))
).subscribe();
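The four flattening operators differ only in what they do when a new value arrives while an inner observable is still running. The sketch below feeds the same scenario to each of them: request "a" starts, request "b" arrives while "a" is pending, and "b" happens to finish first. The `run` harness and the `a`/`b` subjects are hypothetical stand-ins for HTTP calls.

```typescript
import { Observable, OperatorFunction, ReplaySubject, Subject } from 'rxjs';
import { concatMap, exhaustMap, mergeMap, switchMap } from 'rxjs/operators';

type Project = (key: string) => Observable<string>;

// Hypothetical harness, not an RxJS API: replays one fixed scenario
// through whichever flattening operator is passed in.
function run(flatten: (project: Project) => OperatorFunction<string, string>): string[] {
  const source = new Subject<string>();
  // ReplaySubject keeps the result for a late subscriber (needed by concatMap).
  const a = new ReplaySubject<string>(1);
  const b = new ReplaySubject<string>(1);
  const out: string[] = [];

  source.pipe(flatten(key => (key === 'a' ? a : b))).subscribe(v => out.push(v));

  source.next('a'); // request "a" starts
  source.next('b'); // request "b" arrives while "a" is still pending
  b.next('b-done'); b.complete(); // "b" finishes first
  a.next('a-done'); a.complete();
  return out;
}

const switched = run(project => switchMap(project));    // ['b-done']: "a" was canceled
const merged = run(project => mergeMap(project));       // ['b-done', 'a-done']: completion order
const concatenated = run(project => concatMap(project)); // ['a-done', 'b-done']: source order
const exhausted = run(project => exhaustMap(project));  // ['a-done']: "b" was ignored

console.log({ switched, merged, concatenated, exhausted });
```

In short: switchMap keeps the newest, mergeMap keeps everything, concatMap keeps everything in order, and exhaustMap keeps the oldest.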
Category 2: Filtering Operators
filter
Purpose: Emit only values that pass a condition
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
// Only even numbers
of(1, 2, 3, 4, 5).pipe(
filter(x => x % 2 === 0)
).subscribe(console.log);
// Output: 2, 4
// Only non-null users (the type guard narrows User | null to User)
this.user$.pipe(
filter((user): user is User => user !== null)
).subscribe(user => console.log(user.name));
// Only valid emails
this.emailInput$.pipe(
filter(email => this.isValidEmail(email))
).subscribe(email => this.checkAvailability(email));
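A plain boolean predicate filters values but does not change the TypeScript type of the stream. The sketch below, which assumes a `User` shape like the one used earlier and a hypothetical `isUser` helper, shows how a type-guard predicate lets later operators treat the value as non-null.

```typescript
import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

interface User { id: number; name: string; }

// Hypothetical helper: a type guard, so filter() narrows the stream type.
const isUser = (user: User | null): user is User => user !== null;

const users: Array<User | null> = [
  { id: 1, name: 'Ada' },
  null,
  { id: 2, name: 'Grace' }
];

const names: string[] = [];
from(users).pipe(
  filter(isUser),        // Observable<User | null> becomes Observable<User>
  map(user => user.name) // no null check needed here
).subscribe(name => names.push(name));

console.log(names); // ['Ada', 'Grace']
```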
debounceTime
Purpose: Wait for silence before emitting
Use when: Search input, window resize
import { fromEvent } from 'rxjs';
import { debounceTime, switchMap } from 'rxjs/operators';
// Wait 300ms after user stops typing
this.searchInput$.pipe(
debounceTime(300),
switchMap(term => this.search(term))
).subscribe(results => this.results.set(results));
// Window resize handler
fromEvent(window, 'resize').pipe(
debounceTime(200)
).subscribe(() => this.handleResize());
throttleTime
Purpose: Emit first value, then ignore for duration
Use when: Scroll events, rapid clicks
import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';
// Handle scroll at most once per 100ms
fromEvent(window, 'scroll').pipe(
throttleTime(100)
).subscribe(() => this.checkScrollPosition());
// Rate-limit button clicks
this.buttonClick$.pipe(
throttleTime(1000)
).subscribe(() => this.handleClick());
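The difference between debounceTime and throttleTime is easiest to see on the same input. The sketch below uses `TestScheduler` from `rxjs/testing` to run both in virtual time, so no real waiting is involved. The `collect` helper and the marble string are illustrative assumptions.

```typescript
import { debounceTime, throttleTime } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Hypothetical helper: runs one operator over a fixed burst of values.
function collect(kind: 'debounce' | 'throttle'): string[] {
  const out: string[] = [];
  // The assertion callback is unused because values are collected by hand.
  const scheduler = new TestScheduler(() => {});
  scheduler.run(({ cold }) => {
    // Virtual time, 1 ms per character: a at 0 ms, b at 1 ms, c at 5 ms.
    const keys$ = cold<string>('ab---c---|');
    const limited$ = kind === 'debounce'
      ? keys$.pipe(debounceTime(3))
      : keys$.pipe(throttleTime(3));
    limited$.subscribe(v => out.push(v));
  });
  return out;
}

const debounced = collect('debounce'); // ['b', 'c']: last value after each 3 ms of silence
const throttled = collect('throttle'); // ['a', 'c']: first value, then 3 ms of ignoring

console.log({ debounced, throttled });
```

debounceTime reports where a burst ended, which suits search input. throttleTime reports where a burst started, which suits scroll and click handling.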
distinctUntilChanged
Purpose: Only emit when value changes
import { distinctUntilChanged, switchMap } from 'rxjs/operators';
// Only emit when search term actually changes
this.searchInput$.pipe(
distinctUntilChanged(),
switchMap(term => this.search(term))
).subscribe();
// Only emit when user ID changes
this.userId$.pipe(
distinctUntilChanged(),
switchMap(id => this.loadUser(id))
).subscribe();
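By default distinctUntilChanged compares with `===`, which works for strings and numbers but treats two equal-looking objects as different. A minimal sketch, assuming a hypothetical `UserRef` shape, of passing a comparator for object values:

```typescript
import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// Hypothetical shape used only for this illustration.
interface UserRef { id: number; }
const refs: UserRef[] = [{ id: 1 }, { id: 1 }, { id: 2 }];

// Default comparison is by reference, so every object counts as a change.
const byReference: number[] = [];
from(refs).pipe(
  distinctUntilChanged()
).subscribe(ref => byReference.push(ref.id));

// A comparator decides what "unchanged" means.
const byId: number[] = [];
from(refs).pipe(
  distinctUntilChanged((prev, curr) => prev.id === curr.id)
).subscribe(ref => byId.push(ref.id));

console.log(byReference); // [1, 1, 2]
console.log(byId);        // [1, 2]
```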
take / takeUntil
Purpose: Take a fixed number of values, or take values until a notifier emits
import { Subject } from 'rxjs';
import { take, takeUntil } from 'rxjs/operators';
// Take first 5 values
this.stream$.pipe(
take(5)
).subscribe();
// Take until component destroyed
private destroy$ = new Subject<void>();
this.data$.pipe(
takeUntil(this.destroy$)
).subscribe(data => console.log(data));
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
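// Modern approach with takeUntilDestroyed (Angular 16+)
// Call it in an injection context (constructor or field initializer),
// or pass a DestroyRef explicitly: takeUntilDestroyed(this.destroyRef)
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
this.data$.pipe(
takeUntilDestroyed()
).subscribe(data => console.log(data));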
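The takeUntil pattern can be sketched outside a component with two plain subjects. Here `data$` and `destroy$` are illustrative stand-ins, and the `destroy$.next()` call plays the role of `ngOnDestroy`.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const data$ = new Subject<number>();
const destroy$ = new Subject<void>();
const log: string[] = [];

data$.pipe(
  takeUntil(destroy$)
).subscribe({
  next: v => log.push(`next ${v}`),
  complete: () => log.push('complete')
});

data$.next(1);
data$.next(2);
destroy$.next();     // what ngOnDestroy would do
destroy$.complete();
data$.next(3);       // ignored: the subscription has already ended

console.log(log); // ['next 1', 'next 2', 'complete']
```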
Category 3: Combination Operators
combineLatest
Purpose: Emit when ANY source emits (after all emit at least once)
Use when: Combining multiple form fields, filters
import { combineLatest } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';
// Wait for both user and settings to load
combineLatest([
this.user$,
this.settings$
]).pipe(
map(([user, settings]) => ({ user, settings }))
).subscribe(data => console.log(data));
// Combine multiple filters
combineLatest([
this.searchTerm$,
this.category$,
this.priceRange$
]).pipe(
map(([search, category, price]) => ({
search, category, price
})),
switchMap(filters => this.fetchProducts(filters))
).subscribe(products => this.products.set(products));
forkJoin
Purpose: Emit once, with the last value from each source, when ALL sources complete (like Promise.all)
Use when: Loading multiple independent resources
import { forkJoin } from 'rxjs';
// Load multiple resources on init
forkJoin({
user: this.http.get('/api/user'),
config: this.http.get('/api/config'),
permissions: this.http.get('/api/permissions')
}).subscribe(({ user, config, permissions }) => {
this.initialize(user, config, permissions);
});
// Parallel API calls
forkJoin([
this.http.get('/api/products'),
this.http.get('/api/categories'),
this.http.get('/api/brands')
]).subscribe(([products, categories, brands]) => {
// All loaded
});
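combineLatest and forkJoin are easy to confuse because both produce one combined value from several sources. The sketch below subscribes both to the same pair of subjects (illustrative names) to show when each one emits.

```typescript
import { Subject, combineLatest, forkJoin } from 'rxjs';

const user$ = new Subject<string>();
const settings$ = new Subject<string>();
const combined: string[] = [];
const joined: string[] = [];

combineLatest([user$, settings$]).subscribe(([u, s]) => combined.push(`${u}/${s}`));
forkJoin([user$, settings$]).subscribe(([u, s]) => joined.push(`${u}/${s}`));

user$.next('ada');      // nothing yet: settings$ has not emitted
settings$.next('dark'); // combineLatest emits ada/dark
user$.next('grace');    // combineLatest emits grace/dark
user$.complete();
settings$.complete();   // forkJoin emits grace/dark, once, using the last values

console.log(combined); // ['ada/dark', 'grace/dark']
console.log(joined);   // ['grace/dark']
```

This is why forkJoin fits HTTP calls, which emit once and complete, while combineLatest fits long-lived streams such as form controls.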
merge
Purpose: Emit from any source as soon as it emits
Use when: Combining event streams
import { merge } from 'rxjs';
import { switchMap } from 'rxjs/operators';
// Combine multiple event sources
merge(
this.clicks$,
this.hovers$,
this.focuses$
).subscribe(event => this.trackEvent(event));
// Combine refresh triggers
merge(
this.manualRefresh$,
this.autoRefresh$,
this.dataChanged$
).pipe(
switchMap(() => this.loadData())
).subscribe();
withLatestFrom
Purpose: When the source emits, combine its value with the latest values from other observables
Use when: The primary stream needs the current value of secondary streams
import { map, switchMap, withLatestFrom } from 'rxjs/operators';
// Submit form with latest user data
this.submitButton$.pipe(
withLatestFrom(this.form$, this.user$),
map(([_, formData, user]) => ({ formData, user }))
).subscribe(({ formData, user }) => {
this.submit(formData, user);
});
// Apply filter with latest settings
this.searchTerm$.pipe(
withLatestFrom(this.filters$, this.sortOrder$),
switchMap(([term, filters, sort]) =>
this.search(term, filters, sort)
)
).subscribe();
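Unlike combineLatest, withLatestFrom only emits when the primary stream emits, and it drops primary values that arrive before every secondary stream has produced a value. A minimal sketch with illustrative subjects:

```typescript
import { Subject } from 'rxjs';
import { map, withLatestFrom } from 'rxjs/operators';

const submit$ = new Subject<void>();
const form$ = new Subject<string>();
const submitted: string[] = [];

submit$.pipe(
  withLatestFrom(form$),
  map(([, form]) => form)
).subscribe(form => submitted.push(form));

submit$.next();        // dropped: form$ has no value yet
form$.next('draft 1'); // no output: only submit$ triggers emissions
form$.next('draft 2');
submit$.next();        // emits the latest form value

console.log(submitted); // ['draft 2']
```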
Category 4: Error Handling
catchError
Purpose: Catch errors and return fallback observable
import { catchError } from 'rxjs/operators';
import { of, EMPTY, throwError } from 'rxjs';
// Return empty array on error
this.http.get('/api/data').pipe(
catchError(error => {
console.error('Failed to load:', error);
return of([]); // Fallback value
})
).subscribe(data => console.log(data));
// Return empty observable (complete immediately)
this.http.get('/api/data').pipe(
catchError(() => EMPTY)
).subscribe();
// Re-throw after logging
this.http.get('/api/data').pipe(
catchError(error => {
console.error(error);
return throwError(() => error);
})
).subscribe();
retry
Purpose: Retry failed observable
import { of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';
// Retry 3 times on failure
this.http.get('/api/data').pipe(
retry(3),
catchError(error => {
console.error('Failed after 3 retries');
return of([]);
})
).subscribe();
// Retry with delay (RxJS 7+)
this.http.get('/api/data').pipe(
retry({
count: 3,
delay: 1000 // Wait 1s between retries
})
).subscribe();
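Note that `retry(n)` means up to n additional attempts after the first failure, so `retry(3)` can subscribe to the source four times in total. The sketch below counts attempts against a hypothetical `flaky` source built with `defer`, which stands in for an HTTP call.

```typescript
import { Observable, defer, of, throwError } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

// Hypothetical flaky request: the first `failures` attempts fail, later ones succeed.
function flaky(failures: number, counter: { attempts: number }): Observable<string> {
  return defer(() => {
    counter.attempts++;
    return counter.attempts <= failures
      ? throwError(() => new Error('network down'))
      : of('ok');
  });
}

const results: string[] = [];

// Fails twice, succeeds on the third attempt: two retries are enough.
const recovering = { attempts: 0 };
flaky(2, recovering).pipe(
  retry(2)
).subscribe(v => results.push(v));

// Never succeeds: after 1 attempt + 3 retries, catchError supplies a fallback.
const failing = { attempts: 0 };
flaky(Infinity, failing).pipe(
  retry(3),
  catchError(() => of('fallback'))
).subscribe(v => results.push(v));

console.log(results);             // ['ok', 'fallback']
console.log(recovering.attempts); // 3
console.log(failing.attempts);    // 4
```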
Category 5: Utility Operators
tap
Purpose: Perform side effects without modifying stream
import { finalize, map, switchMap, tap } from 'rxjs/operators';
// Log for debugging
this.http.get('/api/data').pipe(
tap(data => console.log('Received:', data)),
map(data => data.items),
tap(items => console.log('Mapped:', items))
).subscribe();
// Track analytics
this.searchTerm$.pipe(
tap(term => this.analytics.track('search', { term })),
switchMap(term => this.search(term))
).subscribe();
// Update loading state: set the flag before subscribing,
// because tap only runs when a value arrives
this.loading.set(true);
this.loadData().pipe(
finalize(() => this.loading.set(false))
).subscribe();
shareReplay
Purpose: Share observable and replay values to new subscribers
import { shareReplay } from 'rxjs/operators';
// Cache HTTP request
private config$ = this.http.get('/api/config').pipe(
shareReplay(1) // Cache last value
);
// Multiple subscribers get same value
this.config$.subscribe(config => console.log('Sub 1:', config));
this.config$.subscribe(config => console.log('Sub 2:', config));
// Only one HTTP request made!
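The "only one request" claim can be checked with a counter. In this sketch `defer` stands in for `this.http.get(...)`: its factory runs once per subscription to the underlying source, so the counter shows how many "requests" were actually made. The config shape is an assumption for illustration.

```typescript
import { defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let requests = 0;

// Stand-in for an HTTP call: the factory runs each time the source is subscribed.
const config$ = defer(() => {
  requests++;
  return of({ theme: 'dark' });
}).pipe(
  shareReplay(1) // share one source subscription and replay the last value
);

const themes: string[] = [];
config$.subscribe(config => themes.push(config.theme));
config$.subscribe(config => themes.push(config.theme)); // served from the replay buffer

console.log(themes);   // ['dark', 'dark']
console.log(requests); // 1
```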
finalize
Purpose: Run cleanup code when the observable completes, errors, or is unsubscribed
import { finalize } from 'rxjs/operators';
// Always hide loading spinner
this.loading.set(true); // set before subscribing; tap would only run once data arrives
this.loadData().pipe(
finalize(() => this.loading.set(false))
).subscribe({
next: data => console.log(data),
error: err => console.error(err)
// loading.set(false) runs regardless
});
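A short sketch of the three ways a subscription can end, each of which triggers the finalize callback. The log messages are illustrative.

```typescript
import { NEVER, of, throwError } from 'rxjs';
import { finalize } from 'rxjs/operators';

const log: string[] = [];

// 1. The source completes.
of(1).pipe(
  finalize(() => log.push('after complete'))
).subscribe();

// 2. The source errors.
throwError(() => new Error('boom')).pipe(
  finalize(() => log.push('after error'))
).subscribe({ error: () => {} });

// 3. The subscriber unsubscribes from a source that never ends.
const subscription = NEVER.pipe(
  finalize(() => log.push('after unsubscribe'))
).subscribe();
subscription.unsubscribe();

console.log(log); // ['after complete', 'after error', 'after unsubscribe']
```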
Common Patterns
Pattern 1: Search with Debounce
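this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(term => this.http.get(`/api/search?q=${encodeURIComponent(term)}`).pipe(
// Handle errors on the inner request so one failure does not end the search stream
catchError(() => of([]))
))
).subscribe(results => this.results.set(results));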
Pattern 2: Auto-Save
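this.form.valueChanges.pipe(
debounceTime(2000),
// Form values are new objects on every change, so compare by content
distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
switchMap(value => {
this.saving.set(true);
return this.http.put('/api/save', value).pipe(
// finalize on the inner request resets the flag after every save
finalize(() => this.saving.set(false))
);
})
).subscribe();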
Pattern 3: Polling
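import { interval, of } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';
interval(5000).pipe(
switchMap(() => this.http.get('/api/status').pipe(
// Catch per request so a failed poll does not stop the polling
catchError(() => of(null))
))
).subscribe(status => this.status.set(status));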
Pattern 4: Type-ahead with Minimum Length
this.searchInput$.pipe(
debounceTime(300),
distinctUntilChanged(),
filter(term => term.length >= 3),
switchMap(term => this.search(term).pipe(
// Catch inside switchMap so the type-ahead keeps working after an error
catchError(() => of([]))
))
).subscribe(results => this.results.set(results));
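Where catchError sits relative to switchMap decides whether the stream survives a failed request. The sketch below runs the same three search terms through both placements, using a hypothetical `search` function that fails for the term "bad".

```typescript
import { Observable, Subject, of, throwError } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

// Hypothetical search call: fails for "bad", otherwise echoes the term.
const search = (term: string): Observable<string[]> =>
  term === 'bad' ? throwError(() => new Error('500')) : of([term]);

const term$ = new Subject<string>();
const outer: string[][] = [];
const inner: string[][] = [];

// catchError AFTER switchMap: the fallback replaces the whole stream, which then completes.
term$.pipe(
  switchMap(term => search(term)),
  catchError(() => of<string[]>([]))
).subscribe(results => outer.push(results));

// catchError INSIDE switchMap: only the failed request is replaced.
term$.pipe(
  switchMap(term => search(term).pipe(
    catchError(() => of<string[]>([]))
  ))
).subscribe(results => inner.push(results));

term$.next('a');
term$.next('bad');
term$.next('b');

console.log(outer); // [['a'], []]         'b' is never searched
console.log(inner); // [['a'], [], ['b']]  the stream keeps going
```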
Decision Tree
Need to transform values? → map
Need to switch to new observable? → switchMap
Need to wait for all to complete? → forkJoin
Need to combine latest values? → combineLatest
Need to filter values? → filter
Need to handle errors? → catchError
Need to retry? → retry
Need to share result? → shareReplay
Need to debounce? → debounceTime
Need to throttle? → throttleTime
Best Practices
- Always unsubscribe: use takeUntilDestroyed() or the async pipe
- Prefer the async pipe over manual subscriptions
- Use switchMap for dependent HTTP requests
- Use forkJoin for parallel independent requests
- Add error handling with catchError
- Cache with shareReplay for expensive operations
- Debounce user input with debounceTime
- Log with tap for debugging
Common Mistakes
❌ Memory leaks:
// Bad - no unsubscribe
this.data$.subscribe(data => console.log(data));
✅ Fixed:
// Good - auto unsubscribe
this.data$.pipe(
takeUntilDestroyed()
).subscribe(data => console.log(data));
// Or use async pipe
template: `{{ data$ | async }}`
❌ Nested subscriptions:
// Bad - pyramid of doom
this.users$.subscribe(users => {
this.http.get('/api/settings').subscribe(settings => {
// ...
});
});
✅ Fixed:
// Good - use switchMap
this.users$.pipe(
switchMap(users => this.http.get('/api/settings').pipe(
map(settings => ({ users, settings }))
))
).subscribe(({ users, settings }) => {
// ...
});
Summary
Master these operators:
- Transformation: map, switchMap, mergeMap
- Filtering: filter, debounceTime, distinctUntilChanged
- Combination: combineLatest, forkJoin, withLatestFrom
- Error Handling: catchError, retry
- Utility: tap, shareReplay, take, takeUntil
Key Takeaway: Choose the right operator for the job. switchMap for search, forkJoin for parallel loads, combineLatest for combining streams.