jpskill.com
🛠️ 開発・MCP コミュニティ

rxjs-patterns

RxJS reactive programming patterns for Angular applications. Use when implementing observables, operators, error handling, memory management, subscription cleanup, or advanced reactive patterns. Covers operators, multicasting, backpressure, and integration with Angular Signals.

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-17
取得日時
2026-05-17
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

[Skill 名] rxjs-patterns

RxJS パターン

Angular アプリケーションにおける RxJS を用いたリアクティブプログラミングに関する専門的なガイダンスです。ベストプラクティス、一般的なパターン、パフォーマンス最適化に焦点を当てています。

このスキルを使用するタイミング

以下の必要がある場合に、このスキルを有効にしてください。

  • オブザーバブルを効果的に作成および管理する
  • データ変換のために RxJS オペレーターを連結する
  • リアクティブストリームでエラーを処理する
  • サブスクリプションによるメモリリークを防ぐ
  • デバウンス、スロットリング、またはバッファリングを実装する
  • 複数のサブスクライバーとオブザーバブルを共有する
  • RxJS を Angular Signals と統合する
  • リアクティブなデータフローを最適化する
  • 高度なパターン(リトライ、ポーリング、キャッシング)を実装する

コアオペレーター

変換

// map - 各値を変換します
source$.pipe(
  map(user => user.name)
)

// mergeMap/switchMap/concatMap/exhaustMap
// 並行処理の必要性に基づいて選択します:
// - switchMap: 以前のものをキャンセルし、検索に使用します
// - mergeMap: 並行して実行し、独立した操作に使用します
// - concatMap: 順次キューに入れ、順序付けられた操作に使用します
// - exhaustMap: 実行中は新しいものを無視し、保存/送信に使用します

// switchMap を使用した検索例
searchTerm$.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(term => this.searchService.search(term))
)

// exhaustMap を使用した保存例
saveButton$.pipe(
  exhaustMap(() => this.saveService.save(data))
)

フィルタリング

// filter - マッチする値のみを発行します
source$.pipe(
  filter(user => user.age >= 18)
)

// distinctUntilChanged - 連続する重複値をスキップします
input$.pipe(
  distinctUntilChanged()
)

// take/takeUntil - 発行を制限します
source$.pipe(
  take(5) // 最初の5つを取得
)

source$.pipe(
  takeUntil(destroy$) // 購読解除パターン
)

// debounceTime/throttleTime - レート制限
input$.pipe(
  debounceTime(300) // 最後の入力から300ms待機
)

click$.pipe(
  throttleTime(1000) // 1秒間に最大1回発行
)

結合

// combineLatest - いずれかのオブザーバブルが発行したときに発行します
combineLatest([user$, settings$]).pipe(
  map(([user, settings]) => ({ user, settings }))
)

// forkJoin - すべてが完了したときに発行します
forkJoin({
  user: getUserById(id),
  posts: getUserPosts(id),
  comments: getUserComments(id)
}).subscribe(({ user, posts, comments }) => {
  // すべてロードされました
})

// merge - 複数のオブザーバブルをマージします
merge(click$, hover$, focus$).subscribe()

// zip - インデックスで発行をペアにします
zip(numbers$, letters$).pipe(
  map(([num, letter]) => `${num}${letter}`)
)

エラー処理

catchError

// エラーを適切に処理します
this.http.get('/api/data').pipe(
  catchError(error => {
    console.error('Error:', error);
    return of([]); // フォールバック値を返します
  })
)

// ログ記録後に再スローします
this.http.get('/api/data').pipe(
  catchError(error => {
    this.logger.error(error);
    return throwError(() => new Error('Failed to load data'));
  })
)

リトライロジック

// retry - エラー時にリトライします
this.http.get('/api/data').pipe(
  retry(3),
  catchError(error => of([]))
)

// retryWhen - 遅延を伴う高度なリトライ
this.http.get('/api/data').pipe(
  retryWhen(errors => errors.pipe(
    scan((retryCount, err) => {
      if (retryCount >= 3) {
        throw err;
      }
      return retryCount + 1;
    }, 0),
    delay(1000) // リトライ間に1秒待機
  ))
)

メモリ管理

サブスクリプションのクリーンアップ

// ❌ BAD - メモリリーク
export class BadComponent {
  ngOnInit() {
    this.dataService.getData().subscribe(data => {
      this.data = data;
    });
  }
}

// ✅ GOOD - 手動クリーンアップ
export class GoodComponent implements OnDestroy {
  private subscription = new Subscription();

  ngOnInit() {
    this.subscription.add(
      this.dataService.getData().subscribe(data => {
        this.data = data;
      })
    );
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

// ✅ BETTER - takeUntil パターン
export class BetterComponent implements OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    this.dataService.getData().pipe(
      takeUntil(this.destroy$)
    ).subscribe(data => {
      this.data = data;
    });
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// ✅ BEST - toSignal (Angular 16+)
export class BestComponent {
  data = toSignal(
    this.dataService.getData(),
    { initialValue: [] }
  );
}

オブザーバブルの共有

// ❌ BAD - 複数の HTTP リクエスト
const data$ = this.http.get('/api/data');
data$.subscribe(x => console.log(x));
data$.subscribe(y => console.log(y)); // 2回目のリクエスト!

// ✅ GOOD - shareReplay で共有
const data$ = this.http.get('/api/data').pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);
data$.subscribe(x => console.log(x));
data$.subscribe(y => console.log(y)); // キャッシュされた結果を使用

Angular Signals との統合

toSignal

// オブザーバブルをシグナルに変換します
export class Component {
  private dataService = inject(DataService);

  // 自動サブスクリプション管理
  data = toSignal(
    this.dataService.getData(),
    { initialValue: [] }
  );

  // テンプレートでの使用
  template: `{{ data().length }} items`
}

toObservable

// シグナルをオブザーバブルに変換します
export class Component {
  searchTerm = signal('');

  results$ = toObservable(this.searchTerm).pipe(
    debounceTime(300),
    distinctUntilChanged(),
    switchMap(term => this.searchService.search(term))
  );

  results = toSignal(this.results$, { initialValue: [] });
}

高度なパターン

ポーリング

// 5秒ごとにポーリングします
interval(5000).pipe(
  startWith(0),
  switchMap(() => this.http.get('/api/status')),
  takeUntil(this.destroy$)
).subscribe(status => {
  this.status = status;
});

有効期限付きキャッシュ


@Injectable({ providedIn: 'root' })
export class CachedDataService {
  private cache$ = new ReplaySubject<Data[]>(1);
  private cacheAge = 0;
  private readonly CACHE_DURATION = 5 * 60 * 1000; // 5分

  getData(): 

(原文がここで切り詰められています)
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

RxJS Patterns

Expert guidance for reactive programming with RxJS in Angular applications, focusing on best practices, common patterns, and performance optimization.

When to Use This Skill

Activate this skill when you need to:

  • Create and manage observables effectively
  • Chain RxJS operators for data transformation
  • Handle errors in reactive streams
  • Prevent memory leaks from subscriptions
  • Implement debouncing, throttling, or buffering
  • Share observables with multiple subscribers
  • Integrate RxJS with Angular Signals
  • Optimize reactive data flows
  • Implement advanced patterns (retry, polling, caching)

Core Operators

Transformation

// map - Transform each value
source$.pipe(
  map(user => user.name)
)

// mergeMap/switchMap/concatMap/exhaustMap
// Choose based on concurrency needs:
// - switchMap: Cancel previous, use for search
// - mergeMap: Run concurrently, use for independent operations
// - concatMap: Queue sequentially, use for ordered operations
// - exhaustMap: Ignore new while running, use for save/submit

// Search example with switchMap
searchTerm$.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(term => this.searchService.search(term))
)

// Save example with exhaustMap
saveButton$.pipe(
  exhaustMap(() => this.saveService.save(data))
)

Filtering

// filter - Emit only matching values
source$.pipe(
  filter(user => user.age >= 18)
)

// distinctUntilChanged - Skip duplicate consecutive values
input$.pipe(
  distinctUntilChanged()
)

// take/takeUntil - Limit emissions
source$.pipe(
  take(5) // Take first 5
)

source$.pipe(
  takeUntil(destroy$) // Unsubscribe pattern
)

// debounceTime/throttleTime - Rate limiting
input$.pipe(
  debounceTime(300) // Wait 300ms after last input
)

click$.pipe(
  throttleTime(1000) // Emit at most once per second
)

Combination

// combineLatest - Emit when any observable emits
combineLatest([user$, settings$]).pipe(
  map(([user, settings]) => ({ user, settings }))
)

// forkJoin - Emit when all complete
forkJoin({
  user: getUserById(id),
  posts: getUserPosts(id),
  comments: getUserComments(id)
}).subscribe(({ user, posts, comments }) => {
  // All loaded
})

// merge - Merge multiple observables
merge(click$, hover$, focus$).subscribe()

// zip - Pair emissions by index
zip(numbers$, letters$).pipe(
  map(([num, letter]) => `${num}${letter}`)
)

Error Handling

catchError

// Handle errors gracefully
this.http.get('/api/data').pipe(
  catchError(error => {
    console.error('Error:', error);
    return of([]); // Return fallback value
  })
)

// Re-throw after logging
this.http.get('/api/data').pipe(
  catchError(error => {
    this.logger.error(error);
    return throwError(() => new Error('Failed to load data'));
  })
)

Retry Logic

// retry - Retry on error
this.http.get('/api/data').pipe(
  retry(3),
  catchError(error => of([]))
)

// retryWhen - Advanced retry with delay
this.http.get('/api/data').pipe(
  retryWhen(errors => errors.pipe(
    scan((retryCount, err) => {
      if (retryCount >= 3) {
        throw err;
      }
      return retryCount + 1;
    }, 0),
    delay(1000) // Wait 1s between retries
  ))
)

Memory Management

Subscription Cleanup

// ❌ BAD - Memory leak
export class BadComponent {
  ngOnInit() {
    this.dataService.getData().subscribe(data => {
      this.data = data;
    });
  }
}

// ✅ GOOD - Manual cleanup
export class GoodComponent implements OnDestroy {
  private subscription = new Subscription();

  ngOnInit() {
    this.subscription.add(
      this.dataService.getData().subscribe(data => {
        this.data = data;
      })
    );
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

// ✅ BETTER - takeUntil pattern
export class BetterComponent implements OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    this.dataService.getData().pipe(
      takeUntil(this.destroy$)
    ).subscribe(data => {
      this.data = data;
    });
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// ✅ BEST - toSignal (Angular 16+)
export class BestComponent {
  data = toSignal(
    this.dataService.getData(),
    { initialValue: [] }
  );
}

Sharing Observables

// ❌ BAD - Multiple HTTP requests
const data$ = this.http.get('/api/data');
data$.subscribe(x => console.log(x));
data$.subscribe(y => console.log(y)); // Second request!

// ✅ GOOD - Share with shareReplay
const data$ = this.http.get('/api/data').pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);
data$.subscribe(x => console.log(x));
data$.subscribe(y => console.log(y)); // Uses cached result

Integration with Angular Signals

toSignal

// Convert observable to signal
export class Component {
  private dataService = inject(DataService);

  // Automatic subscription management
  data = toSignal(
    this.dataService.getData(),
    { initialValue: [] }
  );

  // Use in template
  template: `{{ data().length }} items`
}

toObservable

// Convert signal to observable
export class Component {
  searchTerm = signal('');

  results$ = toObservable(this.searchTerm).pipe(
    debounceTime(300),
    distinctUntilChanged(),
    switchMap(term => this.searchService.search(term))
  );

  results = toSignal(this.results$, { initialValue: [] });
}

Advanced Patterns

Polling

// Poll every 5 seconds
interval(5000).pipe(
  startWith(0),
  switchMap(() => this.http.get('/api/status')),
  takeUntil(this.destroy$)
).subscribe(status => {
  this.status = status;
});

Caching with Expiration

@Injectable({ providedIn: 'root' })
export class CachedDataService {
  private cache$ = new ReplaySubject<Data[]>(1);
  private cacheAge = 0;
  private readonly CACHE_DURATION = 5 * 60 * 1000; // 5 minutes

  getData(): Observable<Data[]> {
    const now = Date.now();

    if (now - this.cacheAge > this.CACHE_DURATION) {
      this.http.get<Data[]>('/api/data').subscribe(data => {
        this.cache$.next(data);
        this.cacheAge = now;
      });
    }

    return this.cache$.asObservable();
  }
}

Optimistic Updates

updateItem(id: string, changes: Partial<Item>): Observable<Item> {
  // Optimistically update UI
  const optimisticItem = { ...this.currentItem, ...changes };
  this.items$.next(this.items$.value.map(item => 
    item.id === id ? optimisticItem : item
  ));

  // Send to server
  return this.http.put<Item>(`/api/items/${id}`, changes).pipe(
    tap(serverItem => {
      // Update with server response
      this.items$.next(this.items$.value.map(item => 
        item.id === id ? serverItem : item
      ));
    }),
    catchError(error => {
      // Rollback on error
      this.items$.next(this.items$.value.map(item => 
        item.id === id ? this.currentItem : item
      ));
      return throwError(() => error);
    })
  );
}

Best Practices

  • ✅ Always unsubscribe or use takeUntil
  • ✅ Use toSignal for automatic cleanup
  • ✅ Share expensive observables with shareReplay
  • ✅ Choose the right flattening operator (switchMap, mergeMap, etc.)
  • ✅ Handle errors with catchError
  • ✅ Use async pipe in templates when possible
  • ❌ Don't subscribe in services (return observables)
  • ❌ Don't manually create subscriptions unnecessarily
  • ❌ Don't forget to complete subjects

References