Claude Code Plugins

Community-maintained marketplace

Feedback

Implement RxJS observables, apply operators, fix memory leaks with unsubscribe patterns, handle errors, create subjects, and build reactive data pipelines in Angular applications.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name rxjs-implementation
description Implement RxJS observables, apply operators, fix memory leaks with unsubscribe patterns, handle errors, create subjects, and build reactive data pipelines in Angular applications.

RxJS Implementation Skill

Quick Start

Observable Basics

import { Observable } from 'rxjs';

// Create observable
const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

// Subscribe
const subscription = observable.subscribe({
  next: (value) => console.log(value),
  error: (error) => console.error(error),
  complete: () => console.log('Done')
});

// Unsubscribe
subscription.unsubscribe();

Common Operators

import { map, filter, switchMap, takeUntil } from 'rxjs/operators';

// Transformation
data$.pipe(
  map(user => user.name),
  filter(name => name.length > 0)
).subscribe(name => console.log(name));

// Higher-order
userId$.pipe(
  switchMap(id => this.userService.getUser(id))
).subscribe(user => console.log(user));

Subjects

Subject Types

import { Subject, BehaviorSubject, ReplaySubject } from 'rxjs';

// Subject - No initial value
const subject = new Subject<string>();
subject.next('hello');

// BehaviorSubject - Has initial value
const behavior = new BehaviorSubject<string>('initial');
behavior.next('new value');

// ReplaySubject - Replays N values
const replay = new ReplaySubject<string>(3);
replay.next('one');
replay.next('two');

Service with Subject

@Injectable()
export class NotificationService {
  private messageSubject = new Subject<string>();
  public message$ = this.messageSubject.asObservable();

  notify(message: string) {
    this.messageSubject.next(message);
  }
}

// Usage
constructor(private notification: NotificationService) {
  this.notification.message$.subscribe(msg => {
    console.log('Notification:', msg);
  });
}

Transformation Operators

// map - Transform values
source$.pipe(
  map(user => user.name)
)

// switchMap - Switch to new observable (cancel previous)
userId$.pipe(
  switchMap(id => this.userService.getUser(id))
)

// mergeMap - Merge all results
fileIds$.pipe(
  mergeMap(id => this.downloadFile(id))
)

// concatMap - Sequential processing
tasks$.pipe(
  concatMap(task => this.processTask(task))
)

// exhaustMap - Ignore new while processing
clicks$.pipe(
  exhaustMap(() => this.longRequest())
)

Filtering Operators

// filter - Only pass matching values
data$.pipe(
  filter(item => item.active)
)

// first - Take first value
data$.pipe(first())

// take - Take N values
data$.pipe(take(5))

// takeUntil - Take until condition
data$.pipe(
  takeUntil(this.destroy$)
)

// distinct - Filter duplicates
data$.pipe(
  distinct(),
  distinctUntilChanged()
)

// debounceTime - Wait N ms
input$.pipe(
  debounceTime(300),
  distinctUntilChanged()
)

Combination Operators

import { combineLatest, merge, concat, zip } from 'rxjs';

// combineLatest - Latest from all
combineLatest([user$, settings$, theme$]).pipe(
  map(([user, settings, theme]) => ({ user, settings, theme }))
)

// merge - Values from any
merge(click$, hover$, input$)

// concat - Sequential
concat(request1$, request2$, request3$)

// zip - Wait for all
zip(form1$, form2$, form3$)

// withLatestFrom - Combine with latest
click$.pipe(
  withLatestFrom(user$),
  map(([click, user]) => ({ click, user }))
)

Error Handling

// catchError - Handle errors
data$.pipe(
  catchError(error => {
    console.error('Error:', error);
    return of(defaultValue);
  })
)

// retry - Retry on error
request$.pipe(
  retry(3),
  catchError(error => throwError(error))
)

// timeout - Timeout if no value
request$.pipe(
  timeout(5000),
  catchError(error => of(null))
)

Memory Leak Prevention

Unsubscribe Pattern

private destroy$ = new Subject<void>();

ngOnInit() {
  this.data$.pipe(
    takeUntil(this.destroy$)
  ).subscribe(data => {
    this.processData(data);
  });
}

ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

Async Pipe (Preferred)

// Component
export class UserComponent {
  user$ = this.userService.getUser(1);

  constructor(private userService: UserService) {}
}

// Template - Async pipe handles unsubscribe
<div>{{ user$ | async as user }}
  <p>{{ user.name }}</p>
</div>

Advanced Patterns

Share Operator

// Hot observable - Share single subscription
readonly users$ = this.http.get('/api/users').pipe(
  shareReplay(1) // Cache last result
);

// Now multiple subscriptions use same HTTP request
this.users$.subscribe(users => {...});
this.users$.subscribe(users => {...}); // Reuses cached

Scan for State

// Accumulate state
const counter$ = clicks$.pipe(
  scan((count) => count + 1, 0)
)

// Complex state
const appState$ = actions$.pipe(
  scan((state, action) => {
    switch(action.type) {
      case 'ADD_USER': return { ...state, users: [...state.users, action.user] };
      case 'DELETE_USER': return { ...state, users: state.users.filter(u => u.id !== action.id) };
      default: return state;
    }
  }, initialState)
)

Forkjoin for Multiple Requests

// Parallel requests
forkJoin({
  users: this.userService.getUsers(),
  settings: this.settingService.getSettings(),
  themes: this.themeService.getThemes()
}).subscribe(({ users, settings, themes }) => {
  console.log('All loaded:', users, settings, themes);
})

Testing Observables

import { marbles } from 'rxjs-marbles';

it('should map values correctly', marbles((m) => {
  const source = m.hot('a-b-|', { a: 1, b: 2 });
  const expected = m.cold('x-y-|', { x: 2, y: 4 });

  const result = source.pipe(
    map(x => x * 2)
  );

  m.expect(result).toBeObservable(expected);
}));

Best Practices

  1. Always unsubscribe: Use takeUntil or async pipe
  2. Use higher-order operators: switchMap, mergeMap, etc.
  3. Avoid nested subscriptions: Use operators instead
  4. Share subscriptions: Use share/shareReplay for expensive operations
  5. Handle errors: Always include catchError
  6. Type your observables: Observable<User> not just Observable

Common Mistakes to Avoid

// ❌ Wrong - Creates multiple subscriptions
this.data$.subscribe(d => {
  this.data$.subscribe(d2 => {
    // nested subscriptions!
  });
});

// ✅ Correct - Use switchMap
this.data$.pipe(
  switchMap(d => this.otherService.fetch(d))
).subscribe(result => {
  // handled
});

// ❌ Wrong - Memory leak
ngOnInit() {
  this.data$.subscribe(data => this.data = data);
}

// ✅ Correct - Unsubscribe or async
ngOnInit() {
  this.data$ = this.service.getData();
}
// In template: {{ data$ | async }}

Resources