API 統合 — SDK・ストリーミング・リトライ戦略
LLM API 統合はモデルの能力をアプリケーションに組み込むエンジニアリングであり、SDK 選定、ストリーミング実装、エラーハンドリング、レート制限対策、コスト管理を体系的に設計する必要がある。
API 統合 — SDK・ストリーミング・リトライ戦略
LLM API 統合はモデルの能力をアプリケーションに組み込むエンジニアリングであり、SDK 選定、ストリーミング実装、エラーハンドリング、レート制限対策、コスト管理を体系的に設計する必要がある。
この章で学ぶこと
- 主要プロバイダの SDK と共通抽象レイヤー — OpenAI、Anthropic、Google、LiteLLM による統一アクセス
- ストリーミングの実装パターン — SSE、WebSocket、バックプレッシャー制御
- プロダクション品質のエラーハンドリング — リトライ、フォールバック、サーキットブレーカー
- レート制限とコスト管理 — トークンバケット、予算管理、使用量監視
- プロンプトキャッシュとバッチ API — コスト削減のための高度な API 活用
- セキュリティとオブザーバビリティ — API キー管理、ログ、メトリクス
前提知識
このガイドを読む前に、以下の知識があると理解が深まります:
- 基本的なプログラミングの知識
- 関連する基礎概念の理解
1. SDK 概要
| LLM API 統合のレイヤー構造 | ||||
|---|---|---|---|---|
| アプリケーション | ||||
| ▼ | ||||
| ┌──────────────────────────────────────┐ | ||||
| 抽象レイヤー (LiteLLM / OpenRouter) | ← 推奨 | |||
| - マルチプロバイダ対応 | ||||
| - 統一 API インターフェース | ||||
| - 自動フォールバック | ||||
| └───────────┬──────────────────────────┘ | ||||
| ┌─────────┼────────────┬────────────┐ | ||||
| ▼ ▼ ▼ ▼ | ||||
| OpenAI Anthropic Google AI Ollama | ||||
| SDK SDK SDK (Local) | ||||
| ▼ ▼ ▼ ▼ | ||||
| GPT-4o Claude 3.5 Gemini 1.5 Llama 3.1 | ||||
| o1/o3 Haiku Flash/Pro Qwen 2.5 |
1.1 OpenAI SDK
from openai import OpenAI, AsyncOpenAI
# 同期クライアント
client = OpenAI(
api_key="sk-...", # 省略時は OPENAI_API_KEY 環境変数を使用
timeout=30.0, # タイムアウト
max_retries=3, # 自動リトライ回数
base_url="https://api.openai.com/v1", # カスタムエンドポイント対応
)
# 基本的なチャット補完
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "あなたは有能なアシスタントです。"},
{"role": "user", "content": "Pythonのデコレータを解説してください"},
],
temperature=0.7,
max_tokens=1024,
top_p=1.0,
frequency_penalty=0.0,
presence_penalty=0.0,
)
print(response.choices[0].message.content)
print(f"トークン使用量: {response.usage.total_tokens}")
print(f"入力: {response.usage.prompt_tokens}, 出力: {response.usage.completion_tokens}")
# 非同期クライアント
async_client = AsyncOpenAI()
response = await async_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Hello"}],
)
# Structured Output (JSON モード)
from pydantic import BaseModel
class ExtractedData(BaseModel):
name: str
age: int
occupation: str
response = client.beta.chat.completions.parse(
model="gpt-4o",
messages=[
{"role": "system", "content": "テキストから情報を抽出してJSON形式で返してください"},
{"role": "user", "content": "田中太郎さんは35歳のエンジニアです"},
],
response_format=ExtractedData,
)
data = response.choices[0].message.parsed
print(f"名前: {data.name}, 年齢: {data.age}, 職業: {data.occupation}")
# バッチ API
batch_input = client.files.create(
file=open("batch_requests.jsonl", "rb"),
purpose="batch",
)
batch_job = client.batches.create(
input_file_id=batch_input.id,
endpoint="/v1/chat/completions",
completion_window="24h",
)
print(f"バッチジョブID: {batch_job.id}, 状態: {batch_job.status}")1.2 Anthropic SDK
from anthropic import Anthropic, AsyncAnthropic
client = Anthropic()
# 基本的なメッセージ作成
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
system="あなたは有能なアシスタントです。",
messages=[{"role": "user", "content": "Hello"}],
)
print(response.content[0].text)
print(f"入力: {response.usage.input_tokens}, 出力: {response.usage.output_tokens}")
print(f"停止理由: {response.stop_reason}")
# マルチモーダル入力(画像)
import base64
with open("image.png", "rb") as f:
image_data = base64.standard_b64encode(f.read()).decode("utf-8")
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": image_data,
},
},
{
"type": "text",
"text": "この画像の内容を説明してください",
},
],
}],
)
# プロンプトキャッシュ
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
system=[
{
"type": "text",
"text": "あなたは法律の専門家です。以下の法律文書を参照して回答してください...(長い文書)",
"cache_control": {"type": "ephemeral"}, # キャッシュ有効化
},
],
messages=[{"role": "user", "content": "第3条について要約してください"}],
)
# cache_creation_input_tokens と cache_read_input_tokens で
# キャッシュの利用状況を確認できる
print(f"キャッシュ作成: {response.usage.cache_creation_input_tokens}")
print(f"キャッシュ読み: {response.usage.cache_read_input_tokens}")
# Extended Thinking
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=16384,
thinking={
"type": "enabled",
"budget_tokens": 10000, # 推論に使えるトークン数の上限
},
messages=[{"role": "user", "content": "この数学問題を解いてください: ..."}],
)
# thinking ブロックと text ブロックが返される
for block in response.content:
if block.type == "thinking":
print(f"[思考過程] {block.thinking[:200]}...")
elif block.type == "text":
print(f"[回答] {block.text}")
# バッチ API
batch = client.messages.batches.create(
requests=[
{
"custom_id": f"req-{i}",
"params": {
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 1024,
"messages": [{"role": "user", "content": f"質問 {i}"}],
},
}
for i in range(100)
],
)
print(f"バッチID: {batch.id}")1.3 Google Generative AI SDK
import google.generativeai as genai
genai.configure(api_key="AIza...")
# Gemini モデルの利用
model = genai.GenerativeModel("gemini-1.5-pro")
response = model.generate_content("日本の歴史を要約してください")
print(response.text)
# マルチモーダル(画像入力)
import PIL.Image
img = PIL.Image.open("chart.png")
response = model.generate_content(["このグラフを分析してください", img])
# マルチモーダル(動画入力)※ Gemini 独自機能
video_file = genai.upload_file("presentation.mp4")
response = model.generate_content(
["この動画の要点をまとめてください", video_file],
request_options={"timeout": 600},
)
# 長文コンテキスト(200万トークン対応)
long_document = open("large_document.txt").read()
response = model.generate_content(
f"以下の文書を分析して要点を抽出してください:\n\n{long_document}",
generation_config=genai.types.GenerationConfig(
temperature=0.3,
max_output_tokens=8192,
),
)
# ストリーミング
response = model.generate_content("詳しく説明してください", stream=True)
for chunk in response:
print(chunk.text, end="", flush=True)
# Safety 設定
from google.generativeai.types import HarmCategory, HarmBlockThreshold
response = model.generate_content(
"...",
safety_settings={
HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_LOW_AND_ABOVE,
HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_LOW_AND_ABOVE,
},
)1.4 LiteLLM (マルチプロバイダ統一)
from litellm import completion, acompletion
import litellm
# 同じインターフェースで異なるプロバイダを呼び出し
response = completion(
model="gpt-4o", # OpenAI
messages=[{"role": "user", "content": "Hello"}],
)
response = completion(
model="claude-3-5-sonnet-20241022", # Anthropic
messages=[{"role": "user", "content": "Hello"}],
)
response = completion(
model="gemini/gemini-1.5-pro", # Google
messages=[{"role": "user", "content": "Hello"}],
)
response = completion(
model="ollama/llama3.1", # ローカル Ollama
messages=[{"role": "user", "content": "Hello"}],
api_base="http://localhost:11434",
)
# LiteLLM Router: 負荷分散 + フォールバック
from litellm import Router
router = Router(
model_list=[
{
"model_name": "primary",
"litellm_params": {
"model": "gpt-4o",
"api_key": "sk-...",
},
},
{
"model_name": "primary", # 同じ名前で複数モデルを設定
"litellm_params": {
"model": "claude-3-5-sonnet-20241022",
"api_key": "sk-ant-...",
},
},
],
routing_strategy="least-busy", # latency-based-routing, simple-shuffle 等
num_retries=3,
fallbacks=[
{"primary": ["gpt-4o-mini"]}, # primary が全て失敗したら mini にフォールバック
],
)
response = await router.acompletion(
model="primary",
messages=[{"role": "user", "content": "Hello"}],
)
# コスト追跡
litellm.success_callback = ["langfuse"] # Langfuse でコスト・品質を追跡
litellm.set_verbose = True
# カスタムコールバック
def log_callback(kwargs, completion_response, start_time, end_time):
print(f"モデル: {kwargs['model']}")
print(f"レイテンシ: {end_time - start_time}")
print(f"トークン: {completion_response.usage}")
litellm.success_callback = [log_callback]2. ストリーミング実装
2.1 基本ストリーミング
from openai import OpenAI
client = OpenAI()
# ストリーミング (同期)
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "日本の歴史を要約してください"}],
stream=True,
stream_options={"include_usage": True}, # 使用量情報を含める
)
full_response = ""
usage = None
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
token = chunk.choices[0].delta.content
print(token, end="", flush=True)
full_response += token
if chunk.usage:
usage = chunk.usage
print(f"\n\n入力: {usage.prompt_tokens}, 出力: {usage.completion_tokens}")2.2 Anthropic ストリーミング
from anthropic import Anthropic
client = Anthropic()
# ストリーミング(イベントベース)
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": "Pythonの非同期処理を解説してください"}],
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
# イベント詳細版
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello"}],
) as stream:
for event in stream:
if event.type == "message_start":
print(f"[開始] モデル: {event.message.model}")
elif event.type == "content_block_start":
print(f"[ブロック開始] タイプ: {event.content_block.type}")
elif event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.type == "message_delta":
print(f"\n[完了] 停止理由: {event.delta.stop_reason}")
print(f"出力トークン: {event.usage.output_tokens}")2.3 FastAPI + Server-Sent Events (SSE)
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from openai import AsyncOpenAI
import json
import asyncio
import time
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
client = AsyncOpenAI()
class ChatRequest(BaseModel):
message: str
model: str = "gpt-4o"
max_tokens: int = 1024
temperature: float = 0.7
stream: bool = True
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
"""SSE でストリーミングレスポンスを返す"""
async def generate():
try:
start_time = time.time()
ttft = None
stream = await client.chat.completions.create(
model=request.model,
messages=[{"role": "user", "content": request.message}],
max_tokens=request.max_tokens,
temperature=request.temperature,
stream=True,
stream_options={"include_usage": True},
)
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
token = chunk.choices[0].delta.content
if ttft is None:
ttft = time.time() - start_time
data = json.dumps({
"token": token,
"done": False,
"ttft": ttft,
})
yield f"data: {data}\n\n"
if chunk.usage:
usage_data = json.dumps({
"token": "",
"done": True,
"usage": {
"input_tokens": chunk.usage.prompt_tokens,
"output_tokens": chunk.usage.completion_tokens,
},
"latency": time.time() - start_time,
"ttft": ttft,
})
yield f"data: {usage_data}\n\n"
except Exception as e:
error_data = json.dumps({
"error": str(e),
"done": True,
})
yield f"data: {error_data}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Connection": "keep-alive",
},
)
# 非ストリーミング版(比較用)
@app.post("/chat")
async def chat(request: ChatRequest):
"""通常のJSON レスポンスを返す"""
response = await client.chat.completions.create(
model=request.model,
messages=[{"role": "user", "content": request.message}],
max_tokens=request.max_tokens,
temperature=request.temperature,
)
return {
"content": response.choices[0].message.content,
"usage": {
"input_tokens": response.usage.prompt_tokens,
"output_tokens": response.usage.completion_tokens,
},
}2.4 フロントエンド SSE クライアント
// TypeScript: SSE クライアント
class LLMStreamClient {
private baseUrl: string;
constructor(baseUrl: string) {
this.baseUrl = baseUrl;
}
async streamChat(
message: string,
onToken: (token: string) => void,
onComplete: (usage: any) => void,
onError: (error: string) => void,
): Promise<void> {
const response = await fetch(`${this.baseUrl}/chat/stream`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message, stream: true }),
});
if (!response.ok) {
onError(`HTTP error: ${response.status}`);
return;
}
const reader = response.body?.getReader();
if (!reader) {
onError('No reader available');
return;
}
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// SSE イベントをパース
const lines = buffer.split('\n\n');
buffer = lines.pop() || ''; // 最後の不完全な部分を保持
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
if (data.error) {
onError(data.error);
return;
}
if (data.done) {
onComplete(data.usage);
} else {
onToken(data.token);
}
}
}
}
}
}
// React Hook での使用例
function useLLMStream() {
const [response, setResponse] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const client = new LLMStreamClient('/api');
const sendMessage = async (message: string) => {
setResponse('');
setIsStreaming(true);
await client.streamChat(
message,
(token) => setResponse((prev) => prev + token),
(usage) => {
setIsStreaming(false);
console.log('使用量:', usage);
},
(error) => {
setIsStreaming(false);
console.error('エラー:', error);
},
);
};
return { response, isStreaming, sendMessage };
}2.5 非同期ストリーミングとバックプレッシャー
import asyncio
from openai import AsyncOpenAI
from collections import deque
class StreamBuffer:
"""バックプレッシャー対応ストリームバッファ"""
def __init__(self, max_size: int = 100):
self.buffer: deque = deque(maxlen=max_size)
self.event = asyncio.Event()
self.done = False
async def put(self, item: str):
"""バッファにアイテムを追加"""
while len(self.buffer) >= self.buffer.maxlen:
# バッファが満杯: 消費されるまで待機
await asyncio.sleep(0.01)
self.buffer.append(item)
self.event.set()
async def get(self) -> str | None:
"""バッファからアイテムを取得"""
while not self.buffer and not self.done:
self.event.clear()
await self.event.wait()
if self.buffer:
return self.buffer.popleft()
return None
def mark_done(self):
self.done = True
self.event.set()
async def producer(buffer: StreamBuffer, prompt: str):
"""LLM からのストリームをバッファに書き込む"""
client = AsyncOpenAI()
stream = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True,
)
async for chunk in stream:
if chunk.choices[0].delta.content:
await buffer.put(chunk.choices[0].delta.content)
buffer.mark_done()
async def consumer(buffer: StreamBuffer):
"""バッファからトークンを消費して表示"""
while True:
token = await buffer.get()
if token is None:
break
# ここで表示や加工処理を行う
print(token, end="", flush=True)
# 消費者が遅い場合のシミュレーション
await asyncio.sleep(0.01)
async def main():
buffer = StreamBuffer(max_size=50)
await asyncio.gather(
producer(buffer, "Pythonの非同期処理を解説してください"),
consumer(buffer),
)3. エラーハンドリングとリトライ
| エラーハンドリング戦略 |
|---|
| HTTP Status 原因 対策 |
| ────────── ───── ───── |
| 400 不正リクエスト 入力検証・修正 |
| 401 認証エラー APIキー確認 |
| 403 権限不足 プラン確認 |
| 404 モデル不存在 モデル名確認 |
| 413 入力が大きすぎる トークン数削減 |
| 429 レート制限 指数バックオフリトライ |
| 500 サーバーエラー リトライ + フォールバック |
| 503 過負荷 待機 + リトライ |
| 529 過負荷(Anthropic) 待機 + リトライ |
| タイムアウト 応答遅延 タイムアウト延長/リトライ |
| リトライ対象: 429, 500, 503, 529, タイムアウト |
| リトライ不可: 400, 401, 403, 404, 413 |
3.1 指数バックオフリトライ(本格版)
import time
import random
import logging
from dataclasses import dataclass, field
from typing import Optional, Callable
from openai import OpenAI, RateLimitError, APIError, APITimeoutError, APIConnectionError
logger = logging.getLogger(__name__)
@dataclass
class RetryConfig:
"""リトライ設定"""
max_retries: int = 5
base_delay: float = 1.0
max_delay: float = 60.0
jitter_factor: float = 0.5
retryable_status_codes: set = field(default_factory=lambda: {429, 500, 503, 529})
class RetryableAPIClient:
"""リトライ対応 API クライアント"""
def __init__(
self,
client: OpenAI,
config: RetryConfig = None,
on_retry: Optional[Callable] = None,
):
self.client = client
self.config = config or RetryConfig()
self.on_retry = on_retry
self.retry_count = 0
self.total_wait_time = 0
def call(self, **kwargs):
"""指数バックオフ + ジッターによるリトライ"""
retryable_errors = (RateLimitError, APIError, APITimeoutError, APIConnectionError)
for attempt in range(self.config.max_retries + 1):
try:
response = self.client.chat.completions.create(**kwargs)
if attempt > 0:
logger.info(f"リトライ成功 (試行 {attempt + 1})")
return response
except retryable_errors as e:
if attempt == self.config.max_retries:
logger.error(f"最大リトライ回数到達: {e}")
raise
# レート制限の場合、Retry-After ヘッダを尊重
retry_after = None
if hasattr(e, 'response') and e.response:
retry_after = e.response.headers.get('retry-after')
if retry_after:
wait_time = float(retry_after)
else:
# 指数バックオフ + ジッター
delay = min(
self.config.base_delay * (2 ** attempt),
self.config.max_delay,
)
jitter = random.uniform(0, delay * self.config.jitter_factor)
wait_time = delay + jitter
self.retry_count += 1
self.total_wait_time += wait_time
logger.warning(
f"リトライ {attempt + 1}/{self.config.max_retries}: "
f"{type(e).__name__}, {wait_time:.1f}秒待機"
)
if self.on_retry:
self.on_retry(attempt, e, wait_time)
time.sleep(wait_time)
except Exception as e:
# リトライ不可能なエラー(400, 401, 403等)
logger.error(f"リトライ不可能なエラー: {type(e).__name__}: {e}")
raise
def get_stats(self) -> dict:
return {
"total_retries": self.retry_count,
"total_wait_time": self.total_wait_time,
}
# 使用例
client = OpenAI(max_retries=0) # SDK のリトライは無効化
retryable = RetryableAPIClient(
client,
config=RetryConfig(max_retries=5, base_delay=1.0),
on_retry=lambda attempt, err, wait: print(f" → 待機中... ({wait:.1f}s)"),
)
response = retryable.call(
model="gpt-4o",
messages=[{"role": "user", "content": "Hello"}],
)
print(f"リトライ統計: {retryable.get_stats()}")3.2 フォールバック戦略
import asyncio
import time
import logging
from dataclasses import dataclass, field
from litellm import acompletion
from litellm.exceptions import (
RateLimitError, ServiceUnavailableError, Timeout, APIError
)
logger = logging.getLogger(__name__)
@dataclass
class FallbackConfig:
model: str
provider: str
priority: int = 0 # 低い値が高優先
max_retries: int = 2
timeout: float = 30.0
class FallbackChain:
"""フォールバックチェーンで複数プロバイダに順に試行"""
def __init__(self, configs: list[FallbackConfig]):
self.configs = sorted(configs, key=lambda x: x.priority)
self.call_history: list[dict] = []
async def call(self, messages: list, **kwargs) -> dict:
"""フォールバックチェーンで API を呼び出す"""
errors = []
for config in self.configs:
try:
start = time.time()
response = await acompletion(
model=config.model,
messages=messages,
timeout=config.timeout,
num_retries=config.max_retries,
**kwargs,
)
latency = time.time() - start
result = {
"content": response.choices[0].message.content,
"model": config.model,
"provider": config.provider,
"latency": latency,
"usage": {
"input": response.usage.prompt_tokens,
"output": response.usage.completion_tokens,
},
"fallback_depth": len(errors),
}
self.call_history.append(result)
if errors:
logger.info(
f"フォールバック成功: {config.provider} "
f"(深度: {len(errors)})"
)
return result
except (RateLimitError, ServiceUnavailableError, Timeout, APIError) as e:
error_info = {
"provider": config.provider,
"model": config.model,
"error": str(e),
"error_type": type(e).__name__,
}
errors.append(error_info)
logger.warning(f"フォールバック: {config.provider} 失敗 - {e}")
continue
raise Exception(
f"全プロバイダが失敗:\n" +
"\n".join(f" - {e['provider']}: {e['error']}" for e in errors)
)
def get_stats(self) -> dict:
"""フォールバック統計"""
if not self.call_history:
return {"total_calls": 0}
fallback_calls = sum(1 for c in self.call_history if c["fallback_depth"] > 0)
return {
"total_calls": len(self.call_history),
"fallback_calls": fallback_calls,
"fallback_rate": fallback_calls / len(self.call_history),
"provider_distribution": {
p: sum(1 for c in self.call_history if c["provider"] == p)
for p in set(c["provider"] for c in self.call_history)
},
}
# 使用例
chain = FallbackChain([
FallbackConfig("gpt-4o", "openai", priority=0),
FallbackConfig("claude-3-5-sonnet-20241022", "anthropic", priority=1),
FallbackConfig("gemini/gemini-1.5-pro", "google", priority=2),
])
result = await chain.call([{"role": "user", "content": "Hello"}])
print(f"回答: {result['content'][:100]}...")
print(f"プロバイダ: {result['provider']}, レイテンシ: {result['latency']:.2f}s")3.3 サーキットブレーカー
import time
from enum import Enum
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
class CircuitState(Enum):
CLOSED = "CLOSED" # 正常動作中
OPEN = "OPEN" # 遮断中(全リクエスト拒否)
HALF_OPEN = "HALF_OPEN" # 試行中(1リクエストのみ許可)
@dataclass
class CircuitBreaker:
"""サーキットブレーカーパターン"""
name: str
failure_threshold: int = 5 # 連続失敗閾値
recovery_timeout: float = 60.0 # 回復待機時間 (秒)
success_threshold: int = 3 # HALF_OPEN → CLOSED に必要な連続成功数
def __post_init__(self):
self.failure_count: int = 0
self.success_count: int = 0
self.last_failure_time: float = 0
self.state: CircuitState = CircuitState.CLOSED
self.total_trips: int = 0 # OPEN になった回数
def can_proceed(self) -> bool:
"""リクエストを許可するかどうか"""
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
elapsed = time.time() - self.last_failure_time
if elapsed > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
logger.info(f"[{self.name}] OPEN → HALF_OPEN (回復試行開始)")
return True
return False
# HALF_OPEN: 1リクエストのみ許可
return True
def record_success(self):
"""成功を記録"""
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
logger.info(f"[{self.name}] HALF_OPEN → CLOSED (回復完了)")
else:
self.failure_count = 0
def record_failure(self):
"""失敗を記録"""
self.failure_count += 1
self.last_failure_time = time.time()
self.success_count = 0
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
self.total_trips += 1
logger.warning(f"[{self.name}] HALF_OPEN → OPEN (回復失敗)")
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.total_trips += 1
logger.warning(
f"[{self.name}] CLOSED → OPEN "
f"(連続失敗: {self.failure_count})"
)
def get_status(self) -> dict:
return {
"name": self.name,
"state": self.state.value,
"failure_count": self.failure_count,
"total_trips": self.total_trips,
"time_since_last_failure": (
time.time() - self.last_failure_time
if self.last_failure_time > 0 else None
),
}
# プロバイダごとにサーキットブレーカーを管理
class CircuitBreakerManager:
"""複数のサーキットブレーカーを統合管理"""
def __init__(self):
self.breakers: dict[str, CircuitBreaker] = {}
def get_or_create(self, name: str, **kwargs) -> CircuitBreaker:
if name not in self.breakers:
self.breakers[name] = CircuitBreaker(name=name, **kwargs)
return self.breakers[name]
def get_available_providers(self) -> list[str]:
"""利用可能なプロバイダのリスト"""
return [
name for name, breaker in self.breakers.items()
if breaker.can_proceed()
]
def get_all_status(self) -> list[dict]:
return [b.get_status() for b in self.breakers.values()]
# 使用例
manager = CircuitBreakerManager()
for provider in ["openai", "anthropic", "google"]:
manager.get_or_create(provider, failure_threshold=5, recovery_timeout=60)
# 利用可能なプロバイダを確認
available = manager.get_available_providers()
print(f"利用可能: {available}")4. レート制限管理
4.1 トークンバケット
import asyncio
import time
class TokenBucket:
"""トークンバケットによるレート制限"""
def __init__(self, rate: float, capacity: int):
self.rate = rate # 秒あたりのトークン補充速度
self.capacity = capacity # バケット容量
self.tokens = capacity
self.last_time = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self, tokens: int = 1) -> float:
"""トークンを取得(必要に応じて待機)。待機時間を返す。"""
async with self._lock:
now = time.monotonic()
elapsed = now - self.last_time
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self.last_time = now
if self.tokens >= tokens:
self.tokens -= tokens
return 0.0
# トークン不足: 必要な待ち時間を計算
wait = (tokens - self.tokens) / self.rate
await asyncio.sleep(wait)
self.tokens = 0
return wait
def available(self) -> float:
"""現在利用可能なトークン数"""
now = time.monotonic()
elapsed = now - self.last_time
return min(self.capacity, self.tokens + elapsed * self.rate)
class RateLimitManager:
"""RPM と TPM の両方を管理するレートリミッター"""
def __init__(self, rpm: int, tpm: int):
self.rpm_limiter = TokenBucket(rate=rpm / 60, capacity=rpm)
self.tpm_limiter = TokenBucket(rate=tpm / 60, capacity=tpm)
self.total_wait_time = 0
self.total_requests = 0
async def acquire(self, estimated_tokens: int = 500):
"""リクエスト送信前にレート制限をチェック"""
rpm_wait = await self.rpm_limiter.acquire(1)
tpm_wait = await self.tpm_limiter.acquire(estimated_tokens)
total_wait = rpm_wait + tpm_wait
self.total_wait_time += total_wait
self.total_requests += 1
if total_wait > 0:
logging.debug(f"レート制限待機: {total_wait:.2f}s")
def get_stats(self) -> dict:
return {
"total_requests": self.total_requests,
"total_wait_time": f"{self.total_wait_time:.1f}s",
"avg_wait_per_request": (
f"{self.total_wait_time / self.total_requests:.3f}s"
if self.total_requests > 0 else "N/A"
),
}
# OpenAI の Tier 別レート制限
OPENAI_RATE_LIMITS = {
"tier1": {"rpm": 500, "tpm": 200_000},
"tier2": {"rpm": 5_000, "tpm": 2_000_000},
"tier3": {"rpm": 5_000, "tpm": 5_000_000},
"tier4": {"rpm": 10_000, "tpm": 10_000_000},
"tier5": {"rpm": 10_000, "tpm": 30_000_000},
}
# 使用例
limiter = RateLimitManager(rpm=500, tpm=200_000) # Tier 1
async def rate_limited_call(messages: list) -> str:
await limiter.acquire(estimated_tokens=700) # 入力+出力の概算
response = await client.chat.completions.create(
model="gpt-4o",
messages=messages,
)
return response.choices[0].message.content4.2 並行リクエスト制御
import asyncio
from openai import AsyncOpenAI
class ConcurrencyController:
"""並行リクエスト数を制御するセマフォベースのコントローラー"""
def __init__(
self,
max_concurrent: int = 10,
rate_limiter: RateLimitManager = None,
):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.rate_limiter = rate_limiter
self.active_requests = 0
self.completed_requests = 0
self.failed_requests = 0
async def execute(self, coro):
"""並行数制限付きで非同期タスクを実行"""
async with self.semaphore:
self.active_requests += 1
try:
if self.rate_limiter:
await self.rate_limiter.acquire()
result = await coro
self.completed_requests += 1
return result
except Exception as e:
self.failed_requests += 1
raise
finally:
self.active_requests -= 1
async def execute_batch(
self,
tasks: list,
on_progress: callable = None,
) -> list:
"""バッチタスクを並行数制限付きで実行"""
results = []
async def task_wrapper(i, task):
result = await self.execute(task)
if on_progress:
on_progress(i, len(tasks))
return result
results = await asyncio.gather(
*[task_wrapper(i, task) for i, task in enumerate(tasks)],
return_exceptions=True,
)
return results
# 使用例: 100件のリクエストを並行10で処理
client = AsyncOpenAI()
controller = ConcurrencyController(
max_concurrent=10,
rate_limiter=RateLimitManager(rpm=500, tpm=200_000),
)
prompts = [f"質問 {i}: ..." for i in range(100)]
tasks = [
client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": p}],
)
for p in prompts
]
results = await controller.execute_batch(
tasks,
on_progress=lambda i, total: print(f"\r進捗: {i+1}/{total}", end=""),
)
print(f"\n完了: {controller.completed_requests}, 失敗: {controller.failed_requests}")5. コスト管理
5.1 使用量トラッキング
from dataclasses import dataclass, field
from datetime import datetime, date
from collections import defaultdict
import json
import logging
logger = logging.getLogger(__name__)
@dataclass
class UsageRecord:
timestamp: str
model: str
input_tokens: int
output_tokens: int
cost: float
request_id: str = ""
metadata: dict = field(default_factory=dict)
class BudgetExceededError(Exception):
pass
class UsageTracker:
"""API使用量・コスト追跡"""
PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"o1": {"input": 15.00, "output": 60.00},
"o3-mini": {"input": 1.10, "output": 4.40},
"claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
"claude-3-5-haiku": {"input": 0.80, "output": 4.00},
"gemini-1.5-pro": {"input": 1.25, "output": 5.00},
"gemini-1.5-flash": {"input": 0.075, "output": 0.30},
"gemini-2.0-flash": {"input": 0.10, "output": 0.40},
"deepseek-v3": {"input": 0.27, "output": 1.10},
}
def __init__(
self,
daily_budget: float = 100.0,
monthly_budget: float = 3000.0,
alert_threshold: float = 0.8, # 予算の80%で警告
):
self.daily_budget = daily_budget
self.monthly_budget = monthly_budget
self.alert_threshold = alert_threshold
self.records: list[UsageRecord] = []
def record(
self,
model: str,
input_tokens: int,
output_tokens: int,
request_id: str = "",
metadata: dict = None,
) -> float:
"""使用量を記録してコストを返す"""
# モデル名の正規化
model_key = self._normalize_model_name(model)
prices = self.PRICING.get(model_key, {"input": 0, "output": 0})
cost = (
(input_tokens / 1_000_000) * prices["input"] +
(output_tokens / 1_000_000) * prices["output"]
)
record = UsageRecord(
timestamp=datetime.now().isoformat(),
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost=cost,
request_id=request_id,
metadata=metadata or {},
)
self.records.append(record)
# 予算チェック
self._check_budget()
return cost
def _normalize_model_name(self, model: str) -> str:
"""モデル名を料金表のキーに正規化"""
model = model.lower()
for key in self.PRICING:
if key in model.replace(".", "-"):
return key
return model
def _check_budget(self):
"""予算超過チェック"""
today_cost = self.get_today_cost()
month_cost = self.get_month_cost()
# 日次予算チェック
if today_cost > self.daily_budget:
raise BudgetExceededError(
f"日次予算超過: ${today_cost:.2f} / ${self.daily_budget:.2f}"
)
# 月次予算チェック
if month_cost > self.monthly_budget:
raise BudgetExceededError(
f"月次予算超過: ${month_cost:.2f} / ${self.monthly_budget:.2f}"
)
# 警告
if today_cost > self.daily_budget * self.alert_threshold:
logger.warning(
f"日次予算警告: ${today_cost:.2f} / ${self.daily_budget:.2f} "
f"({today_cost/self.daily_budget:.0%})"
)
def get_today_cost(self) -> float:
today = date.today().isoformat()
return sum(r.cost for r in self.records if r.timestamp.startswith(today))
def get_month_cost(self) -> float:
month = date.today().strftime("%Y-%m")
return sum(r.cost for r in self.records if r.timestamp.startswith(month))
def get_report(self) -> dict:
"""使用量レポートを生成"""
today = date.today().isoformat()
month = date.today().strftime("%Y-%m")
# モデル別集計
model_costs = defaultdict(float)
model_tokens = defaultdict(lambda: {"input": 0, "output": 0})
for r in self.records:
if r.timestamp.startswith(month):
model_costs[r.model] += r.cost
model_tokens[r.model]["input"] += r.input_tokens
model_tokens[r.model]["output"] += r.output_tokens
return {
"today": {
"cost": self.get_today_cost(),
"budget": self.daily_budget,
"utilization": f"{self.get_today_cost()/self.daily_budget:.0%}",
},
"month": {
"cost": self.get_month_cost(),
"budget": self.monthly_budget,
"utilization": f"{self.get_month_cost()/self.monthly_budget:.0%}",
},
"by_model": {
model: {
"cost": f"${cost:.2f}",
"input_tokens": model_tokens[model]["input"],
"output_tokens": model_tokens[model]["output"],
}
for model, cost in sorted(
model_costs.items(), key=lambda x: -x[1]
)
},
"total_requests": len(self.records),
}
# 使用例
tracker = UsageTracker(daily_budget=100.0, monthly_budget=3000.0)
# API 呼び出し後にトラッキング
cost = tracker.record("gpt-4o", input_tokens=500, output_tokens=200)
print(f"今回のコスト: ${cost:.6f}")
report = tracker.get_report()
print(json.dumps(report, indent=2, ensure_ascii=False))5.2 プロンプトキャッシュ戦略
class PromptCacheStrategy:
"""プロンプトキャッシュ最適化戦略"""
def __init__(self):
self.cache_hits = 0
self.cache_misses = 0
@staticmethod
def design_cacheable_prompt(
system_prompt: str,
few_shot_examples: list[dict],
user_query: str,
) -> list[dict]:
"""キャッシュ効率の高いプロンプト構造を設計
キャッシュのポイント:
- system prompt と few-shot examples は先頭に配置(キャッシュ対象)
- ユーザークエリは末尾に配置(変動部分)
- Anthropic: cache_control で明示的にキャッシュ
- OpenAI: 自動キャッシュ(先頭1024トークン以上が同一なら適用)
"""
# Anthropic 向け: 明示的キャッシュ制御
system_with_cache = [
{
"type": "text",
"text": system_prompt,
"cache_control": {"type": "ephemeral"},
},
]
# Few-shot examples もキャッシュ対象に
messages = []
for i, example in enumerate(few_shot_examples):
messages.append({"role": "user", "content": example["input"]})
assistant_content = example["output"]
# 最後の few-shot にキャッシュポイントを設定
if i == len(few_shot_examples) - 1:
messages.append({
"role": "assistant",
"content": [
{
"type": "text",
"text": assistant_content,
"cache_control": {"type": "ephemeral"},
},
],
})
else:
messages.append({"role": "assistant", "content": assistant_content})
# ユーザークエリ(変動部分)
messages.append({"role": "user", "content": user_query})
return system_with_cache, messages
@staticmethod
def estimate_cache_savings(
total_input_tokens: int,
cacheable_tokens: int,
requests_per_day: int,
cache_hit_rate: float = 0.8,
model: str = "claude-3-5-sonnet",
) -> dict:
"""キャッシュによるコスト削減を推定"""
pricing = {
"claude-3-5-sonnet": {"normal": 3.00, "cached": 0.30, "write": 3.75},
"gpt-4o": {"normal": 2.50, "cached": 1.25, "write": 2.50},
}
p = pricing.get(model, pricing["claude-3-5-sonnet"])
# キャッシュなしのコスト
daily_tokens = total_input_tokens * requests_per_day
cost_without_cache = (daily_tokens / 1_000_000) * p["normal"]
# キャッシュありのコスト
cached_tokens = cacheable_tokens * requests_per_day * cache_hit_rate
uncached_tokens = daily_tokens - cached_tokens
cache_write_tokens = cacheable_tokens * requests_per_day * (1 - cache_hit_rate)
cost_with_cache = (
(uncached_tokens / 1_000_000) * p["normal"] +
(cached_tokens / 1_000_000) * p["cached"] +
(cache_write_tokens / 1_000_000) * p["write"]
)
daily_savings = cost_without_cache - cost_with_cache
monthly_savings = daily_savings * 30
return {
"daily_cost_without_cache": f"${cost_without_cache:.2f}",
"daily_cost_with_cache": f"${cost_with_cache:.2f}",
"daily_savings": f"${daily_savings:.2f}",
"monthly_savings": f"${monthly_savings:.2f}",
"savings_rate": f"{daily_savings/cost_without_cache:.0%}",
}
# 使用例
savings = PromptCacheStrategy.estimate_cache_savings(
total_input_tokens=2000,
cacheable_tokens=1500, # system + few-shot
requests_per_day=10_000,
cache_hit_rate=0.85,
model="claude-3-5-sonnet",
)
print(f"月間節約額: {savings['monthly_savings']}")
print(f"削減率: {savings['savings_rate']}")6. セキュリティとオブザーバビリティ
6.1 API キー管理
import os
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class SecureAPIKeyManager:
"""安全な API キー管理"""
def __init__(self):
self._keys: dict[str, str] = {}
def get_key(self, provider: str) -> str:
"""API キーを安全に取得"""
# 1. メモリキャッシュをチェック
if provider in self._keys:
return self._keys[provider]
# 2. 環境変数から取得
env_map = {
"openai": "OPENAI_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"google": "GOOGLE_API_KEY",
"deepseek": "DEEPSEEK_API_KEY",
}
env_var = env_map.get(provider)
if env_var:
key = os.environ.get(env_var)
if key:
self._keys[provider] = key
return key
# 3. AWS Secrets Manager から取得(本番環境向け)
key = self._get_from_secrets_manager(provider)
if key:
self._keys[provider] = key
return key
raise ValueError(f"APIキーが見つかりません: {provider}")
def _get_from_secrets_manager(self, provider: str) -> Optional[str]:
"""AWS Secrets Manager からキーを取得"""
try:
import boto3
client = boto3.client("secretsmanager")
response = client.get_secret_value(
SecretId=f"llm-api-keys/{provider}",
)
return response["SecretString"]
except Exception:
return None
@staticmethod
def validate_key_format(provider: str, key: str) -> bool:
"""API キーの形式を検証"""
patterns = {
"openai": lambda k: k.startswith("sk-") and len(k) > 20,
"anthropic": lambda k: k.startswith("sk-ant-") and len(k) > 20,
"google": lambda k: k.startswith("AIza") and len(k) > 20,
}
validator = patterns.get(provider, lambda k: len(k) > 10)
return validator(key)6.2 リクエスト/レスポンスのログ
import json
import hashlib
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime
@dataclass
class LLMRequestLog:
"""LLM リクエスト/レスポンスのログ"""
request_id: str
timestamp: str
model: str
provider: str
input_tokens: int
output_tokens: int
latency_ms: float
ttft_ms: float = 0
status: str = "success"
error: str = ""
cost: float = 0.0
prompt_hash: str = "" # プロンプト内容のハッシュ(PII保護)
metadata: dict = field(default_factory=dict)
class LLMLogger:
"""LLM API 呼び出しのログ管理"""
def __init__(self, log_prompts: bool = False):
"""
Args:
log_prompts: True の場合、プロンプト内容もログに記録
(PII が含まれる場合は False 推奨)
"""
self.log_prompts = log_prompts
self.logs: list[LLMRequestLog] = []
self.logger = logging.getLogger("llm_logger")
def log_request(
self,
request_id: str,
model: str,
provider: str,
input_tokens: int,
output_tokens: int,
latency_ms: float,
ttft_ms: float = 0,
status: str = "success",
error: str = "",
cost: float = 0.0,
prompt: str = "",
metadata: dict = None,
):
"""リクエストをログに記録"""
# プロンプトのハッシュ化(内容は保存しない)
prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()[:16] if prompt else ""
log = LLMRequestLog(
request_id=request_id,
timestamp=datetime.now().isoformat(),
model=model,
provider=provider,
input_tokens=input_tokens,
output_tokens=output_tokens,
latency_ms=latency_ms,
ttft_ms=ttft_ms,
status=status,
error=error,
cost=cost,
prompt_hash=prompt_hash,
metadata=metadata or {},
)
self.logs.append(log)
self.logger.info(json.dumps(asdict(log)))
def get_metrics(self) -> dict:
"""メトリクスサマリーを取得"""
if not self.logs:
return {"total_requests": 0}
success_logs = [l for l in self.logs if l.status == "success"]
error_logs = [l for l in self.logs if l.status != "success"]
return {
"total_requests": len(self.logs),
"success_rate": len(success_logs) / len(self.logs),
"error_rate": len(error_logs) / len(self.logs),
"avg_latency_ms": (
sum(l.latency_ms for l in success_logs) / len(success_logs)
if success_logs else 0
),
"p95_latency_ms": (
sorted([l.latency_ms for l in success_logs])[int(len(success_logs) * 0.95)]
if success_logs else 0
),
"avg_ttft_ms": (
sum(l.ttft_ms for l in success_logs) / len(success_logs)
if success_logs else 0
),
"total_cost": sum(l.cost for l in self.logs),
"total_tokens": sum(l.input_tokens + l.output_tokens for l in self.logs),
"errors_by_type": {
error: sum(1 for l in error_logs if l.error == error)
for error in set(l.error for l in error_logs)
},
}7. 比較表
7.1 SDK 機能比較
| 機能 | OpenAI SDK | Anthropic SDK | Google SDK | LiteLLM |
|---|---|---|---|---|
| 同期/非同期 | 両方 | 両方 | 同期中心 | 両方 |
| ストリーミング | 対応 | 対応 | 対応 | 対応 |
| 自動リトライ | 対応 (設定可) | 対応 | 限定的 | 対応 |
| 型安全性 | Pydantic | Pydantic | protobuf | 基本型 |
| マルチプロバイダ | N/A | N/A | N/A | 100+対応 |
| コスト追跡 | usage対応 | usage対応 | 限定的 | 統合対応 |
| Structured Output | 対応 | N/A | 対応 | プロバイダ依存 |
| プロンプトキャッシュ | 自動 | 明示的 | N/A | プロバイダ依存 |
| バッチAPI | 対応 | 対応 | N/A | プロバイダ依存 |
| Extended Thinking | N/A | 対応 | N/A | 対応 |
7.2 ストリーミング方式比較
| 方式 | レイテンシ | 実装複雑度 | ブラウザ対応 | 用途 |
|---|---|---|---|---|
| SSE | 低 | 低 | ネイティブ | チャットUI |
| WebSocket | 最低 | 高 | ネイティブ | リアルタイム双方向 |
| Long Polling | 中 | 低 | ネイティブ | レガシー対応 |
| gRPC Stream | 最低 | 高 | 間接的 | マイクロサービス |
8. アンチパターン
アンチパターン 1: リトライなしの本番コード
# NG: リトライなし — 一時的なエラーで即失敗
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
)
# → 429 (レート制限) や 500 (サーバーエラー) で即クラッシュ
# OK: SDK 組み込みリトライ + カスタムリトライ + フォールバック
client = OpenAI(
max_retries=3, # SDK レベルのリトライ
timeout=30.0,
)
# + アプリレベルのフォールバック
response = await chain.call(messages)アンチパターン 2: API キーのハードコード
# NG: ソースコードにキーを直書き
client = OpenAI(api_key="sk-abc123...") # セキュリティリスク大
# NG: .env ファイルを Git にコミット
# .gitignore に .env を追加していない
# OK: 環境変数で管理
import os
client = OpenAI() # 自動的に OPENAI_API_KEY 環境変数を使用
# OK: シークレットマネージャー (本番環境)
key_manager = SecureAPIKeyManager()
client = OpenAI(api_key=key_manager.get_key("openai"))アンチパターン 3: レート制限を無視した並行処理
# NG: 制限なしの並行リクエスト
tasks = [call_api(prompt) for prompt in prompts] # 1000件同時
results = await asyncio.gather(*tasks)
# → 大量の 429 エラー → 全リクエスト失敗
# OK: セマフォ + レート制限で制御
controller = ConcurrencyController(
max_concurrent=10,
rate_limiter=RateLimitManager(rpm=500, tpm=200_000),
)
results = await controller.execute_batch(tasks)アンチパターン 4: ストリーミングなしの長時間応答
# NG: 長い応答を非ストリーミングで待つ
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
max_tokens=4096,
)
# → ユーザーは10-30秒間何も表示されず待つ → 離脱
# OK: ストリーミングで即座にフィードバック
stream = client.chat.completions.create(
model="gpt-4o",
messages=messages,
max_tokens=4096,
stream=True,
)
for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content # 即座に表示アンチパターン 5: コスト管理なしの本番運用
# NG: 予算管理なし
# → 異常なトラフィックや無限ループで数万ドルの請求
# OK: 多層防御のコスト管理
tracker = UsageTracker(
daily_budget=100.0,
monthly_budget=3000.0,
alert_threshold=0.8,
)
# API 呼び出しごとに追跡
cost = tracker.record(model, input_tokens, output_tokens)
# → 予算超過時は BudgetExceededError を送出9. FAQ
Q1: 同期と非同期のどちらを使うべき?
Web アプリ (FastAPI等) では非同期が推奨。同時リクエスト処理が効率的になる。
バッチ処理やスクリプトでは同期で十分。
asyncio.gather で複数の LLM 呼び出しを並列化できるのが非同期の最大のメリット。
判断基準:
- FastAPI/Starlette → 非同期必須
- Django (ASGI) → 非同期推奨
- Django (WSGI) → 同期
- CLI ツール → 同期で十分
- バッチ処理 (大量リクエスト) → 非同期 + セマフォ
Q2: ストリーミングの TTFT を改善するには?
プロンプトを短くする (入力トークン数の削減)。 Flash/mini 系の高速モデルを使用する。 CDN 経由ではなく直接 API エンドポイントに接続する。 System Prompt をキャッシュ可能にする (OpenAI, Anthropic の Prompt Caching)。 リージョンを最寄りに設定する。
Q3: 複数プロバイダを使い分けるベストプラクティスは?
LiteLLM や OpenRouter で抽象化し、環境変数でモデルを切り替え可能にする。 プロバイダごとのサーキットブレーカーを設置し、障害時は自動フォールバック。 コスト最適化のために、タスク難易度に応じてモデルをルーティングする。
Q4: プロンプトキャッシュはどの程度コスト削減に効果的?
Anthropic の場合、キャッシュヒット時の入力トークン価格は通常の 1/10 (Claude 3.5 Sonnet: $3.00 → $0.30)。 システムプロンプトや Few-shot 例が固定の場合、入力コストの 50-80% を削減可能。 OpenAI は 1024 トークン以上の共通プレフィックスで自動キャッシュ(価格 50% 割引)。
Q5: バッチ API を使うべき場面は?
リアルタイム性が不要な大量処理に最適:
- データの一括分類・タグ付け
- 大量のメール/文書の要約
- テスト・評価の実行
- コンテンツ生成のバッチ処理
メリット: 50% のコスト削減、高いスループット デメリット: 結果取得まで最大24時間、リアルタイム処理には不向き
FAQ
Q1: このトピックを学ぶ上で最も重要なポイントは何ですか?
実践的な経験を積むことが最も重要です。理論だけでなく、実際にコードを書いて動作を確認することで理解が深まります。
Q2: 初心者がよく陥る間違いは何ですか?
基礎を飛ばして応用に進むことです。このガイドで説明している基本概念をしっかり理解してから、次のステップに進むことをお勧めします。
Q3: 実務ではどのように活用されていますか?
このトピックの知識は、日常的な開発業務で頻繁に活用されます。特にコードレビューやアーキテクチャ設計の際に重要になります。
まとめ
| 項目 | 推奨 |
|---|---|
| SDK | LiteLLM (マルチプロバイダ) + 個別 SDK |
| ストリーミング | SSE (Web) / WebSocket (リアルタイム) |
| リトライ | 指数バックオフ + ジッター (最大5回) |
| フォールバック | 3プロバイダチェーン |
| レート制限 | トークンバケット + セマフォ |
| コスト管理 | 日次/月次予算 + 使用量トラッキング |
| API キー管理 | 環境変数 + シークレットマネージャー |
| キャッシュ | プロンプトキャッシュ積極活用 |
| 監視 | リクエストログ + メトリクス収集 |
次に読むべきガイド
- 01-vector-databases.md — ベクトル DB との統合
- 02-local-llm.md — ローカル LLM のデプロイ
- ../02-applications/02-function-calling.md — Function Calling の統合
参考文献
- OpenAI, "API Reference," https://platform.openai.com/docs/api-reference
- Anthropic, "API Reference," https://docs.anthropic.com/claude/reference
- Anthropic, "Prompt Caching," https://docs.anthropic.com/claude/docs/prompt-caching
- LiteLLM, "Documentation," https://docs.litellm.ai/
- Google, "Generative AI API," https://ai.google.dev/api
- OpenAI, "Batch API," https://platform.openai.com/docs/guides/batch
- Anthropic, "Message Batches," https://docs.anthropic.com/claude/docs/message-batches