rxjs-patterns-for-angular
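A Skill that implements RxJS patterns for reactive programming in Angular: effective use of Observables and operators, streamlined data flow and error handling, integration with Angular Signals, and reliable unsubscription.
📜 Original English description (for reference)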
Implement RxJS patterns for reactive programming in Angular. Use this skill when working with Observables, operators, subscriptions, async data flows, and error handling. Covers common patterns like combineLatest, switchMap, debounceTime, catchError, retry logic, and integration with Angular Signals using toSignal() and toObservable(). Ensures proper subscription cleanup with takeUntilDestroyed().
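Copy the command below and paste it into a terminal (Mac/Linux) or PowerShell (Windows). Download, unzip, and placement are fully automatic. The first command is for Mac/Linux, the second for PowerShell.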
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o rxjs-patterns-for-angular.zip https://jpskill.com/download/16786.zip && unzip -o rxjs-patterns-for-angular.zip && rm rxjs-patterns-for-angular.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/16786.zip -OutFile "$d\rxjs-patterns-for-angular.zip"; Expand-Archive "$d\rxjs-patterns-for-angular.zip" -DestinationPath $d -Force; ri "$d\rxjs-patterns-for-angular.zip"
When it finishes, restart Claude Code. The Skill then activates automatically whenever you make a related request in ordinary language.
💾 Manual download (if the commands are difficult)
- 1. Press the blue button below to download rxjs-patterns-for-angular.zip
- 2. Double-click the ZIP file to unzip it → a rxjs-patterns-for-angular folder is created
- 3. Move that folder to C:\Users\<your name>\.claude\skills\ (Windows) or ~/.claude/skills/ (Mac)
- 4. Restart Claude Code
📦 Installation (3 steps)
- 1. Press the "Download" button above to get the .skill file
- 2. Change the file extension from .skill to .zip and extract it (macOS can extract it automatically)
- 3. Place the extracted folder in .claude/skills/ under your home folder
  - · macOS / Linux: ~/.claude/skills/
  - · Windows: %USERPROFILE%\.claude\skills\
Restart Claude Code to finish. You do not need to say "use this Skill..."; it is invoked automatically for related requests.
- Last updated: 2026-05-18
- Retrieved: 2026-05-18
- Bundled files: 1
RxJS Patterns for Angular Skill
This skill helps implement reactive patterns using RxJS in Angular applications.
Core Principles
Modern Angular + RxJS
- Signals First: Use Signals for state, RxJS for async operations
- Auto Cleanup: Use takeUntilDestroyed() for subscription management
- Interop: Use toSignal() and toObservable() for Signal/Observable conversion
- AsyncPipe: Prefer AsyncPipe in templates when not using Signals
Key Concepts
- Observables for async data streams
- Operators for data transformation
- Subscription management and cleanup
- Error handling and retry logic
Signal + RxJS Integration
toSignal() - Observable to Signal
import { Component, inject } from '@angular/core';
import { toSignal } from '@angular/core/rxjs-interop';
import { HttpClient } from '@angular/common/http';
@Component({
selector: 'app-task-list',
template: `
@if (tasks(); as taskList) {
@for (task of taskList; track task.id) {
<div>{{ task.title }}</div>
}
}
`
})
export class TaskListComponent {
private http = inject(HttpClient);
// Convert Observable to Signal
tasks = toSignal(
this.http.get<Task[]>('/api/tasks'),
{ initialValue: [] }
);
}
toObservable() - Signal to Observable
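import { Component, inject, signal } from '@angular/core';
import { toObservable, toSignal } from '@angular/core/rxjs-interop';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs';
// Assumption: SearchService is an app-level service exposing search(query), returning an Observable of results; the import path is illustrative
import { SearchService } from './search.service';
@Component({
selector: 'app-search',
// The template assumes FormsModule (ngModel) and ng-zorro's NzInputModule (nz-input) are available to the component
template: `
<input
nz-input
[ngModel]="searchQuery()"
(ngModelChange)="searchQuery.set($event)"
/>
@if (results(); as resultList) {
@for (result of resultList; track result.id) {
<div>{{ result.name }}</div>
}
}
`
})
export class SearchComponent {
private searchService = inject(SearchService);
searchQuery = signal('');
// Convert the Signal to an Observable, then debounce, dedupe, and search
private searchQuery$ = toObservable(this.searchQuery);
results = toSignal(
this.searchQuery$.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => this.searchService.search(query))
),
{ initialValue: [] }
);
}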
Subscription Management
takeUntilDestroyed() - Auto Cleanup
import { Component, inject, signal, DestroyRef } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';
@Component({
selector: 'app-timer',
template: `<div>Time: {{ time() }}</div>`
})
export class TimerComponent {
private destroyRef = inject(DestroyRef);
time = signal(0);
constructor() {
// Subscription automatically cleaned up on component destroy
interval(1000)
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(value => this.time.set(value));
}
}
Manual Cleanup (Legacy Pattern - Avoid)
// ❌ DON'T: Manual subscription management (old pattern)
export class LegacyComponent implements OnInit, OnDestroy {
private subscription = new Subscription();
ngOnInit() {
this.subscription.add(
this.dataService.getData().subscribe(data => {
// handle data
})
);
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}
// ✅ DO: Use takeUntilDestroyed()
export class ModernComponent {
private destroyRef = inject(DestroyRef);
data = signal<any>(null);
constructor() {
this.dataService.getData()
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(data => this.data.set(data));
}
}
Common Operators
switchMap - Switch to New Observable
// Switch to new search on every query change
searchResults$ = this.searchQuery$.pipe(
debounceTime(300),
switchMap(query => this.http.get('/api/search', { params: { q: query } })) // params URL-encodes the query
);
mergeMap - Merge Multiple Observables
// Process all tasks in parallel
processTasks$ = this.tasks$.pipe(
mergeMap(tasks =>
from(tasks).pipe(
mergeMap(task => this.processTask(task))
)
)
);
concatMap - Process Sequentially
// Process tasks one by one in order
processTasks$ = this.tasks$.pipe(
concatMap(tasks =>
from(tasks).pipe(
concatMap(task => this.processTask(task))
)
)
);
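To make the difference between the three flattening operators above concrete, the following sketch drives each of them by hand with two overlapping "requests". The helper runWith and the Subject-based requests are illustrative assumptions, not part of the skill; only the rxjs imports are real API.

```typescript
import { Observable, OperatorFunction, Subject, concatMap, mergeMap, switchMap } from 'rxjs';

type Key = 'a' | 'b';
type Flatten = (project: (key: Key) => Observable<string>) => OperatorFunction<Key, string>;

// Runs one flattening operator against two overlapping requests, a and b.
// Each request is a Subject so the script controls exactly when it responds.
function runWith(flatten: Flatten): string[] {
  const source = new Subject<Key>();
  const inner: Record<Key, Subject<string>> = {
    a: new Subject<string>(),
    b: new Subject<string>(),
  };
  const seen: string[] = [];

  source.pipe(flatten(key => inner[key])).subscribe(value => seen.push(value));

  source.next('a');    // request a starts
  source.next('b');    // request b is triggered while a is still pending
  inner.a.next('a1');  // a responds late
  inner.b.next('b1');  // b responds
  inner.a.complete();
  inner.b.next('b2');
  inner.b.complete();
  return seen;
}

// switchMap drops a as soon as b starts: ['b1', 'b2']
const switched = runWith(project => switchMap(project));

// mergeMap keeps both running and interleaves them: ['a1', 'b1', 'b2']
const merged = runWith(project => mergeMap(project));

// concatMap does not subscribe to b until a completes: ['a1', 'b2']
// (b1 is lost only because a Subject is hot; a cold HTTP call would just start later)
const concatenated = runWith(project => concatMap(project));
```

This is why switchMap suits search boxes (only the latest query matters), while mergeMap and concatMap suit work where every item must be processed.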
debounceTime - Debounce Input
// Wait 300ms after user stops typing
search$ = this.searchInput$.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => this.searchService.search(query))
);
distinctUntilChanged - Skip Duplicates
// Only emit when value actually changes
status$ = this.statusSubject$.pipe(
distinctUntilChanged()
);
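One detail the snippet above leaves implicit is that distinctUntilChanged() compares with === by default, so freshly created objects with equal contents still pass through. A minimal sketch, assuming a hypothetical Status shape and a small collect helper, of supplying a comparator or using distinctUntilKeyChanged():

```typescript
import { Observable, distinctUntilChanged, distinctUntilKeyChanged, from } from 'rxjs';

// Hypothetical status payload, used only for this illustration.
interface Status {
  code: string;
}

// Collects everything a synchronous Observable emits.
function collect<T>(source: Observable<T>): T[] {
  const out: T[] = [];
  source.subscribe(value => out.push(value));
  return out;
}

// Three separate object instances; the first two have equal contents.
const items: Status[] = [{ code: 'open' }, { code: 'open' }, { code: 'done' }];

// Default comparison is ===, so all three objects count as different.
const byReference = collect(from(items).pipe(distinctUntilChanged()));

// A comparator decides what "unchanged" means.
const byComparator = collect(
  from(items).pipe(distinctUntilChanged((prev, curr) => prev.code === curr.code))
);

// Shorthand when a single key identifies a change.
const byKey = collect(from(items).pipe(distinctUntilKeyChanged('code')));
```

Here byReference keeps all three items, while byComparator and byKey keep only the 'open' and 'done' entries.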
filter - Filter Values
// Only emit non-empty strings
nonEmptySearch$ = this.searchQuery$.pipe(
filter(query => query.trim().length > 0),
switchMap(query => this.search(query))
);
map - Transform Values
// Transform task to display format
taskDisplay$ = this.task$.pipe(
map(task => ({
title: task.title,
status: task.status.toUpperCase(),
dueDate: formatDate(task.dueDate)
}))
);
tap - Side Effects
// Log without transforming
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
tap(tasks => console.log('Loaded tasks:', tasks.length)),
tap(tasks => this.analyticsService.track('tasks_loaded'))
);
Combining Observables
combineLatest - Latest Value from Each
import { combineLatest, map } from 'rxjs';
// Emits once every source has emitted at least once, then again whenever any source emits
viewModel$ = combineLatest([
this.tasks$,
this.users$,
this.settings$
]).pipe(
map(([tasks, users, settings]) => ({
tasks,
users,
settings
}))
);
// Convert to Signal
viewModel = toSignal(this.viewModel$);
forkJoin - Wait for All to Complete
import { forkJoin } from 'rxjs';
// Load multiple resources in parallel
loadAll$ = forkJoin({
tasks: this.taskService.getTasks(),
users: this.userService.getUsers(),
projects: this.projectService.getProjects()
}).pipe(
map(({ tasks, users, projects }) => ({
tasks,
users,
projects
}))
);
merge - Merge Multiple Streams
import { merge } from 'rxjs';
// Combine multiple event streams
allEvents$ = merge(
this.createEvent$,
this.updateEvent$,
this.deleteEvent$
).pipe(
tap(event => this.handleEvent(event))
);
zip - Pair Up Values
import { zip } from 'rxjs';
// Pair up matching values from two streams
paired$ = zip(
this.stream1$,
this.stream2$
).pipe(
map(([value1, value2]) => ({ value1, value2 }))
);
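The four combination functions above differ mainly in when they emit. The sketch below feeds the same two synchronous sources to each of them; the source names and the collect helper are assumptions made for the illustration. Because of() emits everything at subscription time, the first source has already finished before the second is subscribed, which makes the contrast easy to see.

```typescript
import { Observable, combineLatest, forkJoin, merge, of, zip } from 'rxjs';

// Collects everything a synchronous Observable emits.
function collect<T>(source: Observable<T>): T[] {
  const out: T[] = [];
  source.subscribe(value => out.push(value));
  return out;
}

const numbers = () => of(1, 2);
const letters = () => of('a', 'b');

// Latest value of each source, re-emitted on every change: [[2,'a'], [2,'b']]
const combined = collect(combineLatest([numbers(), letters()]));

// Only the last value of each source, once all have completed: [[2,'b']]
const joined = collect(forkJoin([numbers(), letters()]));

// Every value from every source, in arrival order: [1, 2, 'a', 'b']
const mergedValues = collect(merge(numbers(), letters()));

// Values paired by position: [[1,'a'], [2,'b']]
const zipped = collect(zip(numbers(), letters()));
```

With asynchronous sources the interleaving changes, but the rules stay the same: combineLatest tracks the latest values, forkJoin waits for completion, merge forwards everything, and zip pairs by index.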
Error Handling
catchError - Handle Errors
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
catchError(error => {
console.error('Failed to load tasks:', error);
this.notificationService.error('Failed to load tasks');
return of([]); // Return empty array as fallback
})
);
retry - Retry on Failure
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
retry(3), // Retry up to 3 times
catchError(error => {
console.error('Failed after 3 retries:', error);
return of([]);
})
);
retryWhen - Conditional Retry with Backoff
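import { retryWhen, delay, scan, catchError, of } from 'rxjs';
// Note: retryWhen is deprecated as of RxJS 7; retry({ count, delay }) covers the same use case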
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
retryWhen(errors =>
errors.pipe(
scan((retryCount, error) => {
if (retryCount >= 3) {
throw error; // Max retries reached
}
console.log(`Retry ${retryCount + 1}/3`);
return retryCount + 1;
}, 0),
delay(1000) // Wait 1 second between retries (a fixed delay, not a growing backoff)
)
),
catchError(error => {
console.error('Failed after retries:', error);
return of([]);
})
);
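The retryWhen example above waits a fixed second between attempts. If a growing delay is wanted, the schedule itself can be a small pure function that is easy to check in isolation. This is only a sketch: backoffDelayMs, baseMs, and maxMs are hypothetical names, not part of RxJS or of the skill.

```typescript
// Exponential backoff: baseMs, 2 * baseMs, 4 * baseMs, ... capped at maxMs.
// retryCount is 1 for the first retry, which matches the counter RxJS passes
// to the delay callback of retry({ delay }).
function backoffDelayMs(retryCount: number, baseMs = 1000, maxMs = 30000): number {
  if (!Number.isInteger(retryCount) || retryCount < 1) {
    throw new RangeError('retryCount must be a positive integer');
  }
  return Math.min(baseMs * Math.pow(2, retryCount - 1), maxMs);
}

// Delays for the first six retries with the defaults:
// 1000, 2000, 4000, 8000, 16000, 30000 (the last one is capped)
const schedule = [1, 2, 3, 4, 5, 6].map(n => backoffDelayMs(n));
```

Wired into a pipeline, this could look like `retry({ count: 3, delay: (_error, retryCount) => timer(backoffDelayMs(retryCount)) })`, with retry and timer imported from 'rxjs'.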
Real-Time Data
interval - Periodic Updates
import { interval, startWith, switchMap } from 'rxjs';
// Poll every 30 seconds
liveData$ = interval(30000).pipe(
startWith(0), // Emit immediately
switchMap(() => this.http.get('/api/live-data')),
takeUntilDestroyed(this.destroyRef)
);
liveData = toSignal(this.liveData$);
WebSocket Pattern
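import { retry, tap } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';
export class RealtimeService {
private socket$ = webSocket('wss://api.example.com/ws');
messages$ = this.socket$.pipe(
// Log the error but let it propagate so retry can resubscribe;
// swallowing it with catchError would end the stream without reconnecting
tap({ error: error => console.error('WebSocket error:', error) }),
retry({ delay: 5000 }) // Reconnect 5 seconds after an error
);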
sendMessage(msg: any): void {
this.socket$.next(msg);
}
}
Loading States
Share Loading State
import { shareReplay } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class TaskService {
private http = inject(HttpClient);
// Cache and share the result
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
shareReplay({ bufferSize: 1, refCount: true })
);
}
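To see what the shareReplay configuration above buys, the following sketch replaces the HTTP call with a counting stand-in. The names fakeRequest$ and fetchCount are assumptions for the demonstration; in the service the source would be the HttpClient call.

```typescript
import { Observable, defer, of, shareReplay } from 'rxjs';

let fetchCount = 0;

// Stand-in for an HTTP call: counts how often the source is actually subscribed.
const fakeRequest$: Observable<string[]> = defer(() => {
  fetchCount += 1;
  return of(['task-1', 'task-2']);
});

const shared$ = fakeRequest$.pipe(shareReplay({ bufferSize: 1, refCount: true }));

const first: string[][] = [];
const second: string[][] = [];
shared$.subscribe(tasks => first.push(tasks));  // triggers the single fetch
shared$.subscribe(tasks => second.push(tasks)); // served from the replay buffer
```

Both subscribers receive the same task list while fetchCount stays at 1. Without shareReplay, each subscription would run the request again.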
Loading Indicator Pattern
@Component({
selector: 'app-task-list',
template: `
@if (loading()) {
<nz-spin />
} @else if (error()) {
<nz-alert nzType="error" [nzMessage]="error()!" />
} @else {
@for (task of tasks(); track task.id) {
<div>{{ task.title }}</div>
}
}
`
})
export class TaskListComponent {
private taskService = inject(TaskService);
private destroyRef = inject(DestroyRef);
loading = signal(false);
error = signal<string | null>(null);
tasks = signal<Task[]>([]);
constructor() {
this.loadTasks();
}
loadTasks(): void {
this.loading.set(true);
this.error.set(null);
this.taskService.tasks$
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe({
next: (tasks) => {
this.tasks.set(tasks);
this.loading.set(false);
},
error: (err) => {
this.error.set(err.message || 'Failed to load tasks');
this.loading.set(false);
}
});
}
}
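As an alternative to tracking loading, error, and data in three separate signals, the same information can travel through one stream. The sketch below is one possible shape, not the skill's prescribed pattern: LoadState and withLoadState are hypothetical names introduced here.

```typescript
import { Observable, catchError, map, of, startWith, throwError } from 'rxjs';

// Hypothetical view-state union; the names are illustrative only.
type LoadState<T> =
  | { status: 'loading' }
  | { status: 'loaded'; data: T }
  | { status: 'error'; message: string };

// Wraps any source so that loading, success, and failure arrive in one stream.
function withLoadState<T>(source: Observable<T>): Observable<LoadState<T>> {
  const loading: LoadState<T> = { status: 'loading' };
  return source.pipe(
    map((data): LoadState<T> => ({ status: 'loaded', data })),
    startWith(loading),
    catchError((err: unknown) => {
      const failed: LoadState<T> = {
        status: 'error',
        message: err instanceof Error ? err.message : 'Failed to load tasks',
      };
      return of(failed);
    })
  );
}

// Collects everything a synchronous Observable emits.
function collect<T>(source: Observable<T>): T[] {
  const out: T[] = [];
  source.subscribe(value => out.push(value));
  return out;
}

// Success: loading, then loaded. Failure: loading, then error.
const okStates = collect(withLoadState(of([1, 2])));
const failedStates = collect(withLoadState(throwError(() => new Error('boom'))));
```

In a component, the wrapped stream could be handed to toSignal() so the template switches on a single status field.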
Advanced Patterns
Throttle vs Debounce
import { throttleTime, debounceTime } from 'rxjs';
// Throttle: Emit first, then ignore for duration
throttled$ = this.clicks$.pipe(
throttleTime(1000) // Max once per second
);
// Debounce: Wait for quiet period
debounced$ = this.input$.pipe(
debounceTime(300) // Wait 300ms after last input
);
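The timing rules behind the two operators can be modelled without a scheduler. The functions below are a simplified model written for this explanation, not RxJS's implementation: throttleModel, debounceModel, and the Timed type are hypothetical, and behavior exactly at the boundary of a time window is glossed over.

```typescript
// A timestamped event; times are milliseconds on a virtual clock.
interface Timed<T> {
  at: number;
  value: T;
}

// Leading-edge throttle: emit an event, then drop everything that arrives
// less than durationMs after the last emitted one.
function throttleModel<T>(events: Timed<T>[], durationMs: number): T[] {
  const out: T[] = [];
  let lastEmitAt = -Infinity;
  for (const event of events) {
    if (event.at - lastEmitAt >= durationMs) {
      out.push(event.value);
      lastEmitAt = event.at;
    }
  }
  return out;
}

// Debounce: an event gets through only if nothing else follows within quietMs.
// The final event always gets through once the input ends.
function debounceModel<T>(events: Timed<T>[], quietMs: number): T[] {
  const out: T[] = [];
  events.forEach((event, i) => {
    const next = events[i + 1];
    if (next === undefined || next.at - event.at >= quietMs) {
      out.push(event.value);
    }
  });
  return out;
}

// Five clicks, throttled to one per second: the clicks at 0 ms and 1200 ms survive.
const clicks: Timed<number>[] = [0, 300, 600, 1200, 1500].map((at, i) => ({ at, value: i + 1 }));
const throttled = throttleModel(clicks, 1000); // [1, 4]

// Typing "rxjs" with a pause after the third key, debounced at 300 ms.
const keys: Timed<string>[] = [
  { at: 0, value: 'r' },
  { at: 100, value: 'rx' },
  { at: 250, value: 'rxj' },
  { at: 900, value: 'rxjs' },
];
const debounced = debounceModel(keys, 300); // ['rxj', 'rxjs']
```

Throttle favors the first event of a burst, which fits buttons and scroll handlers. Debounce favors the last, which fits search input.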
Scan - Accumulate Values
// Running total
total$ = this.amounts$.pipe(
scan((acc, value) => acc + value, 0)
);
// History accumulation
history$ = this.events$.pipe(
scan((history, event) => [...history, event], [] as Event[])
);
startWith - Initial Value
// Start with loading state
status$ = this.dataLoad$.pipe(
map(() => 'loaded'),
startWith('loading')
);
pairwise - Previous + Current
// Compare with previous value
changes$ = this.value$.pipe(
pairwise(),
map(([prev, curr]) => ({
previous: prev,
current: curr,
diff: curr - prev
}))
);
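For a feel of what scan, pairwise, and startWith actually emit, here is a short sketch over a fixed list of numbers. The amounts array and the collect helper are assumptions for the demonstration.

```typescript
import { Observable, from, map, pairwise, scan, startWith } from 'rxjs';

// Collects everything a synchronous Observable emits.
function collect<T>(source: Observable<T>): T[] {
  const out: T[] = [];
  source.subscribe(value => out.push(value));
  return out;
}

const amounts = [10, 15, 12];

// scan emits the accumulator after every value: [10, 25, 37]
const runningTotals = collect(from(amounts).pipe(scan((acc, value) => acc + value, 0)));

// pairwise needs two values before it emits, so three inputs give two diffs: [5, -3]
const diffs = collect(
  from(amounts).pipe(
    pairwise(),
    map(([prev, curr]) => curr - prev)
  )
);

// startWith supplies a baseline so the first real value also produces a diff: [10, 5, -3]
const diffsFromZero = collect(
  from(amounts).pipe(
    startWith(0),
    pairwise(),
    map(([prev, curr]) => curr - prev)
  )
);
```

The combination of startWith and pairwise is handy when the first value should be compared against a known baseline instead of being skipped.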
Best Practices
✅ DO
// Use toSignal() for reactive data in templates
data = toSignal(this.data$, { initialValue: [] });
// Use takeUntilDestroyed() for cleanup
this.data$.pipe(takeUntilDestroyed(this.destroyRef)).subscribe();
// Use switchMap for user-triggered requests
search$ = this.query$.pipe(switchMap(q => this.search(q)));
// Handle errors explicitly
data$ = this.http.get('/api/data').pipe(
catchError(err => of(null))
);
❌ DON'T
// Don't forget to unsubscribe
this.data$.subscribe(); // Memory leak!
// Don't use nested subscribes
this.data$.subscribe(data => {
this.process(data).subscribe(); // Anti-pattern!
});
// Don't use the async pipe where a Signal is available
@if (data$ | async) { } // Prefer toSignal(this.data$) and read data() in the template
Checklist
When using RxJS:
- [ ] Use toSignal() to convert Observables to Signals
- [ ] Use takeUntilDestroyed() for subscription cleanup
- [ ] Handle errors with catchError()
- [ ] Debounce user input (300ms)
- [ ] Use switchMap for cancellable requests
- [ ] Share expensive Observables with shareReplay()
- [ ] Provide initial values with startWith()
- [ ] Filter out empty/null values
- [ ] Test async operations
- [ ] Document complex operator chains