構造化並行性
構造化並行性は「並行処理のライフタイムを構造的に管理する」パラダイム。Kotlin coroutines、Swift structured concurrency、Python TaskGroup を通じて、安全な並行プログラミングを実現する。
88 分で読めます43,547 文字
構造化並行性
構造化並行性は「並行処理のライフタイムを構造的に管理する」パラダイム。Kotlin coroutines、Swift structured concurrency、Python TaskGroup を通じて、安全な並行プログラミングを実現する。
この章で学ぶこと
- 構造化並行性の原則を理解する
- 非構造化並行性の問題を把握する
- 各言語での実装を学ぶ
- キャンセル伝播の仕組みを理解する
- エラーハンドリングとの統合を把握する
- 実務での適用パターンを習得する
前提知識
このガイドを読む前に、以下の知識があると理解が深まります:
- 基本的なプログラミングの知識
- 関連する基礎概念の理解
- リトライ戦略 の内容を理解していること
1. 構造化並行性とは
1.1 基本概念
非構造化並行性(従来):
→ タスクを「起動して放置」
→ 親が終了しても子タスクが残る
→ エラーが子タスクで握りつぶされる
→ リソースリーク
function process() {
startBackgroundTask(); // 起動して放置
startAnotherTask(); // 誰がこの寿命を管理する?
} // process 終了後もタスクが動き続ける
構造化並行性:
→ 子タスクは親のスコープ内で完了する
→ 親は全ての子タスクの完了を待つ
→ 1つの子タスクが失敗したら、他もキャンセル
→ リソースリークなし
async function process() {
await Promise.all([ // 全ての子タスクの完了を待つ
task1(),
task2(),
]);
} // ここで全タスクが完了していることが保証
1.2 構造化プログラミングとの対比
構造化プログラミング(1968, Dijkstra):
→ goto を排除し、制御フローを構造化
→ if/else, while, for でスコープを明確に
→ コードの開始点と終了点が明確
非構造化: goto label; // どこに飛ぶかわからない
構造化: if (...) { ... } // スコープが明確
構造化並行性(2018, Elizarov, Syme):
→ 「起動して放置」を排除し、並行処理のライフタイムを構造化
→ タスクのスコープを明確に
→ タスクの開始点と終了点が明確
非構造化: Task.run(() => ...) // どこで終わるかわからない
構造化: async with TaskGroup() { ... } // スコープ内で完了保証
共通する原則:
→ 制御フローの明確化
→ スコープベースのリソース管理
→ 可読性とデバッグ容易性の向上
1.3 非構造化並行性の問題
問題1: リソースリーク
function startProcessing() {
setTimeout(() => {
// このコールバックは誰が管理する?
// startProcessing のスコープを超えて生存
processData();
}, 5000);
}
問題2: エラーの握りつぶし
function fetchAll() {
fetch('/api/users'); // エラーが起きても誰も catch しない
fetch('/api/products'); // 同上
}
問題3: キャンセルの困難
function loadDashboard() {
const p1 = fetch('/api/users');
const p2 = fetch('/api/stats');
// ユーザーがページ遷移した場合、p1とp2をキャンセルするのが困難
// 個別にAbortControllerを管理する必要がある
}
問題4: デバッグの困難
→ 非同期タスクのスタックトレースが途切れる
→ 親子関係が不明確
→ どのタスクがどの時点で動いているか追跡困難
2. Kotlin Coroutines
2.1 coroutineScope: 構造化並行性の基本
import kotlinx.coroutines.*
// coroutineScope: 構造化並行性のスコープ
suspend fun loadDashboard(): Dashboard = coroutineScope {
// 子 coroutine を起動
val userDeferred = async { fetchUser() }
val ordersDeferred = async { fetchOrders() }
val statsDeferred = async { fetchStats() }
// 全ての結果を待つ
Dashboard(
user = userDeferred.await(),
orders = ordersDeferred.await(),
stats = statsDeferred.await(),
)
// coroutineScope を抜けるとき、全子coroutineが完了していることが保証
// 1つが例外を投げたら、他もキャンセルされる
}2.2 supervisorScope: 子のエラーを独立に処理
// supervisorScope: 子のエラーが他に影響しない
suspend fun loadDashboardResilient(): Dashboard = supervisorScope {
val user = async { fetchUser() }
val orders = async {
try { fetchOrders() }
catch (e: Exception) { emptyList() } // フォールバック
}
val stats = async {
try { fetchStats() }
catch (e: Exception) { Stats.empty() }
}
Dashboard(
user = user.await(),
orders = orders.await(),
stats = stats.await(),
)
}
// coroutineScope vs supervisorScope の使い分け
//
// coroutineScope:
// → 全タスクが成功する必要がある場合
// → 1つ失敗 → 全てキャンセル
// → 例: トランザクション的な処理
//
// supervisorScope:
// → 個々のタスクが独立している場合
// → 1つ失敗しても他は続行
// → 例: ダッシュボードの各コンポーネント読み込み2.3 Kotlin のキャンセル処理
import kotlinx.coroutines.*
// キャンセルの基本
suspend fun processWithCancellation() {
val job = CoroutineScope(Dispatchers.Default).launch {
try {
repeat(1000) { i ->
println("Processing $i...")
delay(100) // キャンセルポイント
}
} catch (e: CancellationException) {
println("Cancelled!")
// クリーンアップ処理
} finally {
// リソース解放
withContext(NonCancellable) {
// キャンセル後でもこのブロック内は実行される
cleanup()
}
}
}
delay(500)
job.cancel() // キャンセル要求
job.join() // キャンセル完了を待つ
}
// キャンセル対応のベストプラクティス
suspend fun downloadFile(url: String, dest: File) = coroutineScope {
val response = httpClient.get(url)
val channel = response.bodyAsChannel()
dest.outputStream().use { output ->
val buffer = ByteArray(8192)
while (true) {
// ensureActive() で定期的にキャンセルチェック
ensureActive()
val bytesRead = channel.readAvailable(buffer)
if (bytesRead == -1) break
output.write(buffer, 0, bytesRead)
}
}
}
// タイムアウト付き処理
suspend fun fetchWithTimeout(): Result {
return withTimeout(5000) { // 5秒タイムアウト
fetchData()
}
// タイムアウト時は TimeoutCancellationException が発生
}
// タイムアウトで null を返す
suspend fun fetchWithTimeoutOrNull(): Result? {
return withTimeoutOrNull(5000) {
fetchData()
}
// タイムアウト時は null を返す(例外なし)
}2.4 Kotlin の高度なパターン
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Fan-out: 1つのプロデューサー、複数のコンシューマー
suspend fun fanOutExample() = coroutineScope {
val channel = produce {
repeat(100) { send(it) }
}
// 5つのワーカーで処理
repeat(5) { workerId ->
launch {
for (item in channel) {
println("Worker $workerId processing $item")
processItem(item)
}
}
}
}
// Fan-in: 複数のプロデューサー、1つのコンシューマー
suspend fun fanInExample() = coroutineScope {
val results = Channel<ProcessResult>()
// 複数のプロデューサー
val sources = listOf("api-1", "api-2", "api-3")
sources.forEach { source ->
launch {
val data = fetchFromSource(source)
results.send(ProcessResult(source, data))
}
}
// 全結果を収集
launch {
val allResults = mutableListOf<ProcessResult>()
repeat(sources.size) {
allResults.add(results.receive())
}
processAllResults(allResults)
results.close()
}
}
// 競争パターン: 最初の成功を返す
suspend fun raceExample(): String = coroutineScope {
select<String> {
async { fetchFromPrimary() }.onAwait { it }
async { fetchFromSecondary() }.onAwait { it }
async { fetchFromTertiary() }.onAwait { it }
}
// 最初に完了したものを返す、残りはキャンセル
}
// バックプレッシャー対応の並行処理
fun processWithBackpressure(items: List<Item>): Flow<Result> = flow {
coroutineScope {
val semaphore = Semaphore(10) // 同時実行数制限
items.map { item ->
async {
semaphore.withPermit {
processItem(item)
}
}
}.forEach { deferred ->
emit(deferred.await())
}
}
}
// エラー回復を組み込んだ構造化並行性
suspend fun resilientDashboard(): Dashboard = supervisorScope {
val user = async {
retryWithBackoff(maxRetries = 3) { fetchUser() }
}
val orders = async {
try {
withTimeout(5000) { fetchOrders() }
} catch (e: Exception) {
logger.warn("Failed to fetch orders: ${e.message}")
emptyList()
}
}
val recommendations = async {
try {
withTimeoutOrNull(3000) { fetchRecommendations() }
?: Recommendations.default()
} catch (e: Exception) {
Recommendations.default()
}
}
Dashboard(
user = user.await(),
orders = orders.await(),
recommendations = recommendations.await(),
)
}3. Swift Structured Concurrency
3.1 async let: 静的な並行性
// Swift: async let で静的な数のタスクを並行実行
func loadDashboard() async throws -> Dashboard {
async let user = fetchUser() // 並行開始
async let orders = fetchOrders() // 並行開始
async let stats = fetchStats() // 並行開始
return try await Dashboard(
user: user,
orders: orders,
stats: stats,
)
// 全ての async let の完了を待つ
// 1つが throw したら、他は自動キャンセル
}3.2 TaskGroup: 動的な並行性
// TaskGroup: 動的な数のタスク
func processItems(_ items: [Item]) async throws -> [Result] {
try await withThrowingTaskGroup(of: Result.self) { group in
for item in items {
group.addTask {
try await processItem(item)
}
}
var results: [Result] = []
for try await result in group {
results.append(result)
}
return results
}
// TaskGroup スコープ外 = 全タスク完了保証
}
// 並行数制限付きTaskGroup
func processWithConcurrencyLimit(
items: [Item],
maxConcurrent: Int = 5
) async throws -> [Result] {
try await withThrowingTaskGroup(of: Result.self) { group in
var results: [Result] = []
var iterator = items.makeIterator()
var inFlight = 0
// 初期バッチを投入
while inFlight < maxConcurrent, let item = iterator.next() {
group.addTask { try await processItem(item) }
inFlight += 1
}
// 1つ完了するたびに次を投入
for try await result in group {
results.append(result)
inFlight -= 1
if let item = iterator.next() {
group.addTask { try await processItem(item) }
inFlight += 1
}
}
return results
}
}3.3 Swift のキャンセル処理
// キャンセルの確認と対応
func downloadFile(url: URL) async throws -> Data {
var data = Data()
let (bytes, _) = try await URLSession.shared.bytes(from: url)
for try await byte in bytes {
// 定期的にキャンセルチェック
try Task.checkCancellation()
data.append(byte)
}
return data
}
// キャンセル対応のクリーンアップ
func processWithCleanup() async throws {
let tempFile = createTempFile()
do {
try await longRunningProcess(tempFile)
} catch is CancellationError {
// キャンセル時のクリーンアップ
try? FileManager.default.removeItem(at: tempFile)
throw CancellationError()
}
}
// withTaskCancellationHandler: キャンセルハンドラー
func fetchData() async throws -> Data {
let handle = startNetworkRequest()
return try await withTaskCancellationHandler {
// メイン処理
try await handle.result()
} onCancel: {
// キャンセル時にネットワークリクエストを中止
handle.cancel()
}
}
// タイムアウトの実装
func fetchWithTimeout<T>(
seconds: TimeInterval,
operation: @Sendable () async throws -> T
) async throws -> T {
try await withThrowingTaskGroup(of: T.self) { group in
group.addTask {
try await operation()
}
group.addTask {
try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
throw TimeoutError()
}
// 最初に完了したものを返す
let result = try await group.next()!
group.cancelAll() // 残りをキャンセル
return result
}
}3.4 Actor: データ競合の防止
// Actor: スレッドセーフなデータアクセス
actor UserCache {
private var cache: [String: User] = [:]
private var inFlightRequests: [String: Task<User, Error>] = [:]
func getUser(id: String) async throws -> User {
// キャッシュヒット
if let cached = cache[id] {
return cached
}
// 同じユーザーのリクエストが進行中なら待つ
if let existing = inFlightRequests[id] {
return try await existing.value
}
// 新しいリクエストを開始
let task = Task {
let user = try await fetchUser(id: id)
cache[id] = user
inFlightRequests[id] = nil
return user
}
inFlightRequests[id] = task
return try await task.value
}
func invalidate(id: String) {
cache.removeValue(forKey: id)
}
func invalidateAll() {
cache.removeAll()
}
}
// GlobalActor: 特定のコンテキストでの実行保証
@globalActor
actor DatabaseActor {
static let shared = DatabaseActor()
}
@DatabaseActor
class DatabaseManager {
private var connection: Connection?
func query(_ sql: String) async throws -> [Row] {
// DatabaseActor のコンテキストで実行される
// 自動的にスレッドセーフ
guard let conn = connection else {
throw DatabaseError.notConnected
}
return try await conn.execute(sql)
}
}
// Sendable プロトコル: 並行安全な型の保証
struct UserData: Sendable {
let id: String
let name: String
let email: String
}
// @Sendable クロージャ
func processInBackground(_ data: UserData) {
Task.detached { @Sendable in
// data は Sendable なので安全に渡せる
await processUser(data)
}
}4. Python TaskGroup(3.11+)
4.1 基本的な使い方
import asyncio
# Python 3.11+: TaskGroup
async def load_dashboard():
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user())
orders_task = tg.create_task(fetch_orders())
stats_task = tg.create_task(fetch_stats())
# async with を抜けると全タスク完了
# 1つが例外 → 他もキャンセル → ExceptionGroup が送出
return Dashboard(
user=user_task.result(),
orders=orders_task.result(),
stats=stats_task.result(),
)4.2 ExceptionGroup のハンドリング
# ExceptionGroup のハンドリング(Python 3.11+)
async def resilient_load():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(task_a())
tg.create_task(task_b())
except* ValueError as eg:
print(f"ValueError group: {eg.exceptions}")
for exc in eg.exceptions:
print(f" - {exc}")
except* TypeError as eg:
print(f"TypeError group: {eg.exceptions}")
except* ConnectionError as eg:
print(f"ConnectionError group: {eg.exceptions}")
# ExceptionGroup の構造
# ExceptionGroup は複数の例外をラップする
# except* は特定の型の例外だけを選択的にキャッチ
# 残りの例外は再送出される
# 複数の except* ブロック
async def handle_multiple_errors():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(task_that_may_raise_value_error())
tg.create_task(task_that_may_raise_type_error())
tg.create_task(task_that_may_raise_io_error())
except* ValueError as eg:
# ValueError だけを処理
for exc in eg.exceptions:
log_validation_error(exc)
except* (TypeError, IOError) as eg:
# TypeError と IOError をまとめて処理
for exc in eg.exceptions:
log_system_error(exc)
# 上記で処理されなかった例外型は再送出される4.3 キャンセル処理
import asyncio
from contextlib import asynccontextmanager
# タイムアウト付きTaskGroup
async def load_with_timeout():
try:
async with asyncio.timeout(5.0):
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user())
orders_task = tg.create_task(fetch_orders())
except TimeoutError:
print("Dashboard loading timed out")
return Dashboard.default()
return Dashboard(
user=user_task.result(),
orders=orders_task.result(),
)
# キャンセル対応のタスク
async def cancellable_download(url: str, dest: str) -> None:
"""キャンセル対応のダウンロード処理"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
with open(dest, 'wb') as f:
async for chunk in response.content.iter_chunked(8192):
# asyncio.CancelledError は自動的に伝播
f.write(chunk)
# shield: キャンセルから保護
async def critical_operation():
"""クリティカルな操作をキャンセルから保護"""
# shield で囲むと、外部からのキャンセルが内部に伝播しない
result = await asyncio.shield(save_to_database(data))
return result
# キャンセルハンドリングのパターン
async def process_with_cleanup():
"""キャンセル時にクリーンアップを実行"""
resource = await acquire_resource()
try:
await long_running_process(resource)
except asyncio.CancelledError:
# キャンセル時のクリーンアップ
await cleanup_resource(resource)
raise # CancelledError は必ず再送出
finally:
await release_resource(resource)4.4 高度なパターン
import asyncio
from typing import TypeVar, Callable, Awaitable, AsyncIterator
from dataclasses import dataclass
T = TypeVar('T')
R = TypeVar('R')
# 並行数制限付きバッチ処理
async def map_concurrent(
items: list[T],
func: Callable[[T], Awaitable[R]],
max_concurrent: int = 10,
) -> list[R]:
"""アイテムを並行数制限付きで処理"""
semaphore = asyncio.Semaphore(max_concurrent)
results: list[R] = [None] * len(items) # type: ignore
async def process_with_limit(index: int, item: T) -> None:
async with semaphore:
results[index] = await func(item)
async with asyncio.TaskGroup() as tg:
for i, item in enumerate(items):
tg.create_task(process_with_limit(i, item))
return results
# 使用例
async def main():
urls = [f"https://api.example.com/items/{i}" for i in range(100)]
results = await map_concurrent(
urls,
fetch_url,
max_concurrent=20,
)
# レース: 最初の成功を返す
async def race(*coros: Awaitable[T]) -> T:
"""複数のコルーチンのうち、最初に成功したものを返す"""
async with asyncio.TaskGroup() as tg:
done = asyncio.Event()
result_holder: list[T] = []
async def run_and_signal(coro: Awaitable[T]) -> None:
try:
result = await coro
if not done.is_set():
result_holder.append(result)
done.set()
except Exception:
pass # 失敗は無視
for coro in coros:
tg.create_task(run_and_signal(coro))
# 注意: TaskGroup は全タスク完了を待つ
# raceパターンにはTaskGroupは不向き
# asyncio.wait(return_when=FIRST_COMPLETED) を使う方が適切
if result_holder:
return result_holder[0]
raise RuntimeError("All tasks failed")
# asyncio.wait を使った適切なレース実装
async def race_proper(*coros: Awaitable[T]) -> T:
"""asyncio.wait で最初の完了を待つ"""
tasks = [asyncio.ensure_future(c) for c in coros]
try:
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
# 残りのタスクをキャンセル
for task in pending:
task.cancel()
# キャンセル完了を待つ
if pending:
await asyncio.wait(pending)
# 最初に完了したタスクの結果を返す
result_task = done.pop()
return result_task.result()
except Exception:
# エラー時は全タスクをキャンセル
for task in tasks:
task.cancel()
raise
# パイプライン: ステージごとに処理
async def pipeline_example():
"""マルチステージパイプライン"""
queue1: asyncio.Queue[RawData] = asyncio.Queue(maxsize=100)
queue2: asyncio.Queue[ProcessedData] = asyncio.Queue(maxsize=100)
async def stage1_fetch():
"""ステージ1: データ取得"""
for url in urls:
data = await fetch_data(url)
await queue1.put(data)
await queue1.put(None) # 終了シグナル
async def stage2_process():
"""ステージ2: データ処理"""
while True:
data = await queue1.get()
if data is None:
await queue2.put(None)
break
processed = await process_data(data)
await queue2.put(processed)
async def stage3_save():
"""ステージ3: データ保存"""
while True:
data = await queue2.get()
if data is None:
break
await save_data(data)
async with asyncio.TaskGroup() as tg:
tg.create_task(stage1_fetch())
tg.create_task(stage2_process())
tg.create_task(stage3_save())
# 構造化並行性でのリソース管理
@asynccontextmanager
async def managed_workers(
num_workers: int,
work_queue: asyncio.Queue,
handler: Callable,
):
"""ワーカープールのライフサイクル管理"""
async def worker(worker_id: int):
while True:
try:
item = await asyncio.wait_for(work_queue.get(), timeout=1.0)
await handler(worker_id, item)
work_queue.task_done()
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
break
tasks = []
try:
for i in range(num_workers):
task = asyncio.create_task(worker(i))
tasks.append(task)
yield tasks
finally:
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
# 使用例
async def process_with_workers():
queue: asyncio.Queue = asyncio.Queue()
# キューにアイテムを追加
for item in items:
await queue.put(item)
async with managed_workers(5, queue, process_item):
await queue.join() # 全アイテムの処理完了を待つ5. JavaScript/TypeScript の構造化並行性
5.1 Promise.all — 基本的な並行処理
// Promise.all: 全タスクの完了を待つ(部分的な構造化並行性)
async function loadDashboard(): Promise<Dashboard> {
const [user, orders, stats] = await Promise.all([
fetchUser(),
fetchOrders(),
fetchStats(),
]);
return { user, orders, stats };
}
// 制限事項:
// 1. 1つ失敗すると他も即座にrejectされるが、キャンセルはされない
// 2. 残りのPromiseはバックグラウンドで実行を続ける
// 3. 明示的なキャンセル機構がない5.2 Promise.allSettled — エラー耐性
// Promise.allSettled: 全タスクの完了を待つ(成否を問わず)
async function loadDashboardResilient(): Promise<Dashboard> {
const results = await Promise.allSettled([
fetchUser(),
fetchOrders(),
fetchStats(),
]);
const user = results[0].status === 'fulfilled'
? results[0].value
: null;
const orders = results[1].status === 'fulfilled'
? results[1].value
: [];
const stats = results[2].status === 'fulfilled'
? results[2].value
: Stats.default();
if (!user) {
throw new Error('Failed to fetch user');
}
return { user, orders, stats };
}
// ヘルパー関数で使いやすく
function extractResult<T>(result: PromiseSettledResult<T>): T | null {
return result.status === 'fulfilled' ? result.value : null;
}
function extractResults<T extends readonly unknown[]>(
results: { [K in keyof T]: PromiseSettledResult<T[K]> },
): { [K in keyof T]: T[K] | null } {
return results.map(extractResult) as any;
}5.3 AbortController による擬似的な構造化並行性
// AbortController を使ったキャンセル対応の並行処理
class StructuredScope {
private controller = new AbortController();
private tasks: Promise<any>[] = [];
get signal(): AbortSignal {
return this.controller.signal;
}
addTask<T>(fn: (signal: AbortSignal) => Promise<T>): Promise<T> {
const task = fn(this.signal);
this.tasks.push(task);
return task;
}
async run<T>(
fn: (scope: StructuredScope) => Promise<T>,
): Promise<T> {
try {
const result = await fn(this);
// 残りのタスクの完了を待つ
await Promise.allSettled(this.tasks);
return result;
} catch (error) {
// エラー時は全タスクをキャンセル
this.controller.abort();
// キャンセルの完了を待つ
await Promise.allSettled(this.tasks);
throw error;
}
}
cancel(reason?: string): void {
this.controller.abort(reason);
}
}
// 使用例
async function loadWithScope(): Promise<Dashboard> {
const scope = new StructuredScope();
return scope.run(async (s) => {
const userPromise = s.addTask(async (signal) => {
const response = await fetch('/api/user', { signal });
return response.json();
});
const ordersPromise = s.addTask(async (signal) => {
const response = await fetch('/api/orders', { signal });
return response.json();
});
const [user, orders] = await Promise.all([userPromise, ordersPromise]);
return { user, orders, stats: null };
});
}
// タイムアウト付きスコープ
async function loadWithTimeout(): Promise<Dashboard> {
const scope = new StructuredScope();
// タイムアウトでキャンセル
const timeout = setTimeout(() => scope.cancel('timeout'), 5000);
try {
return await scope.run(async (s) => {
const user = await s.addTask((signal) =>
fetchWithSignal('/api/user', signal)
);
const orders = await s.addTask((signal) =>
fetchWithSignal('/api/orders', signal)
);
return { user, orders, stats: null };
});
} finally {
clearTimeout(timeout);
}
}5.4 並行数制限付き処理
// セマフォベースの並行数制限
class AsyncSemaphore {
private current = 0;
private queue: Array<() => void> = [];
constructor(private readonly limit: number) {}
async acquire(): Promise<void> {
if (this.current < this.limit) {
this.current++;
return;
}
return new Promise<void>((resolve) => {
this.queue.push(resolve);
});
}
release(): void {
if (this.queue.length > 0) {
const next = this.queue.shift()!;
next();
} else {
this.current--;
}
}
async withPermit<T>(fn: () => Promise<T>): Promise<T> {
await this.acquire();
try {
return await fn();
} finally {
this.release();
}
}
}
// 並行数制限付き map
async function mapConcurrent<T, R>(
items: T[],
fn: (item: T) => Promise<R>,
concurrency: number = 10,
): Promise<R[]> {
const semaphore = new AsyncSemaphore(concurrency);
return Promise.all(
items.map((item) =>
semaphore.withPermit(() => fn(item))
),
);
}
// 使用例
const results = await mapConcurrent(
urls,
async (url) => {
const response = await fetch(url);
return response.json();
},
5, // 最大5並行
);6. Rust の構造化並行性
6.1 tokio::select! マクロ
use tokio::time::{sleep, Duration};
// select! で最初の完了を待つ
async fn fetch_with_timeout() -> Result<Data, Error> {
tokio::select! {
result = fetch_data() => result,
_ = sleep(Duration::from_secs(5)) => {
Err(Error::Timeout)
}
}
// 最初に完了した方の結果を返す
// もう一方はキャンセルされる(Future がドロップ)
}
// 複数のソースからの受信
async fn handle_messages(
mut ws_rx: WebSocketReceiver,
mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
) {
loop {
tokio::select! {
msg = ws_rx.next() => {
match msg {
Some(Ok(message)) => handle_message(message).await,
Some(Err(e)) => {
eprintln!("WebSocket error: {}", e);
break;
}
None => break,
}
}
_ = shutdown_rx.changed() => {
println!("Shutdown signal received");
break;
}
}
}
}6.2 tokio::spawn と JoinSet
use tokio::task::JoinSet;
// JoinSet: 構造化された並行タスクの管理
async fn process_items(items: Vec<Item>) -> Vec<Result<ProcessResult, Error>> {
let mut set = JoinSet::new();
for item in items {
set.spawn(async move {
process_item(item).await
});
}
let mut results = Vec::new();
while let Some(result) = set.join_next().await {
match result {
Ok(process_result) => results.push(process_result),
Err(join_error) => {
eprintln!("Task panicked: {}", join_error);
}
}
}
results
}
// JoinSet + 並行数制限
async fn process_with_limit(
items: Vec<Item>,
max_concurrent: usize,
) -> Vec<ProcessResult> {
let mut set = JoinSet::new();
let mut results = Vec::new();
let mut iter = items.into_iter();
// 初期バッチを投入
for _ in 0..max_concurrent {
if let Some(item) = iter.next() {
set.spawn(async move { process_item(item).await });
}
}
// 1つ完了するたびに次を投入
while let Some(result) = set.join_next().await {
if let Ok(Ok(r)) = result {
results.push(r);
}
if let Some(item) = iter.next() {
set.spawn(async move { process_item(item).await });
}
}
results
}
// スコープ付きタスク(Rust 特有)
// tokio::task::LocalSet を使ったローカルタスク管理
async fn scoped_tasks() {
let local = tokio::task::LocalSet::new();
local.run_until(async {
let handle1 = tokio::task::spawn_local(async {
// ローカルタスク(Send不要)
process_local_data().await
});
let handle2 = tokio::task::spawn_local(async {
process_another_local_data().await
});
let (r1, r2) = tokio::join!(handle1, handle2);
println!("Results: {:?}, {:?}", r1, r2);
}).await;
// LocalSet のスコープを抜けると全ローカルタスクが完了
}7. 構造化並行性の原則
7.1 3つの核心原則
3つの原則:
1. 子タスクは親のスコープ内で生存
→ 親が終了 = 子も終了(リーク防止)
→ タスクのライフタイムがスコープと一致
→ デバッグ時にタスクの親子関係が明確
2. エラーの伝播
→ 子のエラーは親に伝播する
→ 握りつぶされない
→ ExceptionGroup(Python)で複数エラーを扱える
3. キャンセルの伝播
→ 親がキャンセルされたら子もキャンセル
→ 1つの子が失敗したら兄弟もキャンセル(coroutineScope)
→ キャンセルは協調的(cooperative)
メリット:
✓ リソースリーク防止
✓ エラーの確実な処理
✓ コードの可読性(スコープが明確)
✓ デバッグの容易さ
✓ テスタビリティの向上
✓ 推論の容易さ(関数の終了 = 全子タスクの終了)
7.2 キャンセルの協調性
キャンセルは「要求」であり「強制」ではない:
協調的キャンセル:
→ キャンセル要求を受け取ったタスクが自発的に停止
→ タスクは安全な停止ポイントでキャンセルをチェック
→ クリーンアップの機会が与えられる
各言語のキャンセルポイント:
Kotlin: delay(), yield(), ensureActive(), suspend関数
Swift: Task.checkCancellation(), await
Python: await(asyncio.CancelledError が送出)
Rust: Future の poll が Pending を返す時
キャンセル時のベストプラクティス:
1. CancelledError/CancellationException は再送出する
2. finally ブロックでリソースを解放する
3. クリティカルセクションはキャンセルから保護する
→ Kotlin: withContext(NonCancellable)
→ Python: asyncio.shield()
4. 定期的にキャンセルチェックを行う
7.3 設計パターンの比較
パターン1: All or Nothing(全部成功 or 全部失敗)
→ Kotlin: coroutineScope
→ Swift: withThrowingTaskGroup
→ Python: asyncio.TaskGroup
→ 用途: トランザクション的な処理、全データが必要な場合
パターン2: Best Effort(できるだけ成功)
→ Kotlin: supervisorScope
→ Swift: withTaskGroup(エラーを個別ハンドリング)
→ Python: TaskGroup + except*
→ JS/TS: Promise.allSettled
→ 用途: ダッシュボード、部分的な結果でOKの場合
パターン3: First Success(最初の成功を採用)
→ Kotlin: select
→ Swift: TaskGroup + cancelAll
→ Python: asyncio.wait(FIRST_COMPLETED)
→ JS/TS: Promise.race
→ 用途: ヘッジリクエスト、マルチソース取得
パターン4: Fan-Out/Fan-In
→ 複数のプロデューサーとコンシューマー
→ チャネルやキューを組み合わせ
→ 用途: パイプライン処理、並列データ処理
8. 実務での適用パターン
8.1 マイクロサービスの並行呼び出し
// BFF(Backend for Frontend)パターンでの並行API呼び出し
class DashboardBFF {
async getDashboard(userId: string): Promise<DashboardResponse> {
const [
userResult,
ordersResult,
notificationsResult,
recommendationsResult,
] = await Promise.allSettled([
// 必須: ユーザー情報
this.userService.getUser(userId),
// 必須: 注文履歴
this.orderService.getOrders(userId),
// オプション: 通知(失敗しても可)
this.notificationService.getUnread(userId),
// オプション: レコメンド(失敗しても可)
this.recommendationService.getForUser(userId),
]);
// 必須データのチェック
if (userResult.status === 'rejected') {
throw new ServiceError('Failed to fetch user data', userResult.reason);
}
if (ordersResult.status === 'rejected') {
throw new ServiceError('Failed to fetch orders', ordersResult.reason);
}
return {
user: userResult.value,
orders: ordersResult.value,
notifications: notificationsResult.status === 'fulfilled'
? notificationsResult.value
: [],
recommendations: recommendationsResult.status === 'fulfilled'
? recommendationsResult.value
: [],
};
}
}8.2 バッチ処理
import asyncio
from typing import TypeVar, Callable, Awaitable
T = TypeVar('T')
R = TypeVar('R')
async def batch_process(
items: list[T],
processor: Callable[[T], Awaitable[R]],
batch_size: int = 50,
max_concurrent: int = 10,
on_progress: Callable[[int, int], None] | None = None,
) -> tuple[list[R], list[tuple[T, Exception]]]:
"""構造化並行性を使ったバッチ処理"""
results: list[R] = []
errors: list[tuple[T, Exception]] = []
completed = 0
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
semaphore = asyncio.Semaphore(max_concurrent)
async def process_item(item: T) -> tuple[T, R | None, Exception | None]:
async with semaphore:
try:
result = await processor(item)
return (item, result, None)
except Exception as e:
return (item, None, e)
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(process_item(item))
for item in batch
]
for task in tasks:
item, result, error = task.result()
if error:
errors.append((item, error))
else:
results.append(result)
completed += len(batch)
if on_progress:
on_progress(completed, len(items))
return results, errors
# 使用例
async def main():
users = await fetch_all_users()
results, errors = await batch_process(
users,
send_notification,
batch_size=100,
max_concurrent=20,
on_progress=lambda done, total: print(f"{done}/{total}"),
)
print(f"Sent: {len(results)}, Failed: {len(errors)}")
for user, error in errors:
print(f" Failed for {user.id}: {error}")8.3 ヘルスチェック
// 複数の依存サービスのヘルスチェック
data class HealthStatus(
val service: String,
val healthy: Boolean,
val latencyMs: Long,
val error: String? = null,
)
suspend fun checkAllHealth(): List<HealthStatus> = supervisorScope {
val services = mapOf(
"database" to { checkDatabase() },
"redis" to { checkRedis() },
"elasticsearch" to { checkElasticsearch() },
"external-api" to { checkExternalApi() },
)
services.map { (name, check) ->
async {
val start = System.currentTimeMillis()
try {
withTimeout(5000) { check() }
HealthStatus(
service = name,
healthy = true,
latencyMs = System.currentTimeMillis() - start,
)
} catch (e: Exception) {
HealthStatus(
service = name,
healthy = false,
latencyMs = System.currentTimeMillis() - start,
error = e.message,
)
}
}
}.awaitAll()
}9. テスト戦略
9.1 構造化並行性のテスト
// Kotlin: テスト用のディスパッチャー
@Test
fun `dashboard loads all data concurrently`() = runTest {
val userService = FakeUserService()
val orderService = FakeOrderService()
val dashboard = loadDashboard(userService, orderService)
assertEquals("田中太郎", dashboard.user.name)
assertEquals(3, dashboard.orders.size)
}
@Test
fun `partial failure returns fallback data`() = runTest {
val userService = FakeUserService()
val orderService = FailingOrderService()
val dashboard = loadDashboardResilient(userService, orderService)
assertEquals("田中太郎", dashboard.user.name)
assertEquals(emptyList(), dashboard.orders) // フォールバック
}
@Test
fun `cancellation propagates to child tasks`() = runTest {
val job = launch {
loadDashboard(
SlowUserService(delay = 10.seconds),
SlowOrderService(delay = 10.seconds),
)
}
advanceTimeBy(1.seconds)
job.cancel()
assertTrue(job.isCancelled)
// 子タスクもキャンセルされていることを確認
}# Python: 構造化並行性のテスト
import pytest
import asyncio
@pytest.mark.asyncio
async def test_task_group_all_succeed():
"""全タスクが成功する場合"""
results = []
async with asyncio.TaskGroup() as tg:
async def task(value):
await asyncio.sleep(0.01)
results.append(value)
tg.create_task(task(1))
tg.create_task(task(2))
tg.create_task(task(3))
assert sorted(results) == [1, 2, 3]
@pytest.mark.asyncio
async def test_task_group_one_fails():
"""1つのタスクが失敗すると他もキャンセルされる"""
with pytest.raises(ExceptionGroup) as exc_info:
async with asyncio.TaskGroup() as tg:
tg.create_task(asyncio.sleep(10)) # これはキャンセルされる
tg.create_task(failing_task()) # これが失敗
assert len(exc_info.value.exceptions) == 1
assert isinstance(exc_info.value.exceptions[0], ValueError)
@pytest.mark.asyncio
async def test_cancellation_propagation():
"""キャンセルが子タスクに伝播する"""
cancelled = asyncio.Event()
async def cancellable_task():
try:
await asyncio.sleep(100)
except asyncio.CancelledError:
cancelled.set()
raise
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(0.01)
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
assert cancelled.is_set()
@pytest.mark.asyncio
async def test_timeout_with_task_group():
"""タイムアウトでTaskGroup全体がキャンセルされる"""
with pytest.raises(TimeoutError):
async with asyncio.timeout(0.1):
async with asyncio.TaskGroup() as tg:
tg.create_task(asyncio.sleep(10))
tg.create_task(asyncio.sleep(10))10. アンチパターン
10.1 避けるべきパターン
アンチパターン1: Fire and Forget(起動して放置)
✗ Bad:
function handleRequest() {
sendEmail(user.email); // 結果を待たない、エラーも検知しない
return { ok: true };
}
✓ Good:
function handleRequest() {
// ジョブキューに投入(信頼性のある非同期処理)
await jobQueue.enqueue('send-email', { email: user.email });
return { ok: true };
}
アンチパターン2: 無限のキャンセル無視
✗ Bad:
async def process():
while True:
data = compute_heavy() # キャンセルポイントがない
results.append(data)
✓ Good:
async def process():
while True:
await asyncio.sleep(0) # キャンセルチェック
data = compute_heavy()
results.append(data)
アンチパターン3: CancelledError の握りつぶし
✗ Bad:
async def task():
try:
await operation()
except Exception: # CancelledError も catch してしまう
pass
✓ Good:
async def task():
try:
await operation()
except asyncio.CancelledError:
raise # 必ず再送出
except Exception:
pass
アンチパターン4: 不要なグローバルスコープ
✗ Bad (Kotlin):
fun handleRequest() {
GlobalScope.launch { ... } // ライフサイクル管理なし
}
✓ Good (Kotlin):
suspend fun handleRequest() = coroutineScope {
launch { ... } // スコープ内で管理
}
アンチパターン5: 過度な並行性
✗ Bad:
// 10万件を全て同時に処理
await Promise.all(items.map(item => process(item)));
✓ Good:
// 並行数を制限
await mapConcurrent(items, process, 20);
実践演習
演習1: 基本的な実装
以下の要件を満たすコードを実装してください。
要件:
- 入力データの検証を行うこと
- エラーハンドリングを適切に実装すること
- テストコードも作成すること
# 演習1: 基本実装のテンプレート
class Exercise1:
"""基本的な実装パターンの演習"""
def __init__(self):
self.data = []
def validate_input(self, value):
"""入力値の検証"""
if value is None:
raise ValueError("入力値がNoneです")
return True
def process(self, value):
"""データ処理のメインロジック"""
self.validate_input(value)
self.data.append(value)
return self.data
def get_results(self):
"""処理結果の取得"""
return {
'count': len(self.data),
'data': self.data
}
# テスト
def test_exercise1():
ex = Exercise1()
assert ex.process(1) == [1]
assert ex.process(2) == [1, 2]
assert ex.get_results()['count'] == 2
try:
ex.process(None)
assert False, "例外が発生するべき"
except ValueError:
pass
print("全テスト合格!")
test_exercise1()演習2: 応用パターン
基本実装を拡張して、以下の機能を追加してください。
# 演習2: 応用パターン
from typing import List, Dict, Optional
from datetime import datetime
class AdvancedExercise:
"""応用パターンの演習"""
def __init__(self, max_size: int = 100):
self._items: List[Dict] = []
self._max_size = max_size
self._created_at = datetime.now()
def add(self, key: str, value: any) -> bool:
"""アイテムの追加(サイズ制限付き)"""
if len(self._items) >= self._max_size:
return False
self._items.append({
'key': key,
'value': value,
'timestamp': datetime.now().isoformat()
})
return True
def find(self, key: str) -> Optional[Dict]:
"""キーによる検索"""
for item in reversed(self._items):
if item['key'] == key:
return item
return None
def remove(self, key: str) -> bool:
"""キーによる削除"""
for i, item in enumerate(self._items):
if item['key'] == key:
self._items.pop(i)
return True
return False
def stats(self) -> Dict:
"""統計情報"""
return {
'total_items': len(self._items),
'max_size': self._max_size,
'usage_percent': len(self._items) / self._max_size * 100,
'uptime': str(datetime.now() - self._created_at)
}
# テスト
def test_advanced():
ex = AdvancedExercise(max_size=3)
assert ex.add("a", 1) == True
assert ex.add("b", 2) == True
assert ex.add("c", 3) == True
assert ex.add("d", 4) == False # サイズ制限
assert ex.find("b")['value'] == 2
assert ex.remove("b") == True
assert ex.find("b") is None
stats = ex.stats()
assert stats['total_items'] == 2
print("応用テスト全合格!")
test_advanced()演習3: パフォーマンス最適化
以下のコードのパフォーマンスを改善してください。
# 演習3: パフォーマンス最適化
import time
from functools import lru_cache
# 最適化前(O(n^2))
def slow_search(data: list, target: int) -> int:
"""非効率な検索"""
for i in range(len(data)):
for j in range(i + 1, len(data)):
if data[i] + data[j] == target:
return (i, j)
return (-1, -1)
# 最適化後(O(n))
def fast_search(data: list, target: int) -> tuple:
"""ハッシュマップを使った効率的な検索"""
seen = {}
for i, num in enumerate(data):
complement = target - num
if complement in seen:
return (seen[complement], i)
seen[num] = i
return (-1, -1)
# ベンチマーク
def benchmark():
import random
data = list(range(5000))
random.shuffle(data)
target = data[100] + data[4000]
start = time.time()
result1 = slow_search(data, target)
slow_time = time.time() - start
start = time.time()
result2 = fast_search(data, target)
fast_time = time.time() - start
print(f"非効率版: {slow_time:.4f}秒")
print(f"効率版: {fast_time:.6f}秒")
print(f"高速化率: {slow_time/fast_time:.0f}倍")
benchmark()ポイント:
- アルゴリズムの計算量を意識する
- 適切なデータ構造を選択する
- ベンチマークで効果を測定する
FAQ
Q1: このトピックを学ぶ上で最も重要なポイントは何ですか?
実践的な経験を積むことが最も重要です。理論だけでなく、実際にコードを書いて動作を確認することで理解が深まります。
Q2: 初心者がよく陥る間違いは何ですか?
基礎を飛ばして応用に進むことです。このガイドで説明している基本概念をしっかり理解してから、次のステップに進むことをお勧めします。
Q3: 実務ではどのように活用されていますか?
このトピックの知識は、日常的な開発業務で頻繁に活用されます。特にコードレビューやアーキテクチャ設計の際に重要になります。
まとめ
| 言語 | 構造化並行性 | スコープ | キャンセル | エラー伝播 |
|---|---|---|---|---|
| Kotlin | coroutineScope | 全子完了を待つ | CancellationException | 自動伝播 |
| Kotlin | supervisorScope | 全子完了を待つ | 独立 | 独立ハンドリング |
| Swift | async let | 全子完了を待つ | 自動キャンセル | throws で伝播 |
| Swift | TaskGroup | 全子完了を待つ | cancelAll() | throws で伝播 |
| Python | asyncio.TaskGroup | 全子完了を待つ | CancelledError | ExceptionGroup |
| Rust | tokio JoinSet | 明示的に待つ | Future ドロップ | JoinError |
| JS/TS | Promise.all | 明示的に待つ | AbortController | reject 伝播 |
次に読むべきガイド
参考文献
- Elizarov, R. "Structured Concurrency." vorpus.org, 2018.
- Swift Evolution. "SE-0304: Structured Concurrency."
- Python Documentation. "asyncio — TaskGroup." docs.python.org.
- Kotlin Documentation. "Coroutines guide." kotlinlang.org.
- Smith, N. "Notes on structured concurrency, or: Go statement considered harmful." 2018.
- Tokio Documentation. "Working with Tasks." tokio.rs.
- Apple Developer. "Concurrency — Swift Programming Language." developer.apple.com.
- Syme, D. "The early history of F# async." fsharpforfunandprofit.com.
- Sustrik, M. "Structured Concurrency." 250bpm.com, 2016.
- Nygard, M. "Release It!" Pragmatic Bookshelf, 2018.