Reactive Programming
Overview
Build responsive applications using reactive streams and observables for handling asynchronous data flows.
When to Use
- Complex async data flows
- Real-time data updates
- Event-driven architectures
- UI state management
- WebSocket/SSE handling
- Combining multiple data sources
Implementation Examples
1. RxJS Basics
import { Observable, Subject, BehaviorSubject, fromEvent, interval } from 'rxjs';
import { map, filter, debounceTime, distinctUntilChanged, switchMap, take } from 'rxjs/operators';

// Observable built with the constructor: every subscriber receives 1, 2, 3
// followed by the completion notification.
const numbers$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

// A full observer handles all three notification kinds.
numbers$.subscribe({
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('Done')
});

// Subject (multicast): a single next() call is delivered to every current subscriber.
const subject = new Subject<number>();
subject.subscribe(value => console.log('Sub 1:', value));
subject.subscribe(value => console.log('Sub 2:', value));
subject.next(1); // Both subscribers receive

// BehaviorSubject (with initial value): a new subscriber immediately receives
// the current value, then every later update.
const state$ = new BehaviorSubject({ count: 0 });
state$.subscribe(state => console.log('State:', state));
state$.next({ count: 1 });
state$.next({ count: 2 });

// Operators: interval emits 0, 1, 2, ... once per second. After doubling and
// filtering, the subscriber logs 6, 8, 10, 12, 14; take(5) then completes the stream.
const source$ = interval(1000);
source$.pipe(
  map(n => n * 2),
  filter(n => n > 5),
  take(5) // `take` must be imported from 'rxjs/operators' (see import above)
).subscribe(value => console.log(value));
2. Search with Debounce
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, map, switchMap, catchError } from 'rxjs/operators';
import { of } from 'rxjs';

// The cast assumes an <input id="search"> exists in the document.
const searchInput = document.querySelector('#search') as HTMLInputElement;

/**
 * Stream of search results driven by the text typed into `#search`.
 *
 * Each emission is the parsed JSON body of `/api/search`, or an empty array
 * when the query is empty or the request fails.
 */
const search$ = fromEvent(searchInput, 'input').pipe(
  map((event: Event) => (event.target as HTMLInputElement).value),
  debounceTime(300), // Wait 300ms after typing
  distinctUntilChanged(), // Only if value changed
  switchMap(query => {
    // switchMap drops the result of a previous request once a newer query arrives.
    if (!query) return of([]);
    // Encode the query so characters such as `&`, `#` or spaces cannot corrupt the URL.
    return fetch(`/api/search?q=${encodeURIComponent(query)}`)
      .then(res => res.json())
      // Recover with a plain array. Returning `of([])` here would resolve the
      // promise to an Observable object, which would be emitted as the "results".
      .catch(() => []);
  }),
  // Last-resort handler for errors raised anywhere upstream.
  catchError(error => {
    console.error('Search error:', error);
    return of([]);
  })
);

search$.subscribe(results => {
  console.log('Search results:', results);
  displayResults(results);
});

/** Placeholder: render the given search results in the page. */
function displayResults(results: any[]) {
  // Update UI
}
3. State Management
import { BehaviorSubject } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';

/** Shape of the whole application state held by StateManager. */
interface AppState {
  user: { id: string; name: string } | null;
  cart: Array<{ id: string; quantity: number }>;
  loading: boolean;
}

/**
 * Minimal observable store.
 *
 * The state lives in a private BehaviorSubject. Consumers read it through the
 * derived selector streams and change it only through the action methods,
 * each of which publishes a new state object instead of mutating the old one.
 */
class StateManager {
  private readonly state$ = new BehaviorSubject<AppState>({
    user: null,
    cart: [],
    loading: false
  });

  // Selectors: distinctUntilChanged suppresses emissions caused by changes
  // to unrelated parts of the state.
  user$ = this.state$.pipe(
    map(state => state.user),
    distinctUntilChanged()
  );

  cart$ = this.state$.pipe(
    map(state => state.cart),
    distinctUntilChanged()
  );

  /** Total quantity of all items currently in the cart. */
  cartTotal$ = this.cart$.pipe(
    map(cart => cart.reduce((sum, item) => sum + item.quantity, 0))
  );

  loading$ = this.state$.pipe(
    map(state => state.loading),
    distinctUntilChanged()
  );

  // Actions

  /** Replace the current user (pass `null` to clear it). */
  setUser(user: AppState['user']): void {
    this.state$.next({
      ...this.state$.value,
      user
    });
  }

  /**
   * Add an item to the cart. If an item with the same id is already present,
   * its quantity is increased by `item.quantity`; otherwise the item is appended.
   */
  addToCart(item: { id: string; quantity: number }): void {
    const current = this.state$.value;
    const alreadyInCart = current.cart.some(entry => entry.id === item.id);

    // Build new entry objects instead of mutating existing ones, so state
    // snapshots that were already emitted are never changed after the fact.
    const cart = alreadyInCart
      ? current.cart.map(entry =>
          entry.id === item.id
            ? { ...entry, quantity: entry.quantity + item.quantity }
            : entry
        )
      : [...current.cart, { ...item }]; // copy so later caller-side mutation cannot leak in

    this.state$.next({
      ...current,
      cart
    });
  }

  /** Set the loading flag. */
  setLoading(loading: boolean): void {
    this.state$.next({
      ...this.state$.value,
      loading
    });
  }

  /** Synchronous snapshot of the current state. */
  getState(): AppState {
    return this.state$.value;
  }
}

// Usage
const store = new StateManager();

// Each subscriber first receives the current value (null user, 0 items),
// then one emission per relevant change.
store.user$.subscribe(user => {
  console.log('User:', user);
});
store.cartTotal$.subscribe(total => {
  console.log('Cart items:', total);
});

store.setUser({ id: '123', name: 'John' });
store.addToCart({ id: 'item1', quantity: 2 });
4. WebSocket with Reconnection
import { Observable, defer, timer } from 'rxjs';
import { retryWhen, tap, delayWhen } from 'rxjs/operators';

/**
 * Create an observable of JSON messages received over a WebSocket.
 *
 * A new socket is opened on every (re)subscription. Messages that fail to
 * parse as JSON are logged and skipped. A socket error or close is turned
 * into an observable error, which triggers a reconnect after an exponential
 * backoff (1s, 2s, 4s, ... capped at 30s). The backoff restarts from 1s once
 * a connection has been opened successfully.
 *
 * @param url WebSocket endpoint to connect to.
 * @returns Observable emitting each parsed message; unsubscribing closes the socket.
 */
function createWebSocketObservable(url: string): Observable<any> {
  // defer gives every subscriber its own failure counter.
  return defer(() => {
    // Consecutive failures since the last successful connection.
    let failedAttempts = 0;

    return new Observable(subscriber => {
      const ws = new WebSocket(url);

      ws.onopen = () => {
        console.log('WebSocket connected');
        // A working connection resets the backoff, so a later drop is
        // retried quickly instead of inheriting the previous long delay.
        failedAttempts = 0;
      };

      ws.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data);
          subscriber.next(data);
        } catch (error) {
          // A malformed message is skipped; it must not kill the connection.
          console.error('Parse error:', error);
        }
      };

      ws.onerror = (error) => {
        console.error('WebSocket error:', error);
        subscriber.error(error);
      };

      ws.onclose = () => {
        console.log('WebSocket closed');
        subscriber.error(new Error('Connection closed'));
      };

      // Teardown: runs on unsubscribe and before every retry.
      return () => {
        ws.close();
      };
    }).pipe(
      retryWhen(errors =>
        errors.pipe(
          tap(err => console.log('Retrying connection...', err)),
          delayWhen(() => timer(Math.min(1000 * Math.pow(2, failedAttempts++), 30000)))
        )
      )
    );
  });
}

// Usage
const ws$ = createWebSocketObservable('wss://api.example.com/ws');
ws$.subscribe({
  next: data => console.log('Received:', data),
  error: err => console.error('Error:', err)
});
5. Combining Multiple Streams
import { combineLatest, merge, forkJoin, zip, fromEvent, of } from 'rxjs';

// NOTE: fetchUsers, fetchSettings, fetchPosts, fetchComments, button1 and
// button2 are not defined in this snippet; they are presumably application
// functions returning observables and DOM elements provided elsewhere.

// combineLatest - once every input has emitted at least once, emits the
// latest value of each input whenever any input emits
const users$ = fetchUsers();
const settings$ = fetchSettings();
combineLatest([users$, settings$]).subscribe(([users, settings]) => {
  console.log('Users:', users);
  console.log('Settings:', settings);
});

// merge - forward the emissions of several observables into a single stream
const clicks$ = fromEvent(button1, 'click');
const hovers$ = fromEvent(button2, 'mouseover');
merge(clicks$, hovers$).subscribe(event => {
  console.log('Event:', event.type);
});

// forkJoin - wait for all to complete, then emit the last value of each (like Promise.all)
forkJoin({
  users: fetchUsers(),
  posts: fetchPosts(),
  comments: fetchComments()
}).subscribe(({ users, posts, comments }) => {
  console.log('All data loaded:', { users, posts, comments });
});

// zip - pair values by position: first with first, second with second, ...
const names$ = of('Alice', 'Bob', 'Charlie');
const ages$ = of(25, 30, 35);
zip(names$, ages$).subscribe(([name, age]) => {
  console.log(`${name} is ${age} years old`);
});
6. Backpressure Handling
import { Subject, fromEvent } from 'rxjs';
import { bufferTime, filter, throttleTime } from 'rxjs/operators';

// NOTE: `button` is not defined in this snippet; it is presumably a DOM
// element obtained elsewhere.

// Buffer events: trade latency for fewer, larger units of work.
const events$ = new Subject<string>();
events$.pipe(
  bufferTime(1000), // Collect events for 1 second
  filter(buffer => buffer.length > 0) // Skip windows in which nothing arrived
).subscribe(events => {
  console.log('Batch:', events);
  processBatch(events);
});

// Throttle events: after one click passes through, further clicks are
// ignored for the next second.
const clicks$ = fromEvent(button, 'click');
clicks$.pipe(
  throttleTime(1000) // Only allow one every second
).subscribe(() => {
  console.log('Click processed');
});

/** Placeholder: handle one batch of buffered events. */
function processBatch(events: string[]) {
  // Process batch
}
7. Custom Operators
import { Observable, MonoTypeOperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';

/**
 * Custom pipeable operator that logs every value and passes it through unchanged.
 *
 * @param message Label printed before each logged value.
 * @returns An operator function that mirrors the source observable: values,
 *          errors and completion are forwarded as-is.
 */
function tapLog<T>(message: string): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => {
    return new Observable<T>(subscriber => {
      // Returning the inner subscription makes it the teardown logic, so
      // unsubscribing from the result also unsubscribes from the source.
      return source.subscribe({
        next: value => {
          console.log(message, value);
          subscriber.next(value);
        },
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      });
    });
  };
}

// Usage
// NOTE: `source$` is not defined in this snippet; any Observable<number>
// works, for example `interval(1000)`.
source$.pipe(
  tapLog('Before map:'),
  map(x => x * 2),
  tapLog('After map:')
).subscribe();
Best Practices
✅ DO
- Unsubscribe to prevent memory leaks
- Use operators to transform data
- Handle errors properly
- Use shareReplay for expensive operations
- Combine streams when needed
- Test reactive code
❌ DON'T
- Subscribe multiple times to the same cold observable without sharing it (each subscription re-runs the source; use share/shareReplay instead)
- Forget to unsubscribe
- Use nested subscriptions
- Ignore error handling
- Make observables stateful
Common Operators
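| Operator | Purpose |
| --- | --- |
| map | Transform values |
| filter | Filter values |
| debounceTime | Wait for a pause in emissions before emitting the latest value |
| distinctUntilChanged | Only emit if the value changed from the previous one |
| switchMap | Switch to a new inner observable, cancelling the previous one |
| mergeMap | Map each value to an inner observable and merge their outputs |
| catchError | Handle errors |
| tap | Side effects |
| take | Take the first n values, then complete |
| takeUntil | Take values until a notifier observable emits |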
Resources