628 lines
14 KiB
Markdown
628 lines
14 KiB
Markdown
# RxJS Operators for Angular
|
|
|
|
**Purpose:** Essential RxJS operators every Angular developer should master
|
|
**Level:** Intermediate to Advanced
|
|
**Version:** RxJS 7+
|
|
|
|
---
|
|
|
|
## Core Concepts
|
|
|
|
**Operators** are functions that enable composing asynchronous operations with observables. They transform, filter, combine, and manage observable streams.
|
|
|
|
**Key Principles:**
|
|
- Operators are **pure functions**
|
|
- They **don't modify** the source observable
|
|
- They **return a new** observable
|
|
- They can be **chained** together
|
|
|
|
---
|
|
|
|
## Category 1: Transformation Operators
|
|
|
|
### map
|
|
|
|
**Purpose:** Transform each value emitted
|
|
|
|
```typescript
|
|
import { of } from 'rxjs';
|
|
import { map } from 'rxjs/operators';
|
|
|
|
// Example 1: Simple transformation
|
|
of(1, 2, 3).pipe(
|
|
map(x => x * 10)
|
|
).subscribe(console.log);
|
|
// Output: 10, 20, 30
|
|
|
|
// Example 2: Object transformation
|
|
interface User { id: number; name: string; }
|
|
interface UserDisplay { id: number; displayName: string; }
|
|
|
|
this.users$.pipe(
|
|
map((users: User[]) => users.map(u => ({
|
|
id: u.id,
|
|
displayName: u.name.toUpperCase()
|
|
})))
|
|
);
|
|
```
|
|
|
|
### switchMap
|
|
|
|
**Purpose:** Switch to a new observable, canceling previous
|
|
|
|
**Use when:** Making HTTP requests based on user input
|
|
|
|
```typescript
|
|
import { switchMap } from 'rxjs/operators';
|
|
|
|
// Search as user types
|
|
this.searchTerm$.pipe(
|
|
debounceTime(300),
|
|
switchMap(term => this.http.get(`/api/search?q=${term}`))
|
|
).subscribe(results => console.log(results));
|
|
|
|
// Load user details when ID changes
|
|
this.userId$.pipe(
|
|
switchMap(id => this.http.get(`/api/users/${id}`))
|
|
).subscribe(user => this.user.set(user));
|
|
```
|
|
|
|
**Why switchMap?** Automatically cancels previous HTTP request if new search term arrives.
|
|
|
|
### mergeMap (flatMap)
|
|
|
|
**Purpose:** Merge all inner observables
|
|
|
|
**Use when:** You want all requests to complete, not cancel previous
|
|
|
|
```typescript
|
|
import { mergeMap } from 'rxjs/operators';
|
|
|
|
// Send analytics for each click (don't cancel)
|
|
this.clicks$.pipe(
|
|
mergeMap(event => this.analytics.track(event))
|
|
).subscribe();
|
|
|
|
// Process multiple files in parallel
|
|
this.files$.pipe(
|
|
mergeMap(file => this.uploadFile(file))
|
|
).subscribe(result => console.log('Uploaded:', result));
|
|
```
|
|
|
|
### concatMap
|
|
|
|
**Purpose:** Process observables in order, wait for each to complete
|
|
|
|
**Use when:** Order matters (e.g., sequential API calls)
|
|
|
|
```typescript
|
|
import { concatMap } from 'rxjs/operators';
|
|
|
|
// Process queue in order
|
|
this.queue$.pipe(
|
|
concatMap(task => this.processTask(task))
|
|
).subscribe(result => console.log('Processed:', result));
|
|
|
|
// Sequential API calls
|
|
this.users$.pipe(
|
|
concatMap(user => this.http.post('/api/users', user))
|
|
).subscribe();
|
|
```
|
|
|
|
### exhaustMap
|
|
|
|
**Purpose:** Ignore new values while current is processing
|
|
|
|
**Use when:** Prevent duplicate submissions
|
|
|
|
```typescript
|
|
import { exhaustMap } from 'rxjs/operators';
|
|
|
|
// Prevent double-click on submit button
|
|
this.submitClick$.pipe(
|
|
exhaustMap(() => this.http.post('/api/form', this.formData))
|
|
).subscribe();
|
|
|
|
// Login button (ignore clicks while logging in)
|
|
this.loginAttempt$.pipe(
|
|
exhaustMap(credentials => this.auth.login(credentials))
|
|
).subscribe();
|
|
```
|
|
|
|
---
|
|
|
|
## Category 2: Filtering Operators
|
|
|
|
### filter
|
|
|
|
**Purpose:** Emit only values that pass a condition
|
|
|
|
```typescript
|
|
import { filter } from 'rxjs/operators';
|
|
|
|
// Only even numbers
|
|
of(1, 2, 3, 4, 5).pipe(
|
|
filter(x => x % 2 === 0)
|
|
).subscribe(console.log);
|
|
// Output: 2, 4
|
|
|
|
// Only non-null users
|
|
this.user$.pipe(
|
|
filter(user => user !== null)
|
|
).subscribe(user => console.log(user.name));
|
|
|
|
// Only valid emails
|
|
this.emailInput$.pipe(
|
|
filter(email => this.isValidEmail(email))
|
|
).subscribe(email => this.checkAvailability(email));
|
|
```
|
|
|
|
### debounceTime
|
|
|
|
**Purpose:** Wait for silence before emitting
|
|
|
|
**Use when:** Search input, window resize
|
|
|
|
```typescript
|
|
import { debounceTime } from 'rxjs/operators';
|
|
|
|
// Wait 300ms after user stops typing
|
|
this.searchInput$.pipe(
|
|
debounceTime(300),
|
|
switchMap(term => this.search(term))
|
|
).subscribe(results => this.results.set(results));
|
|
|
|
// Window resize handler
|
|
fromEvent(window, 'resize').pipe(
|
|
debounceTime(200)
|
|
).subscribe(() => this.handleResize());
|
|
```
|
|
|
|
### throttleTime
|
|
|
|
**Purpose:** Emit first value, then ignore for duration
|
|
|
|
**Use when:** Scroll events, rapid clicks
|
|
|
|
```typescript
|
|
import { throttleTime } from 'rxjs/operators';
|
|
|
|
// Handle scroll at most once per 100ms
|
|
fromEvent(window, 'scroll').pipe(
|
|
throttleTime(100)
|
|
).subscribe(() => this.checkScrollPosition());
|
|
|
|
// Rate-limit button clicks
|
|
this.buttonClick$.pipe(
|
|
throttleTime(1000)
|
|
).subscribe(() => this.handleClick());
|
|
```
|
|
|
|
### distinctUntilChanged
|
|
|
|
**Purpose:** Only emit when value changes
|
|
|
|
```typescript
|
|
import { distinctUntilChanged } from 'rxjs/operators';
|
|
|
|
// Only emit when search term actually changes
|
|
this.searchInput$.pipe(
|
|
distinctUntilChanged(),
|
|
switchMap(term => this.search(term))
|
|
).subscribe();
|
|
|
|
// Only emit when user ID changes
|
|
this.userId$.pipe(
|
|
distinctUntilChanged(),
|
|
switchMap(id => this.loadUser(id))
|
|
).subscribe();
|
|
```
|
|
|
|
### take / takeUntil
|
|
|
|
**Purpose:** Take specific number or until condition
|
|
|
|
```typescript
|
|
import { take, takeUntil } from 'rxjs/operators';
|
|
|
|
// Take first 5 values
|
|
this.stream$.pipe(
|
|
take(5)
|
|
).subscribe();
|
|
|
|
// Take until component destroyed
|
|
private destroy$ = new Subject<void>();
|
|
|
|
this.data$.pipe(
|
|
takeUntil(this.destroy$)
|
|
).subscribe(data => console.log(data));
|
|
|
|
ngOnDestroy() {
|
|
this.destroy$.next();
|
|
this.destroy$.complete();
|
|
}
|
|
|
|
// Modern approach with takeUntilDestroyed
|
|
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
|
|
|
|
this.data$.pipe(
|
|
takeUntilDestroyed()
|
|
).subscribe(data => console.log(data));
|
|
```
|
|
|
|
---
|
|
|
|
## Category 3: Combination Operators
|
|
|
|
### combineLatest
|
|
|
|
**Purpose:** Emit when ANY source emits (after all emit at least once)
|
|
|
|
**Use when:** Combining multiple form fields, filters
|
|
|
|
```typescript
|
|
import { combineLatest } from 'rxjs';
|
|
|
|
// Wait for both user and settings to load
|
|
combineLatest([
|
|
this.user$,
|
|
this.settings$
|
|
]).pipe(
|
|
map(([user, settings]) => ({ user, settings }))
|
|
).subscribe(data => console.log(data));
|
|
|
|
// Combine multiple filters
|
|
combineLatest([
|
|
this.searchTerm$,
|
|
this.category$,
|
|
this.priceRange$
|
|
]).pipe(
|
|
map(([search, category, price]) => ({
|
|
search, category, price
|
|
})),
|
|
switchMap(filters => this.fetchProducts(filters))
|
|
).subscribe(products => this.products.set(products));
|
|
```
|
|
|
|
### forkJoin
|
|
|
|
**Purpose:** Emit when ALL sources complete (like Promise.all)
|
|
|
|
**Use when:** Loading multiple independent resources
|
|
|
|
```typescript
|
|
import { forkJoin } from 'rxjs';
|
|
|
|
// Load multiple resources on init
|
|
forkJoin({
|
|
user: this.http.get('/api/user'),
|
|
config: this.http.get('/api/config'),
|
|
permissions: this.http.get('/api/permissions')
|
|
}).subscribe(({ user, config, permissions }) => {
|
|
this.initialize(user, config, permissions);
|
|
});
|
|
|
|
// Parallel API calls
|
|
forkJoin([
|
|
this.http.get('/api/products'),
|
|
this.http.get('/api/categories'),
|
|
this.http.get('/api/brands')
|
|
]).subscribe(([products, categories, brands]) => {
|
|
// All loaded
|
|
});
|
|
```
|
|
|
|
### merge
|
|
|
|
**Purpose:** Emit from any source as soon as it emits
|
|
|
|
**Use when:** Combining event streams
|
|
|
|
```typescript
|
|
import { merge } from 'rxjs';
|
|
|
|
// Combine multiple event sources
|
|
merge(
|
|
this.clicks$,
|
|
this.hovers$,
|
|
this.focuses$
|
|
).subscribe(event => this.trackEvent(event));
|
|
|
|
// Combine refresh triggers
|
|
merge(
|
|
this.manualRefresh$,
|
|
this.autoRefresh$,
|
|
this.dataChanged$
|
|
).pipe(
|
|
switchMap(() => this.loadData())
|
|
).subscribe();
|
|
```
|
|
|
|
### withLatestFrom
|
|
|
|
**Purpose:** Combine with latest value from other observables
|
|
|
|
**Use when:** Need secondary data with primary stream
|
|
|
|
```typescript
|
|
import { withLatestFrom } from 'rxjs/operators';
|
|
|
|
// Submit form with latest user data
|
|
this.submitButton$.pipe(
|
|
withLatestFrom(this.form$, this.user$),
|
|
map(([_, formData, user]) => ({ formData, user }))
|
|
).subscribe(({ formData, user }) => {
|
|
this.submit(formData, user);
|
|
});
|
|
|
|
// Apply filter with latest settings
|
|
this.searchTerm$.pipe(
|
|
withLatestFrom(this.filters$, this.sortOrder$),
|
|
switchMap(([term, filters, sort]) =>
|
|
this.search(term, filters, sort)
|
|
)
|
|
).subscribe();
|
|
```
|
|
|
|
---
|
|
|
|
## Category 4: Error Handling
|
|
|
|
### catchError
|
|
|
|
**Purpose:** Catch errors and return fallback observable
|
|
|
|
```typescript
|
|
import { catchError } from 'rxjs/operators';
|
|
import { of, EMPTY } from 'rxjs';
|
|
|
|
// Return empty array on error
|
|
this.http.get('/api/data').pipe(
|
|
catchError(error => {
|
|
console.error('Failed to load:', error);
|
|
return of([]); // Fallback value
|
|
})
|
|
).subscribe(data => console.log(data));
|
|
|
|
// Return empty observable (complete immediately)
|
|
this.http.get('/api/data').pipe(
|
|
catchError(() => EMPTY)
|
|
).subscribe();
|
|
|
|
// Re-throw after logging
|
|
this.http.get('/api/data').pipe(
|
|
catchError(error => {
|
|
console.error(error);
|
|
return throwError(() => error);
|
|
})
|
|
).subscribe();
|
|
```
|
|
|
|
### retry
|
|
|
|
**Purpose:** Retry failed observable
|
|
|
|
```typescript
|
|
import { retry } from 'rxjs/operators';
|
|
|
|
// Retry 3 times on failure
|
|
this.http.get('/api/data').pipe(
|
|
retry(3),
|
|
catchError(error => {
|
|
console.error('Failed after 3 retries');
|
|
return of([]);
|
|
})
|
|
).subscribe();
|
|
|
|
// Retry with delay (RxJS 7+)
|
|
import { retry } from 'rxjs/operators';
|
|
|
|
this.http.get('/api/data').pipe(
|
|
retry({
|
|
count: 3,
|
|
delay: 1000 // Wait 1s between retries
|
|
})
|
|
).subscribe();
|
|
```
|
|
|
|
---
|
|
|
|
## Category 5: Utility Operators
|
|
|
|
### tap
|
|
|
|
**Purpose:** Perform side effects without modifying stream
|
|
|
|
```typescript
|
|
import { tap } from 'rxjs/operators';
|
|
|
|
// Log for debugging
|
|
this.http.get('/api/data').pipe(
|
|
tap(data => console.log('Received:', data)),
|
|
map(data => data.items),
|
|
tap(items => console.log('Mapped:', items))
|
|
).subscribe();
|
|
|
|
// Track analytics
|
|
this.searchTerm$.pipe(
|
|
tap(term => this.analytics.track('search', { term })),
|
|
switchMap(term => this.search(term))
|
|
).subscribe();
|
|
|
|
// Update loading state
|
|
this.loadData().pipe(
|
|
tap(() => this.loading.set(true)),
|
|
finalize(() => this.loading.set(false))
|
|
).subscribe();
|
|
```
|
|
|
|
### shareReplay
|
|
|
|
**Purpose:** Share observable and replay values to new subscribers
|
|
|
|
```typescript
|
|
import { shareReplay } from 'rxjs/operators';
|
|
|
|
// Cache HTTP request
|
|
private config$ = this.http.get('/api/config').pipe(
|
|
shareReplay(1) // Cache last value
|
|
);
|
|
|
|
// Multiple subscribers get same value
|
|
this.config$.subscribe(config => console.log('Sub 1:', config));
|
|
this.config$.subscribe(config => console.log('Sub 2:', config));
|
|
// Only one HTTP request made!
|
|
```
|
|
|
|
### finalize
|
|
|
|
**Purpose:** Execute code when observable completes or errors
|
|
|
|
```typescript
|
|
import { finalize } from 'rxjs/operators';
|
|
|
|
// Always hide loading spinner
|
|
this.loadData().pipe(
|
|
tap(() => this.loading.set(true)),
|
|
finalize(() => this.loading.set(false))
|
|
).subscribe({
|
|
next: data => console.log(data),
|
|
error: err => console.error(err)
|
|
// loading.set(false) runs regardless
|
|
});
|
|
```
|
|
|
|
---
|
|
|
|
## Common Patterns
|
|
|
|
### Pattern 1: Search with Debounce
|
|
|
|
```typescript
|
|
this.searchControl.valueChanges.pipe(
|
|
debounceTime(300),
|
|
distinctUntilChanged(),
|
|
switchMap(term => this.http.get(`/api/search?q=${term}`)),
|
|
catchError(() => of([]))
|
|
).subscribe(results => this.results.set(results));
|
|
```
|
|
|
|
### Pattern 2: Auto-Save
|
|
|
|
```typescript
|
|
this.form.valueChanges.pipe(
|
|
debounceTime(2000),
|
|
distinctUntilChanged(),
|
|
tap(() => this.saving.set(true)),
|
|
switchMap(value => this.http.put('/api/save', value)),
|
|
finalize(() => this.saving.set(false))
|
|
).subscribe();
|
|
```
|
|
|
|
### Pattern 3: Polling
|
|
|
|
```typescript
|
|
import { interval, switchMap } from 'rxjs';
|
|
|
|
interval(5000).pipe(
|
|
switchMap(() => this.http.get('/api/status')),
|
|
catchError(() => of(null))
|
|
).subscribe(status => this.status.set(status));
|
|
```
|
|
|
|
### Pattern 4: Type-ahead with Minimum Length
|
|
|
|
```typescript
|
|
this.searchInput$.pipe(
|
|
debounceTime(300),
|
|
distinctUntilChanged(),
|
|
filter(term => term.length >= 3),
|
|
switchMap(term => this.search(term)),
|
|
catchError(() => of([]))
|
|
).subscribe(results => this.results.set(results));
|
|
```
|
|
|
|
---
|
|
|
|
## Decision Tree
|
|
|
|
**Need to transform values?** → `map`
|
|
**Need to switch to new observable?** → `switchMap`
|
|
**Need to wait for all to complete?** → `forkJoin`
|
|
**Need to combine latest values?** → `combineLatest`
|
|
**Need to filter values?** → `filter`
|
|
**Need to handle errors?** → `catchError`
|
|
**Need to retry?** → `retry`
|
|
**Need to share result?** → `shareReplay`
|
|
**Need to debounce?** → `debounceTime`
|
|
**Need to throttle?** → `throttleTime`
|
|
|
|
---
|
|
|
|
## Best Practices
|
|
|
|
1. **Always unsubscribe** - Use `takeUntilDestroyed()` or `async` pipe
|
|
2. **Prefer `async` pipe** over manual subscriptions
|
|
3. **Use `switchMap`** for dependent HTTP requests
|
|
4. **Use `forkJoin`** for parallel independent requests
|
|
5. **Add error handling** with `catchError`
|
|
6. **Cache with `shareReplay`** for expensive operations
|
|
7. **Debounce user input** with `debounceTime`
|
|
8. **Log with `tap`** for debugging
|
|
|
|
---
|
|
|
|
## Common Mistakes
|
|
|
|
**❌ Memory leaks:**
|
|
```typescript
|
|
// Bad - no unsubscribe
|
|
this.data$.subscribe(data => console.log(data));
|
|
```
|
|
|
|
**✅ Fixed:**
|
|
```typescript
|
|
// Good - auto unsubscribe
|
|
this.data$.pipe(
|
|
takeUntilDestroyed()
|
|
).subscribe(data => console.log(data));
|
|
|
|
// Or use async pipe
|
|
template: `{{ data$ | async }}`
|
|
```
|
|
|
|
**❌ Nested subscriptions:**
|
|
```typescript
|
|
// Bad - pyramid of doom
|
|
this.users$.subscribe(users => {
|
|
this.http.get('/api/settings').subscribe(settings => {
|
|
// ...
|
|
});
|
|
});
|
|
```
|
|
|
|
**✅ Fixed:**
|
|
```typescript
|
|
// Good - use switchMap
|
|
this.users$.pipe(
|
|
switchMap(users => this.http.get('/api/settings').pipe(
|
|
map(settings => ({ users, settings }))
|
|
))
|
|
).subscribe(({ users, settings }) => {
|
|
// ...
|
|
});
|
|
```
|
|
|
|
---
|
|
|
|
## Summary
|
|
|
|
Master these operators:
|
|
- **Transformation**: `map`, `switchMap`, `mergeMap`
|
|
- **Filtering**: `filter`, `debounceTime`, `distinctUntilChanged`
|
|
- **Combination**: `combineLatest`, `forkJoin`, `withLatestFrom`
|
|
- **Error Handling**: `catchError`, `retry`
|
|
- **Utility**: `tap`, `shareReplay`, `take`, `takeUntil`
|
|
|
|
**Key Takeaway:** Choose the right operator for the job. `switchMap` for search, `forkJoin` for parallel loads, `combineLatest` for combining streams.
|