Reactive Streams
Reactive Streams は「非同期データストリーム」を宣言的に処理するパターン。RxJS の Observable、バックプレッシャー、イベント駆動アーキテクチャの基盤を理解する。
84 分で読めます41,807 文字
Reactive Streams
Reactive Streams は「非同期データストリーム」を宣言的に処理するパターン。RxJS の Observable、バックプレッシャー、イベント駆動アーキテクチャの基盤を理解する。
この章で学ぶこと
- Observable パターンの仕組みを理解する
- RxJS のオペレータとパイプラインを把握する
- バックプレッシャーの概念を学ぶ
- Subject の種類と使い分けを理解する
- Angular・React でのリアクティブパターンを習得する
- テスト・デバッグの手法を身につける
前提知識
このガイドを読む前に、以下の知識があると理解が深まります:
- 基本的なプログラミングの知識
- 関連する基礎概念の理解
- async/await の内容を理解していること
1. Promise vs Observable
Promise:
→ 単一の値(1回だけ解決)
→ 作成時に実行開始(eager)
→ キャンセル不可
Observable:
→ 複数の値(時間をかけて流れるストリーム)
→ 購読時に実行開始(lazy)
→ キャンセル可能(unsubscribe)
Promise: ──────● (1つの値)
Observable: ──●──●──●──●──│ (複数の値、完了あり)
──●──●──✗ (エラーで終了)
用途:
Promise: API呼び出し、DBクエリ(1回の結果)
Observable: WebSocket、ユーザー入力、タイマー(連続的なイベント)
1.1 Observable のライフサイクル
Observable のライフサイクル:
作成 (Creation)
│
▼
購読 (Subscription) ← subscribe() の呼び出し
│
▼
値の発行 (Emission)
│ next(value) ──→ Observer の next コールバック
│ next(value) ──→ Observer の next コールバック
│ ...
│
├── complete() ──→ Observer の complete コールバック ──→ 終了
│
└── error(err) ──→ Observer の error コールバック ──→ 終了
購読解除 (Unsubscription) ← unsubscribe() の呼び出し
│
▼
リソース解放 (Teardown) ← teardown ロジックの実行
1.2 Hot Observable vs Cold Observable
Cold Observable:
→ 購読するたびに新しいデータストリームが作成される
→ 各購読者が独立したストリームを受け取る
→ 例: HTTPリクエスト、ファイル読み取り
Subscriber A: ──1──2──3──4──│
Subscriber B: ──1──2──3──4──│ (独立したストリーム)
Hot Observable:
→ データソースが購読に関係なく値を発行し続ける
→ 途中から購読すると、過去の値は受け取れない
→ 例: WebSocket、マウスイベント、株価フィード
Source: ──1──2──3──4──5──6──│
Subscriber A: ──1──2──3──4──5──6──│ (最初から購読)
Subscriber B: ──3──4──5──6──│ (途中から購読)
// Cold Observable の例
const cold$ = new Observable(subscriber => {
// 購読するたびに新しいランダム値
subscriber.next(Math.random());
subscriber.complete();
});
cold$.subscribe(v => console.log('A:', v)); // A: 0.123...
cold$.subscribe(v => console.log('B:', v)); // B: 0.456... (異なる値)
// Hot Observable の例(Subject を使用)
const hot$ = new Subject<number>();
hot$.subscribe(v => console.log('A:', v));
hot$.next(1); // A: 1
hot$.next(2); // A: 2
hot$.subscribe(v => console.log('B:', v));
hot$.next(3); // A: 3, B: 3 (Bは3からしか受け取れない)2. RxJS の基本
import { Observable, of, from, interval, fromEvent } from 'rxjs';
import { map, filter, take, debounceTime, switchMap } from 'rxjs/operators';
// Observable の作成
const numbers$ = of(1, 2, 3, 4, 5);
const array$ = from([10, 20, 30]);
const timer$ = interval(1000); // 1秒ごとに 0, 1, 2, ...
// パイプライン(オペレータチェーン)
numbers$.pipe(
filter(n => n % 2 === 0), // 偶数のみ
map(n => n * 10), // 10倍
).subscribe(value => console.log(value)); // 20, 40
// 検索ボックスの実践例
const searchInput = document.getElementById('search');
fromEvent(searchInput, 'input').pipe(
debounceTime(300), // 300ms 入力停止を待つ
map(event => (event.target as HTMLInputElement).value),
filter(query => query.length >= 2), // 2文字以上
switchMap(query => fetch(`/api/search?q=${query}`).then(r => r.json())),
// switchMap: 新しい値が来たら前のリクエストをキャンセル
).subscribe(results => {
renderSearchResults(results);
});2.1 Observable の作成方法
import {
Observable, of, from, interval, timer, fromEvent,
defer, range, EMPTY, NEVER, throwError,
generate, iif
} from 'rxjs';
import { ajax } from 'rxjs/ajax';
// 1. カスタム Observable
const custom$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
// teardown ロジック(購読解除時に実行)
return () => {
console.log('クリーンアップ処理');
};
});
// 2. 静的生成関数
const values$ = of('a', 'b', 'c'); // 同期的に3つの値を発行
const arr$ = from([1, 2, 3]); // 配列からObservable
const promise$ = from(fetch('/api/data')); // PromiseからObservable
const iter$ = from(new Map([['a', 1], ['b', 2]])); // IterableからObservable
// 3. タイマー系
const interval$ = interval(1000); // 0, 1, 2, ... (1秒間隔)
const timerOnce$ = timer(3000); // 3秒後に0を発行
const timerRepeat$ = timer(0, 1000); // 即座に開始、1秒間隔
// 4. イベント系
const clicks$ = fromEvent(document, 'click');
const resize$ = fromEvent(window, 'resize');
const keydown$ = fromEvent<KeyboardEvent>(document, 'keydown');
// 5. AJAX
const data$ = ajax.getJSON('/api/users');
const post$ = ajax({
url: '/api/users',
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: { name: 'Tanaka', email: 'tanaka@example.com' },
});
// 6. 条件分岐
const source$ = iif(
() => Math.random() > 0.5,
of('heads'),
of('tails'),
);
// 7. 遅延生成(購読時に初めてObservableを作成)
const deferred$ = defer(() => {
const timestamp = Date.now();
return of(timestamp);
});
// 8. ジェネレータ風
const fib$ = generate(
[0, 1], // 初期値
([a, b]) => a < 100, // 条件
([a, b]) => [b, a + b] as [number, number], // 更新
([a, b]) => a, // 結果の選択
);
// → 0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 892.2 Observer パターンの詳細
// Observer インターフェース
interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
// 完全な Observer を渡す
const subscription = numbers$.subscribe({
next: value => console.log('値:', value),
error: err => console.error('エラー:', err),
complete: () => console.log('完了'),
});
// 部分的な Observer(省略可能)
numbers$.subscribe(
value => console.log(value), // next のみ
);
numbers$.subscribe({
next: value => console.log(value),
error: err => console.error(err), // complete 省略
});
// Subscription の管理
const sub = interval(1000).subscribe(v => console.log(v));
// 5秒後に購読解除
setTimeout(() => {
sub.unsubscribe(); // リソース解放
console.log('購読解除しました');
}, 5000);
// 複数の Subscription をまとめて管理
import { Subscription } from 'rxjs';
const parentSub = new Subscription();
parentSub.add(interval(1000).subscribe(v => console.log('A:', v)));
parentSub.add(interval(2000).subscribe(v => console.log('B:', v)));
parentSub.add(interval(3000).subscribe(v => console.log('C:', v)));
// まとめて購読解除
setTimeout(() => parentSub.unsubscribe(), 10000);3. 主要オペレータ
変換:
map — 値を変換
switchMap — 新しい Observable に切り替え(前をキャンセル)
mergeMap — 並行して Observable を実行
concatMap — 直列に Observable を実行
exhaustMap — 現在の Observable が完了するまで新しいものを無視
scan — 累積値を計算(reduce のストリーム版)
pluck — オブジェクトの特定プロパティを抽出
pairwise — 直前の値と現在の値をペアにする
フィルタリング:
filter — 条件に合う値のみ通す
take — 最初のN個だけ取得
takeUntil — 別のObservableが発行するまで取得
takeWhile — 条件がtrueの間取得
skip — 最初のN個をスキップ
debounceTime — 一定時間入力がなかったら通す
throttleTime — 一定時間に1つだけ通す
distinctUntilChanged — 値が変わった時だけ通す
first — 最初の値(条件付き可)
last — 最後の値(条件付き可)
elementAt — N番目の値
結合:
merge — 複数の Observable を合流
combineLatest — 各 Observable の最新値を組み合わせ
zip — 各 Observable の値を1対1で組み合わせ
forkJoin — 全ての Observable の最後の値を取得(Promise.allに近い)
concat — 直列に結合(前が完了してから次)
race — 最も早く値を発行した Observable を採用
withLatestFrom — メインストリームの発行時に他の最新値を付加
エラー:
catchError — エラーをハンドリングして回復
retry — エラー時にリトライ
retryWhen — 条件付きリトライ(RxJS 7 で非推奨、retry に統合)
ユーティリティ:
tap — 副作用(デバッグ、ログ)
delay — 値の発行を遅延
timeout — 一定時間値が来なければエラー
finalize — 完了/エラー/購読解除時のクリーンアップ
share — Cold を Hot に変換(マルチキャスト)
shareReplay — 最新N個の値をリプレイ
3.1 変換オペレータの詳細
import {
of, from, interval, fromEvent, timer
} from 'rxjs';
import {
map, switchMap, mergeMap, concatMap, exhaustMap,
scan, pairwise, bufferTime, groupBy, toArray
} from 'rxjs/operators';
// === switchMap: 最新のリクエストのみ(検索、オートコンプリート) ===
const search$ = fromEvent<Event>(searchInput, 'input').pipe(
debounceTime(300),
map(e => (e.target as HTMLInputElement).value),
switchMap(query =>
// 新しい入力が来ると前のリクエストをキャンセル
fetch(`/api/search?q=${query}`).then(r => r.json())
),
);
// === mergeMap: 並行実行(メール一括送信、ファイル並行ダウンロード) ===
const sendEmails$ = from(emailList).pipe(
mergeMap(
email => sendEmail(email),
5, // 最大並行数を5に制限
),
);
// === concatMap: 直列実行(順序保証が必要な場合) ===
const uploadFiles$ = from(files).pipe(
concatMap(file =>
// 1つずつ順番にアップロード(前が完了してから次)
uploadFile(file)
),
);
// === exhaustMap: 二重送信防止(フォーム送信ボタン) ===
const submitForm$ = fromEvent(submitBtn, 'click').pipe(
exhaustMap(() =>
// 前のリクエストが進行中なら新しいクリックを無視
fetch('/api/submit', { method: 'POST', body: formData })
.then(r => r.json())
),
);
// === scan: 累積計算(状態管理) ===
const actions$ = new Subject<{ type: string; payload: any }>();
const state$ = actions$.pipe(
scan((state, action) => {
switch (action.type) {
case 'INCREMENT':
return { ...state, count: state.count + 1 };
case 'DECREMENT':
return { ...state, count: state.count - 1 };
case 'SET_NAME':
return { ...state, name: action.payload };
default:
return state;
}
}, { count: 0, name: '' }),
);
// === pairwise: 直前の値と比較 ===
const scrollPosition$ = fromEvent(window, 'scroll').pipe(
map(() => window.scrollY),
pairwise(),
map(([prev, curr]) => ({
direction: curr > prev ? 'down' : 'up',
delta: Math.abs(curr - prev),
})),
);
// === bufferTime: 一定時間ごとにバッチ処理 ===
const events$ = fromEvent(document, 'mousemove').pipe(
bufferTime(1000), // 1秒ごとにイベントの配列として発行
filter(events => events.length > 0),
map(events => ({
count: events.length,
avgX: events.reduce((sum, e: any) => sum + e.clientX, 0) / events.length,
})),
);
// === groupBy: ストリームをグループ分け ===
interface LogEntry {
level: 'info' | 'warn' | 'error';
message: string;
}
const logs$ = from<LogEntry[]>([
{ level: 'info', message: 'Started' },
{ level: 'error', message: 'Failed' },
{ level: 'info', message: 'Processing' },
{ level: 'warn', message: 'Slow query' },
{ level: 'error', message: 'Timeout' },
]);
logs$.pipe(
groupBy(log => log.level),
mergeMap(group$ =>
group$.pipe(
toArray(),
map(entries => ({ level: group$.key, entries })),
)
),
).subscribe(group => {
console.log(`${group.level}: ${group.entries.length} entries`);
});3.2 flattening オペレータの比較
switchMap vs mergeMap vs concatMap vs exhaustMap:
入力: ──A─────B─────C──│
switchMap(最新のみ):
A: ──a1──a2──(キャンセル)
B: ──b1──b2──(キャンセル)
C: ──c1──c2──c3──│
出力: ──a1──a2──b1──b2──c1──c2──c3──│
mergeMap(並行):
A: ──a1──a2──a3──│
B: ──b1──b2──b3──│
C: ──c1──c2──c3──│
出力: ──a1──a2──b1──a3──b2──c1──b3──c2──c3──│
concatMap(直列):
A: ──a1──a2──a3──│
B: ──b1──b2──b3──│
C: ──c1──c2──c3──│
出力: ──a1──a2──a3──b1──b2──b3──c1──c2──c3──│
exhaustMap(進行中は無視):
A: ──a1──a2──a3──│
B: (無視)
C: ──c1──c2──c3──│
出力: ──a1──a2──a3──c1──c2──c3──│
使い分け:
switchMap → 検索、オートコンプリート(最新のみ必要)
mergeMap → 並行ダウンロード(全結果必要、順序不問)
concatMap → ファイル順序処理(順序保証必要)
exhaustMap → フォーム送信(二重送信防止)
3.3 結合オペレータの詳細
import {
merge, combineLatest, zip, forkJoin, concat, race, withLatestFrom
} from 'rxjs';
// === merge: 複数ストリームの合流 ===
const keyboard$ = fromEvent(document, 'keydown');
const mouse$ = fromEvent(document, 'click');
const touch$ = fromEvent(document, 'touchstart');
const userActivity$ = merge(keyboard$, mouse$, touch$).pipe(
throttleTime(1000),
tap(() => resetIdleTimer()),
);
// === combineLatest: 各ストリームの最新値を組み合わせ ===
const selectedCategory$ = new BehaviorSubject<string>('all');
const searchQuery$ = new BehaviorSubject<string>('');
const sortOrder$ = new BehaviorSubject<string>('newest');
const filteredProducts$ = combineLatest([
selectedCategory$,
searchQuery$,
sortOrder$,
]).pipe(
debounceTime(100),
switchMap(([category, query, sort]) =>
fetch(`/api/products?category=${category}&q=${query}&sort=${sort}`)
.then(r => r.json())
),
);
// === zip: 1対1で組み合わせ ===
const names$ = of('Alice', 'Bob', 'Charlie');
const ages$ = of(25, 30, 35);
const cities$ = of('Tokyo', 'Osaka', 'Kyoto');
zip(names$, ages$, cities$).pipe(
map(([name, age, city]) => ({ name, age, city })),
).subscribe(person => console.log(person));
// { name: 'Alice', age: 25, city: 'Tokyo' }
// { name: 'Bob', age: 30, city: 'Osaka' }
// { name: 'Charlie', age: 35, city: 'Kyoto' }
// === forkJoin: 全完了後に最後の値(Promise.all 相当) ===
const dashboardData$ = forkJoin({
users: ajax.getJSON('/api/users'),
orders: ajax.getJSON('/api/orders'),
stats: ajax.getJSON('/api/stats'),
notifications: ajax.getJSON('/api/notifications'),
}).pipe(
catchError(err => {
console.error('ダッシュボードデータ取得失敗:', err);
return of({ users: [], orders: [], stats: null, notifications: [] });
}),
);
// === race: 最速のストリームを採用 ===
const primary$ = ajax.getJSON('https://primary-api.com/data');
const fallback$ = ajax.getJSON('https://fallback-api.com/data');
const data$ = race(primary$, fallback$); // 先に応答したほうを使用
// === withLatestFrom: メインストリーム発行時に他の最新値を付加 ===
const saveButton$ = fromEvent(saveBtn, 'click');
const formValue$ = new BehaviorSubject(getFormValues());
saveButton$.pipe(
withLatestFrom(formValue$),
switchMap(([_, formData]) =>
fetch('/api/save', { method: 'POST', body: JSON.stringify(formData) })
),
).subscribe();4. Subject の種類
Subject の種類と特徴:
Subject (基本):
購読前の値は受け取れない
A subscribes → next(1) → next(2) → B subscribes → next(3)
A: 1, 2, 3
B: 3
BehaviorSubject (最新値を保持):
購読時に最新値を即座に受け取る。初期値が必要
A subscribes(初期値0) → next(1) → next(2) → B subscribes → next(3)
A: 0, 1, 2, 3
B: 2, 3
ReplaySubject (N個の過去値をリプレイ):
指定した数だけ過去の値をバッファして新しい購読者に送る
A subscribes → next(1) → next(2) → next(3) → B subscribes(replay=2)
A: 1, 2, 3
B: 2, 3 (最新2個がリプレイされる)
AsyncSubject (最後の値のみ):
complete() 時に最後の値だけを発行
next(1) → next(2) → next(3) → complete()
A: 3
B: 3 (complete 後に購読しても最後の値を受け取る)
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';
// === BehaviorSubject: 現在の状態管理に最適 ===
interface AppState {
user: User | null;
theme: 'light' | 'dark';
language: string;
}
class StateService {
private state$ = new BehaviorSubject<AppState>({
user: null,
theme: 'light',
language: 'ja',
});
// 現在の状態を取得(同期的)
get currentState(): AppState {
return this.state$.getValue();
}
// 状態のストリームを取得
select<K extends keyof AppState>(key: K): Observable<AppState[K]> {
return this.state$.pipe(
map(state => state[key]),
distinctUntilChanged(),
);
}
// 状態を更新
update(partial: Partial<AppState>): void {
this.state$.next({
...this.currentState,
...partial,
});
}
}
const stateService = new StateService();
// テーマの変更を監視
stateService.select('theme').subscribe(theme => {
document.body.className = `theme-${theme}`;
});
// ユーザー情報の変更を監視
stateService.select('user').subscribe(user => {
if (user) {
console.log(`Welcome, ${user.name}`);
}
});
// === ReplaySubject: イベントの履歴を保持 ===
class EventBus {
private events$ = new ReplaySubject<AppEvent>(10); // 最新10件を保持
emit(event: AppEvent): void {
this.events$.next(event);
}
on(type: string): Observable<AppEvent> {
return this.events$.pipe(
filter(event => event.type === type),
);
}
// 過去のイベントも含めて取得
history(): Observable<AppEvent> {
return this.events$.asObservable();
}
}
// === AsyncSubject: 完了時の最終結果 ===
class ConfigLoader {
private config$ = new AsyncSubject<Config>();
async load(): Promise<void> {
try {
const config = await fetch('/api/config').then(r => r.json());
this.config$.next(config);
this.config$.complete(); // complete() で最後の値が発行される
} catch (err) {
this.config$.error(err);
}
}
getConfig(): Observable<Config> {
return this.config$.asObservable();
}
}5. 実践例: リアルタイムダッシュボード
import { combineLatest, timer, Subject, BehaviorSubject } from 'rxjs';
import {
switchMap, catchError, retry, map, share,
distinctUntilChanged, tap, takeUntil, startWith, scan
} from 'rxjs/operators';
// === ダッシュボードサービス ===
class DashboardService {
private destroy$ = new Subject<void>();
private refreshTrigger$ = new BehaviorSubject<void>(undefined);
// 統計情報(5秒ごと + 手動リフレッシュ)
readonly stats$ = this.refreshTrigger$.pipe(
switchMap(() => timer(0, 5000)),
switchMap(() =>
fetch('/api/stats')
.then(r => r.json())
.catch(() => ({ error: true }))
),
retry({ count: 3, delay: 2000 }),
catchError(() => of({ error: true, data: null })),
share(), // マルチキャスト(複数の購読者で共有)
takeUntil(this.destroy$),
);
// アラート(10秒ごと)
readonly alerts$ = timer(0, 10000).pipe(
switchMap(() =>
fetch('/api/alerts')
.then(r => r.json())
.catch(() => [])
),
catchError(() => of([])),
share(),
takeUntil(this.destroy$),
);
// WebSocket でリアルタイム更新
readonly liveEvents$ = new Observable<ServerEvent>(subscriber => {
const ws = new WebSocket('wss://api.example.com/events');
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
subscriber.next(data);
} catch (err) {
console.error('WebSocket parse error:', err);
}
};
ws.onerror = (err) => subscriber.error(err);
ws.onclose = () => subscriber.complete();
return () => ws.close();
}).pipe(
retry({ count: 5, delay: (error, retryCount) => timer(Math.min(1000 * Math.pow(2, retryCount), 30000)) }),
share(),
takeUntil(this.destroy$),
);
// ダッシュボードの統合状態
readonly dashboardState$ = combineLatest([
this.stats$.pipe(startWith(null)),
this.alerts$.pipe(startWith([])),
this.liveEvents$.pipe(
scan((events: ServerEvent[], event) => [...events.slice(-50), event], []),
startWith([]),
),
]).pipe(
map(([stats, alerts, events]) => ({
stats,
alerts,
events,
lastUpdated: new Date(),
})),
distinctUntilChanged((prev, curr) =>
JSON.stringify(prev) === JSON.stringify(curr)
),
);
refresh(): void {
this.refreshTrigger$.next();
}
destroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
}6. バックプレッシャー
バックプレッシャー(Backpressure):
→ 生産者(Producer)が消費者(Consumer)より速い場合の制御
生産者: ──●●●●●●●●●●──→ 速い
消費者: ──●───●───●───→ 遅い
→ メモリ溢れ / 遅延蓄積
対策:
1. バッファリング: 一時的にキューに溜める(メモリ限界あり)
2. ドロップ: 古い値を捨てる(最新のみ保持)
3. サンプリング: 一定間隔で最新値を取得
4. スロットリング: 一定時間に1つだけ通す
5. ウィンドウイング: 時間やカウントでグループ化
RxJS でのバックプレッシャー:
→ bufferTime: 時間ごとにバッチ処理
→ bufferCount: 個数ごとにバッチ処理
→ throttleTime: 一定間隔で値を通す
→ sampleTime: 一定間隔で最新値を取得
→ auditTime: 値が来てから一定時間後に最新値を通す
→ debounceTime: 値が来てから一定時間待ってから最新値を通す
→ window/windowTime: 時間窓でグループ化
6.1 バックプレッシャーの実践例
import {
fromEvent, interval, Subject
} from 'rxjs';
import {
bufferTime, bufferCount, throttleTime, sampleTime,
auditTime, debounceTime, windowTime, mergeAll,
tap, filter, map, scan
} from 'rxjs/operators';
// === マウス移動の間引き ===
const mouseMove$ = fromEvent<MouseEvent>(document, 'mousemove');
// throttleTime: 最初の値を通し、指定時間は無視
mouseMove$.pipe(
throttleTime(16), // ~60fps
map(e => ({ x: e.clientX, y: e.clientY })),
).subscribe(pos => updateCursor(pos));
// sampleTime: 一定間隔で最新値を取得
mouseMove$.pipe(
sampleTime(100), // 100msごとに最新の位置
map(e => ({ x: e.clientX, y: e.clientY })),
).subscribe(pos => sendAnalytics(pos));
// auditTime: 値が来てから一定時間後に最新値を通す
mouseMove$.pipe(
auditTime(200),
).subscribe(e => updateTooltip(e));
// === ログの一括送信 ===
const logStream$ = new Subject<LogEntry>();
// 100件ごと、または5秒ごとにバッチ送信
logStream$.pipe(
bufferTime(5000, undefined, 100), // 5秒 or 100件
filter(batch => batch.length > 0),
).subscribe(async batch => {
await fetch('/api/logs', {
method: 'POST',
body: JSON.stringify(batch),
});
});
// === ウィンドウでのメトリクス集計 ===
const requestStream$ = new Subject<{ endpoint: string; duration: number }>();
requestStream$.pipe(
windowTime(60000), // 1分間のウィンドウ
mergeAll(),
scan((acc, req) => ({
count: acc.count + 1,
totalDuration: acc.totalDuration + req.duration,
maxDuration: Math.max(acc.maxDuration, req.duration),
}), { count: 0, totalDuration: 0, maxDuration: 0 }),
).subscribe(metrics => {
console.log(`Requests/min: ${metrics.count}`);
console.log(`Avg duration: ${metrics.totalDuration / metrics.count}ms`);
console.log(`Max duration: ${metrics.maxDuration}ms`);
});
// === debounceTime vs throttleTime の違い ===
//
// debounceTime(300):
// 入力: ──a─b─c───────d─e──────│
// 出力: ──────────c─────────e──│
// → 入力が落ち着いてから発行(検索に最適)
//
// throttleTime(300):
// 入力: ──a─b─c───────d─e──────│
// 出力: ──a───────────d────────│
// → 最初の値をすぐ通し、指定時間待つ(スクロールに最適)
//
// auditTime(300):
// 入力: ──a─b─c───────d─e──────│
// 出力: ──────c───────────e────│
// → 値が来てから指定時間後に最新値を通す
//
// sampleTime(300):
// 入力: ──a─b─c───────d─e──────│
// 出力: ──b─────c──────e───────│
// → 一定間隔で最新値をサンプリング7. エラーハンドリング
import { of, throwError, timer, EMPTY, Observable } from 'rxjs';
import {
catchError, retry, retryWhen, delay, take,
tap, finalize, timeout, switchMap
} from 'rxjs/operators';
// === 基本的なエラーハンドリング ===
const data$ = ajax.getJSON('/api/data').pipe(
catchError(err => {
console.error('API error:', err);
return of({ fallback: true, data: [] }); // フォールバック値
}),
);
// === リトライ戦略 ===
// 単純なリトライ
const withRetry$ = ajax.getJSON('/api/unstable').pipe(
retry(3), // 3回リトライ(合計4回試行)
catchError(err => {
console.error('全リトライ失敗:', err);
return EMPTY;
}),
);
// 指数バックオフ付きリトライ(RxJS 7+)
const withBackoff$ = ajax.getJSON('/api/unstable').pipe(
retry({
count: 5,
delay: (error, retryCount) => {
const delayMs = Math.min(1000 * Math.pow(2, retryCount - 1), 30000);
console.log(`リトライ ${retryCount}: ${delayMs}ms後`);
return timer(delayMs);
},
resetOnSuccess: true,
}),
catchError(err => {
notifyUser('サービスに接続できません');
return EMPTY;
}),
);
// === 条件付きリトライ ===
const smartRetry$ = ajax('/api/data').pipe(
retry({
count: 3,
delay: (error, retryCount) => {
// 4xx エラーはリトライしない
if (error.status >= 400 && error.status < 500) {
return throwError(() => error);
}
// 5xx エラーのみリトライ
return timer(1000 * retryCount);
},
}),
);
// === タイムアウト ===
const withTimeout$ = ajax.getJSON('/api/slow-endpoint').pipe(
timeout({
each: 5000, // 各値の発行間隔が5秒を超えたらエラー
with: () => throwError(() => new Error('Request timeout')),
}),
catchError(err => {
if (err.message === 'Request timeout') {
return of({ timeout: true });
}
return throwError(() => err);
}),
);
// === finalize: クリーンアップ ===
function loadData(): Observable<Data> {
showLoadingSpinner();
return ajax.getJSON<Data>('/api/data').pipe(
retry(2),
catchError(err => {
showErrorNotification(err.message);
return EMPTY;
}),
finalize(() => {
hideLoadingSpinner(); // 成功/失敗/購読解除に関わらず実行
}),
);
}
// === エラーの分類と処理 ===
class ApiService {
request<T>(url: string): Observable<T> {
return ajax.getJSON<T>(url).pipe(
catchError(err => {
switch (err.status) {
case 401:
this.authService.logout();
return throwError(() => new UnauthorizedError());
case 403:
return throwError(() => new ForbiddenError());
case 404:
return throwError(() => new NotFoundError(url));
case 429:
// レート制限: Retry-After ヘッダーを尊重
const retryAfter = parseInt(err.response?.headers?.get('Retry-After') || '5');
return timer(retryAfter * 1000).pipe(
switchMap(() => this.request<T>(url)),
);
default:
return throwError(() => new ApiError(err.message, err.status));
}
}),
);
}
}8. Angular でのリアクティブパターン
// === Angular コンポーネントでの使用 ===
@Component({
selector: 'app-user-list',
template: `
<input [formControl]="searchControl" placeholder="検索...">
<div *ngIf="loading$ | async" class="spinner">読み込み中...</div>
<ul>
<li *ngFor="let user of users$ | async; trackBy: trackById">
{{ user.name }} - {{ user.email }}
</li>
</ul>
<div *ngIf="error$ | async as error" class="error">
{{ error.message }}
</div>
`,
})
export class UserListComponent implements OnInit, OnDestroy {
private destroy$ = new Subject<void>();
searchControl = new FormControl('');
loading$ = new BehaviorSubject<boolean>(false);
error$ = new BehaviorSubject<Error | null>(null);
users$: Observable<User[]>;
constructor(private userService: UserService) {}
ngOnInit(): void {
this.users$ = this.searchControl.valueChanges.pipe(
startWith(''),
debounceTime(300),
distinctUntilChanged(),
tap(() => {
this.loading$.next(true);
this.error$.next(null);
}),
switchMap(query =>
this.userService.searchUsers(query).pipe(
catchError(err => {
this.error$.next(err);
return of([]);
}),
finalize(() => this.loading$.next(false)),
)
),
takeUntil(this.destroy$),
);
}
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
trackById(index: number, user: User): number {
return user.id;
}
}
// === Angular サービスでのキャッシュ ===
@Injectable({ providedIn: 'root' })
export class UserService {
private cache$ = new Map<string, Observable<User>>();
constructor(private http: HttpClient) {}
getUser(id: string): Observable<User> {
if (!this.cache$.has(id)) {
this.cache$.set(id,
this.http.get<User>(`/api/users/${id}`).pipe(
shareReplay({ bufferSize: 1, refCount: true }),
// refCount: true → 購読者が0になったらキャッシュを破棄
)
);
}
return this.cache$.get(id)!;
}
searchUsers(query: string): Observable<User[]> {
return this.http.get<User[]>(`/api/users`, {
params: { q: query },
});
}
// リアクティブなCRUD
private refresh$ = new Subject<void>();
users$ = this.refresh$.pipe(
startWith(undefined),
switchMap(() => this.http.get<User[]>('/api/users')),
shareReplay(1),
);
createUser(user: CreateUserDto): Observable<User> {
return this.http.post<User>('/api/users', user).pipe(
tap(() => this.refresh$.next()), // 作成後にリフレッシュ
);
}
}9. React でのリアクティブパターン
import { useEffect, useState, useRef, useMemo } from 'react';
import { Subject, BehaviorSubject, Observable, Subscription } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, catchError } from 'rxjs/operators';
// === カスタムフック: useObservable ===
function useObservable<T>(observable$: Observable<T>, initialValue: T): T {
const [value, setValue] = useState<T>(initialValue);
useEffect(() => {
const subscription = observable$.subscribe({
next: setValue,
error: err => console.error('Observable error:', err),
});
return () => subscription.unsubscribe();
}, [observable$]);
return value;
}
// === カスタムフック: useSubject ===
function useSubject<T>(): [Subject<T>, (value: T) => void] {
const subjectRef = useRef<Subject<T>>();
if (!subjectRef.current) {
subjectRef.current = new Subject<T>();
}
const emit = useMemo(
() => (value: T) => subjectRef.current!.next(value),
[],
);
useEffect(() => {
return () => subjectRef.current!.complete();
}, []);
return [subjectRef.current, emit];
}
// === 検索コンポーネント ===
function SearchComponent() {
const [results, setResults] = useState<SearchResult[]>([]);
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
const searchSubject = useRef(new Subject<string>());
useEffect(() => {
const subscription = searchSubject.current.pipe(
debounceTime(300),
distinctUntilChanged(),
tap(() => {
setLoading(true);
setError(null);
}),
switchMap(query =>
from(fetch(`/api/search?q=${query}`).then(r => r.json())).pipe(
catchError(err => {
setError(err.message);
return of([]);
}),
)
),
tap(() => setLoading(false)),
).subscribe(setResults);
return () => subscription.unsubscribe();
}, []);
return (
<div>
<input
type="text"
onChange={e => searchSubject.current.next(e.target.value)}
placeholder="検索..."
/>
{loading && <div className="spinner">検索中...</div>}
{error && <div className="error">{error}</div>}
<ul>
{results.map(r => (
<li key={r.id}>{r.title}</li>
))}
</ul>
</div>
);
}
// === WebSocket フック ===
function useWebSocket<T>(url: string): {
messages$: Observable<T>;
send: (data: any) => void;
status: 'connecting' | 'connected' | 'disconnected';
} {
const [status, setStatus] = useState<'connecting' | 'connected' | 'disconnected'>('connecting');
const wsRef = useRef<WebSocket | null>(null);
const messages$ = useMemo(() => new Subject<T>(), []);
useEffect(() => {
const ws = new WebSocket(url);
wsRef.current = ws;
ws.onopen = () => setStatus('connected');
ws.onclose = () => setStatus('disconnected');
ws.onmessage = (event) => {
try {
messages$.next(JSON.parse(event.data));
} catch (err) {
console.error('Parse error:', err);
}
};
return () => {
ws.close();
messages$.complete();
};
}, [url]);
const send = (data: any) => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify(data));
}
};
return { messages$: messages$.asObservable(), send, status };
}10. テストとデバッグ
import { TestScheduler } from 'rxjs/testing';
import { map, filter, delay, debounceTime, switchMap } from 'rxjs/operators';
// === Marble Testing ===
describe('RxJS オペレータのテスト', () => {
let scheduler: TestScheduler;
beforeEach(() => {
scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
});
// map オペレータのテスト
it('values を 10 倍にする', () => {
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold(' -a-b-c-|', { a: 1, b: 2, c: 3 });
const expected = ' -a-b-c-|';
const result$ = source$.pipe(map(x => x * 10));
expectObservable(result$).toBe(expected, { a: 10, b: 20, c: 30 });
});
});
// filter オペレータのテスト
it('偶数のみ通す', () => {
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold(' -a-b-c-d-|', { a: 1, b: 2, c: 3, d: 4 });
const expected = ' ---b---d-|';
const result$ = source$.pipe(filter(x => x % 2 === 0));
expectObservable(result$).toBe(expected, { b: 2, d: 4 });
});
});
// debounceTime のテスト
it('300ms のデバウンスが正しく機能する', () => {
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold(' -a--b-----c--|');
const expected = ' ---- 300ms b 196ms c--|';
// bはaの後300ms経過で発行、cはbの後300ms以上経過で発行
const result$ = source$.pipe(debounceTime(300));
expectObservable(result$).toBe(expected);
});
});
// エラーのテスト
it('エラーをキャッチしてフォールバック値を返す', () => {
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold(' -a-b-#', { a: 1, b: 2 }, new Error('fail'));
const expected = ' -a-b-(c|)';
const result$ = source$.pipe(
catchError(() => of(0)),
);
expectObservable(result$).toBe(expected, { a: 1, b: 2, c: 0 });
});
});
});
// === Marble Syntax 一覧 ===
// - : 1フレーム(10ms in virtual time)
// a : 値の発行(値はオブジェクトリテラルで定義)
// | : complete
// # : error
// ^ : subscribe ポイント(hot observableで使用)
// ! : unsubscribe ポイント
// () : 同期グループ(同じフレームで複数の値を発行)
// === tap でデバッグ ===
const debuggedStream$ = source$.pipe(
tap({
next: val => console.log('[DEBUG] next:', val),
error: err => console.error('[DEBUG] error:', err),
complete: () => console.log('[DEBUG] complete'),
subscribe: () => console.log('[DEBUG] subscribe'),
unsubscribe: () => console.log('[DEBUG] unsubscribe'),
finalize: () => console.log('[DEBUG] finalize'),
}),
map(transform),
tap(val => console.log('[DEBUG] after map:', val)),
);11. Reactive Streams Specification(Java/Kotlin)
Reactive Streams 仕様(JVM):
Publisher<T>
└── subscribe(Subscriber<T>)
Subscriber<T>
├── onSubscribe(Subscription)
├── onNext(T)
├── onError(Throwable)
└── onComplete()
Subscription
├── request(long n) ← バックプレッシャーの核心
└── cancel()
Processor<T, R>
└── Publisher<R> + Subscriber<T>
実装ライブラリ:
→ Project Reactor (Spring WebFlux)
→ RxJava 3
→ Akka Streams
→ Kotlin Coroutines Flow
// Kotlin Flow の例(Reactive Streams の軽量版)
import kotlinx.coroutines.flow.*
// Flow の作成
fun fibonacci(): Flow<Long> = flow {
var a = 0L
var b = 1L
while (true) {
emit(a)
val temp = a + b
a = b
b = temp
}
}
// 使用
suspend fun main() {
fibonacci()
.take(10)
.filter { it % 2 == 0L }
.map { it * it }
.collect { println(it) }
}
// StateFlow(BehaviorSubject 相当)
class UserViewModel : ViewModel() {
private val _state = MutableStateFlow(UserState())
val state: StateFlow<UserState> = _state.asStateFlow()
fun loadUser(id: String) {
viewModelScope.launch {
_state.update { it.copy(loading = true) }
try {
val user = userRepository.getUser(id)
_state.update { it.copy(user = user, loading = false) }
} catch (e: Exception) {
_state.update { it.copy(error = e.message, loading = false) }
}
}
}
}
// SharedFlow(Subject 相当)
class EventBus {
private val _events = MutableSharedFlow<AppEvent>(
replay = 0,
extraBufferCapacity = 64,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
val events: SharedFlow<AppEvent> = _events.asSharedFlow()
suspend fun emit(event: AppEvent) {
_events.emit(event)
}
}12. パフォーマンス最適化
// === share / shareReplay でマルチキャストを活用 ===
// NG: 各 subscribe が独立した HTTP リクエストを発行
const user$ = ajax.getJSON('/api/user/1');
user$.subscribe(u => updateHeader(u)); // リクエスト1
user$.subscribe(u => updateSidebar(u)); // リクエスト2(無駄)
// OK: shareReplay で結果を共有
const user$ = ajax.getJSON('/api/user/1').pipe(
shareReplay({ bufferSize: 1, refCount: true }),
);
user$.subscribe(u => updateHeader(u)); // リクエスト1
user$.subscribe(u => updateSidebar(u)); // キャッシュから取得(リクエストなし)
// === メモリリーク防止 ===
// NG: takeUntil なしで購読
class MyComponent {
ngOnInit() {
interval(1000).subscribe(v => this.update(v));
// コンポーネント破棄後もメモリリーク
}
}
// OK: takeUntil で自動解除
class MyComponent implements OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
interval(1000).pipe(
takeUntil(this.destroy$),
).subscribe(v => this.update(v));
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
// === 不要な再計算を避ける ===
const expensive$ = source$.pipe(
distinctUntilChanged(), // 値が変わった時だけ
map(computeExpensiveResult),
shareReplay(1), // 結果を共有
);
// === observeOn / subscribeOn でスケジューリング ===
import { asyncScheduler, asapScheduler, animationFrameScheduler } from 'rxjs';
import { observeOn, subscribeOn } from 'rxjs/operators';
// アニメーションフレームで描画
source$.pipe(
observeOn(animationFrameScheduler), // requestAnimationFrame に同期
).subscribe(value => {
updateUI(value); // 60fps でスムーズな描画
});13. いつ使うか
Observable が適切:
✓ WebSocket のメッセージストリーム
✓ ユーザー入力(検索、スクロール、リサイズ)
✓ リアルタイムデータ(株価、チャット)
✓ 複数のイベントソースの結合
✓ Angular の HttpClient / Forms
✓ 複雑な非同期オーケストレーション
✓ バックプレッシャー制御が必要な場面
Promise/async-await が適切:
✓ 単発のAPI呼び出し
✓ DBクエリ
✓ ファイル操作
✓ シンプルな非同期処理
✓ Node.js のサーバーサイド処理
Signals(Angular/Solid/Preact)が適切:
✓ UI の状態管理
✓ 派生値の計算
✓ 同期的なリアクティビティ
原則:
→ 単発 → Promise
→ ストリーム → Observable
→ UI状態 → Signals (利用可能なら)
→ 迷ったら Promise(シンプルさ優先)
実践演習
演習1: 基本的な実装
以下の要件を満たすコードを実装してください。
要件:
- 入力データの検証を行うこと
- エラーハンドリングを適切に実装すること
- テストコードも作成すること
# 演習1: 基本実装のテンプレート
class Exercise1:
"""基本的な実装パターンの演習"""
def __init__(self):
self.data = []
def validate_input(self, value):
"""入力値の検証"""
if value is None:
raise ValueError("入力値がNoneです")
return True
def process(self, value):
"""データ処理のメインロジック"""
self.validate_input(value)
self.data.append(value)
return self.data
def get_results(self):
"""処理結果の取得"""
return {
'count': len(self.data),
'data': self.data
}
# テスト
def test_exercise1():
ex = Exercise1()
assert ex.process(1) == [1]
assert ex.process(2) == [1, 2]
assert ex.get_results()['count'] == 2
try:
ex.process(None)
assert False, "例外が発生するべき"
except ValueError:
pass
print("全テスト合格!")
test_exercise1()演習2: 応用パターン
基本実装を拡張して、以下の機能を追加してください。
# 演習2: 応用パターン
from typing import List, Dict, Optional
from datetime import datetime
class AdvancedExercise:
"""応用パターンの演習"""
def __init__(self, max_size: int = 100):
self._items: List[Dict] = []
self._max_size = max_size
self._created_at = datetime.now()
def add(self, key: str, value: any) -> bool:
"""アイテムの追加(サイズ制限付き)"""
if len(self._items) >= self._max_size:
return False
self._items.append({
'key': key,
'value': value,
'timestamp': datetime.now().isoformat()
})
return True
def find(self, key: str) -> Optional[Dict]:
"""キーによる検索"""
for item in reversed(self._items):
if item['key'] == key:
return item
return None
def remove(self, key: str) -> bool:
"""キーによる削除"""
for i, item in enumerate(self._items):
if item['key'] == key:
self._items.pop(i)
return True
return False
def stats(self) -> Dict:
"""統計情報"""
return {
'total_items': len(self._items),
'max_size': self._max_size,
'usage_percent': len(self._items) / self._max_size * 100,
'uptime': str(datetime.now() - self._created_at)
}
# テスト
def test_advanced():
ex = AdvancedExercise(max_size=3)
assert ex.add("a", 1) == True
assert ex.add("b", 2) == True
assert ex.add("c", 3) == True
assert ex.add("d", 4) == False # サイズ制限
assert ex.find("b")['value'] == 2
assert ex.remove("b") == True
assert ex.find("b") is None
stats = ex.stats()
assert stats['total_items'] == 2
print("応用テスト全合格!")
test_advanced()演習3: パフォーマンス最適化
以下のコードのパフォーマンスを改善してください。
# 演習3: パフォーマンス最適化
import time
from functools import lru_cache
# 最適化前(O(n^2))
def slow_search(data: list, target: int) -> int:
"""非効率な検索"""
for i in range(len(data)):
for j in range(i + 1, len(data)):
if data[i] + data[j] == target:
return (i, j)
return (-1, -1)
# 最適化後(O(n))
def fast_search(data: list, target: int) -> tuple:
"""ハッシュマップを使った効率的な検索"""
seen = {}
for i, num in enumerate(data):
complement = target - num
if complement in seen:
return (seen[complement], i)
seen[num] = i
return (-1, -1)
# ベンチマーク
def benchmark():
import random
data = list(range(5000))
random.shuffle(data)
target = data[100] + data[4000]
start = time.time()
result1 = slow_search(data, target)
slow_time = time.time() - start
start = time.time()
result2 = fast_search(data, target)
fast_time = time.time() - start
print(f"非効率版: {slow_time:.4f}秒")
print(f"効率版: {fast_time:.6f}秒")
print(f"高速化率: {slow_time/fast_time:.0f}倍")
benchmark()ポイント:
- アルゴリズムの計算量を意識する
- 適切なデータ構造を選択する
- ベンチマークで効果を測定する
FAQ
Q1: このトピックを学ぶ上で最も重要なポイントは何ですか?
実践的な経験を積むことが最も重要です。理論だけでなく、実際にコードを書いて動作を確認することで理解が深まります。
Q2: 初心者がよく陥る間違いは何ですか?
基礎を飛ばして応用に進むことです。このガイドで説明している基本概念をしっかり理解してから、次のステップに進むことをお勧めします。
Q3: 実務ではどのように活用されていますか?
このトピックの知識は、日常的な開発業務で頻繁に活用されます。特にコードレビューやアーキテクチャ設計の際に重要になります。
まとめ
| 概念 | Promise | Observable |
|---|---|---|
| 値の数 | 1つ | 0〜無限 |
| 実行 | eager | lazy |
| キャンセル | 不可 | 可能 |
| オペレータ | 限定的 | 豊富 |
| 用途 | 単発のI/O | ストリーム |
| バックプレッシャー | なし | あり |
| マルチキャスト | N/A | share / Subject |
| Subject | 特徴 | 初期値 | リプレイ |
|---|---|---|---|
| Subject | 基本 | なし | なし |
| BehaviorSubject | 最新値保持 | 必要 | 1個 |
| ReplaySubject | N個リプレイ | なし | N個 |
| AsyncSubject | 最後の値 | なし | complete時 |
| オペレータ | 用途 | キャンセル | 順序保証 |
|---|---|---|---|
| switchMap | 検索 | する | 最新のみ |
| mergeMap | 並行処理 | しない | 不定 |
| concatMap | 順序処理 | しない | 保証 |
| exhaustMap | 二重送信防止 | 無視 | 最初のみ |
次に読むべきガイド
参考文献
- RxJS Documentation. rxjs.dev.
- Reactive Streams Specification. reactive-streams.org.
- Ben Lesh. "RxJS: Observable, Observer, and Subscription." rxjs.dev.
- Angular Documentation. "Observables in Angular." angular.dev.
- Kotlin Documentation. "Asynchronous Flow." kotlinlang.org.
- Erik Meijer. "Your Mouse is a Database." ACM Queue, 2012.