自律エージェント
計画・実行・振り返り――長時間にわたり自律的にタスクを遂行するエージェントの設計パターン。目標分解、自己評価、適応的再計画の仕組みを解説する。
128 分で読めます63,546 文字
自律エージェント
計画・実行・振り返り――長時間にわたり自律的にタスクを遂行するエージェントの設計パターン。目標分解、自己評価、適応的再計画の仕組みを解説する。
この章で学ぶこと
- 自律エージェントの計画-実行-振り返りサイクルの設計
- 目標分解とサブゴール管理の実装パターン
- 自律性の段階とヒューマン・イン・ザ・ループの組み込み方
- メモリシステム(短期・長期・エピソード記憶)の設計
- 安全性ガードレールと迷走検出の実装
- 本番運用のためのモニタリング・コスト管理・テスト手法
前提知識
このガイドを読む前に、以下の知識があると理解が深まります:
- 基本的なプログラミングの知識
- 関連する基礎概念の理解
- ワークフローエージェント の内容を理解していること
1. 自律エージェントの定義
自律エージェントの特徴
通常のエージェント:
ユーザー: "Xを調べて"
→ 検索→回答(数ステップで完了)
自律エージェント:
ユーザー: "競合分析レポートを作成して"
→ 計画立案
→ 情報収集(複数ソース)
→ データ分析
→ レポート草案
→ 自己レビュー
→ 修正
→ 最終レポート
(数十〜数百ステップ、数分〜数時間)
1.1 自律性のレベル
自律性の5段階
Level 0: 手動 ユーザーが全ステップ指示
Level 1: アシスト 1ステップをLLMが実行
Level 2: 半自律 複数ステップを実行、要所で確認
Level 3: 条件付き自律 ほぼ自律、重要判断のみ人間が承認
Level 4: 完全自律 目標だけ与えれば完了まで自走
(例: Devin, Claude Code)
1.2 自律エージェントを選ぶべきか?
意思決定フローチャート
Q1: タスクが明確に定義されており、手順が固定か?
├─ YES → ワークフローエージェント(02-workflow-agents.md)
└─ NO → Q2へ
Q2: タスク完了に10ステップ以上必要か?
├─ YES → Q3へ
└─ NO → シングルエージェント(00-single-agent.md)
Q3: 実行中に状況に応じた判断の変更が必要か?
├─ YES → 自律エージェント(この章)
└─ NO → ワークフローエージェント
Q4: 失敗時に自己修正が必要か?
├─ YES → 自律エージェント + Reflexionパターン
└─ NO → マルチエージェント委譲パターン
Q5: セキュリティ的に許容できる操作範囲は?
├─ 制限あり → Level 2-3(ヒューマン・イン・ザ・ループ必須)
└─ 制限なし → Level 4(フルガードレール必須)
1.3 典型的なユースケース
| ユースケース | 自律性レベル | ステップ数 | 所要時間 |
|---|---|---|---|
| コード生成+テスト | L3 | 20-50 | 5-15分 |
| 競合分析レポート作成 | L3 | 30-100 | 10-30分 |
| バグ調査+修正 | L3-L4 | 10-40 | 3-20分 |
| プロジェクト初期構築 | L3 | 50-200 | 15-60分 |
| データ分析+可視化 | L2-L3 | 15-40 | 5-20分 |
| 文書翻訳+ローカライズ | L2 | 10-30 | 3-10分 |
| インフラ構築+デプロイ | L3(承認付き) | 30-80 | 10-40分 |
2. 計画-実行-振り返りサイクル
2.1 コアアーキテクチャ
自律エージェントのコアループ
+--------+
| 目標 |
+---+----+
|
v
+-------+-------+
| 計画 (Plan) |←────────────+
+-------+-------+ |
| |
v |
+-------+-------+ +------+------+
| 実行 (Act) |----->| 振り返り |
+-------+-------+ | (Reflect) |
| +------+------+
v |
成功? ─── YES ──→ 完了 |
| |
NO ──→ 再計画 ────────+
2.2 完全な実装
# 自律エージェントの完全な実装
import anthropic
import json
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
import logging
logger = logging.getLogger(__name__)
class TaskStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
BLOCKED = "blocked"
SKIPPED = "skipped"
@dataclass
class SubTask:
id: int
description: str
status: TaskStatus = TaskStatus.PENDING
result: str = ""
attempts: int = 0
max_attempts: int = 3
dependencies: list[int] = field(default_factory=list)
priority: int = 0 # 0が最高
@property
def can_execute(self) -> bool:
return self.status == TaskStatus.PENDING and self.attempts < self.max_attempts
@dataclass
class ExecutionTrace:
"""実行トレースの記録"""
step: int
task_id: int
task_description: str
action: str
result: str
reflection: dict
tokens_used: int
duration: float
timestamp: float
class AutonomousAgent:
def __init__(
self,
tools: list,
max_steps: int = 50,
max_cost: float = 5.0,
timeout: float = 3600,
model: str = "claude-sonnet-4-20250514"
):
self.client = anthropic.Anthropic()
self.tools = tools
self.max_steps = max_steps
self.max_cost = max_cost
self.timeout = timeout
self.model = model
self.plan: list[SubTask] = []
self.completed_work: list[dict] = []
self.reflections: list[dict] = []
self.traces: list[ExecutionTrace] = []
self.total_tokens = 0
self.total_cost = 0.0
self.start_time = 0.0
self.agent_id = str(uuid.uuid4())[:8]
def run(self, goal: str) -> str:
"""目標を受け取り、自律的に完了まで実行"""
self.start_time = time.time()
logger.info(f"[{self.agent_id}] 目標: {goal}")
# Phase 1: 計画
self.plan = self._create_plan(goal)
logger.info(f"[{self.agent_id}] 計画: {len(self.plan)} サブタスク")
for step in range(self.max_steps):
# ガードレールチェック
if self._should_stop():
logger.warning(f"[{self.agent_id}] ガードレールにより停止")
break
# Phase 2: 次のサブタスクを選択・実行
next_task = self._select_next_task()
if next_task is None:
break # 全タスク完了
logger.info(f"[{self.agent_id}] Step {step}: {next_task.description}")
step_start = time.time()
result = self._execute_task(next_task)
# Phase 3: 振り返り
reflection = self._reflect(goal, next_task, result)
self.reflections.append(reflection)
# トレース記録
self.traces.append(ExecutionTrace(
step=step,
task_id=next_task.id,
task_description=next_task.description,
action="execute",
result=result[:500],
reflection=reflection,
tokens_used=self.total_tokens,
duration=time.time() - step_start,
timestamp=time.time()
))
# 必要なら再計画
if reflection.get("needs_replan"):
logger.info(f"[{self.agent_id}] 再計画: {reflection['reason']}")
self.plan = self._replan(goal, reflection["reason"])
# 最終まとめ
return self._synthesize(goal)
def _should_stop(self) -> bool:
"""ガードレールチェック"""
# タイムアウト
if time.time() - self.start_time > self.timeout:
logger.warning("タイムアウト")
return True
# コスト上限
if self.total_cost > self.max_cost:
logger.warning(f"コスト上限超過: ${self.total_cost:.2f}")
return True
# 迷走検出
if self._detect_wandering():
logger.warning("迷走検出")
return True
return False
def _detect_wandering(self) -> bool:
"""エージェントの迷走を検出"""
if len(self.traces) < 5:
return False
recent = self.traces[-5:]
# 同じタスクへの連続失敗
task_ids = [t.task_id for t in recent]
if len(set(task_ids)) == 1:
task = next((t for t in self.plan if t.id == task_ids[0]), None)
if task and task.status != TaskStatus.COMPLETED:
return True
# 全ての最近の振り返りがpoor評価
all_poor = all(
t.reflection.get("quality") == "poor"
for t in recent
)
if all_poor:
return True
return False
def _create_plan(self, goal: str) -> list[SubTask]:
"""目標をサブタスクに分解"""
response = self.client.messages.create(
model=self.model,
max_tokens=2048,
messages=[{"role": "user", "content": f"""
目標: {goal}
この目標を達成するためのサブタスクを JSON 配列で出力してください。
各サブタスクは独立して実行可能で、依存関係がある場合は順序で表現。
形式:
[
{{"id": 1, "description": "...", "dependencies": [], "priority": 0}},
{{"id": 2, "description": "...", "dependencies": [1], "priority": 1}}
]
JSONのみ出力してください。
"""}]
)
self.total_tokens += response.usage.input_tokens + response.usage.output_tokens
self._update_cost(response.usage)
tasks_data = json.loads(response.content[0].text)
return [
SubTask(
id=t["id"],
description=t["description"],
dependencies=t.get("dependencies", []),
priority=t.get("priority", 0)
)
for t in tasks_data
]
def _select_next_task(self) -> Optional[SubTask]:
"""次に実行すべきサブタスクを選択(依存関係考慮)"""
completed_ids = {
t.id for t in self.plan if t.status == TaskStatus.COMPLETED
}
candidates = [
t for t in self.plan
if t.can_execute
and all(dep in completed_ids for dep in t.dependencies)
]
if not candidates:
return None
# 優先度順にソート
candidates.sort(key=lambda t: t.priority)
selected = candidates[0]
selected.status = TaskStatus.IN_PROGRESS
selected.attempts += 1
return selected
def _execute_task(self, task: SubTask) -> str:
"""サブタスクを実行(ツール使用あり)"""
context = ""
if self.completed_work:
recent_work = self.completed_work[-3:]
context = f"\nこれまでの成果:\n{json.dumps(recent_work, ensure_ascii=False, indent=2)}"
messages = [{"role": "user", "content": f"""
サブタスク: {task.description}
{context}
このサブタスクを完了してください。
"""}]
# エージェントループ(最大10ステップ)
for _ in range(10):
response = self.client.messages.create(
model=self.model,
max_tokens=4096,
tools=self.tools,
messages=messages
)
self.total_tokens += response.usage.input_tokens + response.usage.output_tokens
self._update_cost(response.usage)
if response.stop_reason == "end_turn":
result = response.content[0].text
task.status = TaskStatus.COMPLETED
task.result = result
self.completed_work.append({
"task_id": task.id,
"task": task.description,
"result": result
})
return result
# ツール呼び出し処理
tool_results = self._handle_tool_calls(response)
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
task.status = TaskStatus.FAILED
return "タスクが最大ステップ内で完了できませんでした"
def _handle_tool_calls(self, response) -> list[dict]:
"""ツール呼び出しを処理"""
results = []
for block in response.content:
if block.type == "tool_use":
tool_result = self._run_tool(block.name, block.input)
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(tool_result)
})
return results
def _run_tool(self, name: str, input_data: dict) -> Any:
"""ツールを実行"""
for tool in self.tools:
if tool.get("name") == name:
# 実際のツール実行ロジック
logger.info(f"ツール実行: {name}({json.dumps(input_data, ensure_ascii=False)[:100]})")
return f"ツール {name} の実行結果"
return f"不明なツール: {name}"
def _reflect(self, goal: str, task: SubTask, result: str) -> dict:
"""実行結果を振り返り、評価する"""
response = self.client.messages.create(
model=self.model,
max_tokens=1024,
messages=[{"role": "user", "content": f"""
全体目標: {goal}
完了したタスク: {task.description}
結果: {result[:1000]}
残りのタスク: {[t.description for t in self.plan if t.status == TaskStatus.PENDING]}
これまでの振り返り: {json.dumps(self.reflections[-3:], ensure_ascii=False) if self.reflections else "なし"}
以下をJSON形式で評価してください:
{{
"quality": "good" / "acceptable" / "poor",
"needs_replan": true/false,
"reason": "再計画が必要な理由(不要ならnull)",
"learning": "この経験から学んだこと",
"confidence": 0.0-1.0
}}
JSONのみ出力してください。
"""}]
)
self.total_tokens += response.usage.input_tokens + response.usage.output_tokens
self._update_cost(response.usage)
try:
return json.loads(response.content[0].text)
except json.JSONDecodeError:
return {
"quality": "acceptable",
"needs_replan": False,
"reason": None,
"learning": "振り返りの解析に失敗",
"confidence": 0.5
}
def _replan(self, goal: str, reason: str) -> list[SubTask]:
"""失敗理由を考慮して計画を再作成"""
completed = [t for t in self.plan if t.status == TaskStatus.COMPLETED]
failed = [t for t in self.plan if t.status == TaskStatus.FAILED]
response = self.client.messages.create(
model=self.model,
max_tokens=2048,
messages=[{"role": "user", "content": f"""
目標: {goal}
現在の状態:
- 完了済み: {[t.description for t in completed]}
- 失敗: {[t.description for t in failed]}
- 再計画理由: {reason}
- これまでの学び: {[r.get('learning', '') for r in self.reflections[-5:]]}
完了済みのタスクはそのまま保持し、残りのタスクを再計画してください。
以前と同じアプローチは避け、代替手段を検討してください。
JSON配列で出力(完了済みは含めない):
[{{"id": N, "description": "...", "dependencies": [], "priority": 0}}]
"""}]
)
self.total_tokens += response.usage.input_tokens + response.usage.output_tokens
self._update_cost(response.usage)
# 完了済みタスクを保持
new_tasks_data = json.loads(response.content[0].text)
new_tasks = [
SubTask(
id=t["id"],
description=t["description"],
dependencies=t.get("dependencies", []),
priority=t.get("priority", 0)
)
for t in new_tasks_data
]
return completed + new_tasks
def _synthesize(self, goal: str) -> str:
"""完了したタスクの結果を統合して最終出力を生成"""
response = self.client.messages.create(
model=self.model,
max_tokens=4096,
messages=[{"role": "user", "content": f"""
目標: {goal}
完了したタスクとその結果:
{json.dumps(self.completed_work, ensure_ascii=False, indent=2)}
振り返りからの学び:
{json.dumps([r.get('learning', '') for r in self.reflections], ensure_ascii=False)}
上記の成果を統合して、目標に対する最終的な成果物を出力してください。
"""}]
)
return response.content[0].text
def _update_cost(self, usage):
"""コストを更新(Sonnet基準)"""
self.total_cost += (
usage.input_tokens * 3.0 / 1_000_000
+ usage.output_tokens * 15.0 / 1_000_000
)
def get_execution_summary(self) -> dict:
"""実行サマリーを取得"""
completed = sum(1 for t in self.plan if t.status == TaskStatus.COMPLETED)
failed = sum(1 for t in self.plan if t.status == TaskStatus.FAILED)
total_time = time.time() - self.start_time
return {
"agent_id": self.agent_id,
"total_tasks": len(self.plan),
"completed": completed,
"failed": failed,
"total_steps": len(self.traces),
"total_tokens": self.total_tokens,
"total_cost": f"${self.total_cost:.4f}",
"total_time": f"{total_time:.1f}s",
"success_rate": f"{completed / max(len(self.plan), 1) * 100:.1f}%"
}3. 目標分解パターン
3.1 階層的目標分解
目標の階層分解
[最上位目標]
├── [サブゴール 1]
│ ├── [タスク 1.1]
│ ├── [タスク 1.2]
│ └── [タスク 1.3]
├── [サブゴール 2]
│ ├── [タスク 2.1]
│ └── [タスク 2.2]
└── [サブゴール 3]
├── [タスク 3.1]
├── [タスク 3.2]
└── [タスク 3.3]
例: "ECサイトを構築する"
├── "DB設計をする"
│ ├── ER図を作成
│ ├── テーブル定義
│ └── マイグレーション実行
├── "APIを実装する"
│ ├── 認証API
│ └── 商品API
└── "フロントエンドを構築する"
├── 商品一覧ページ
├── カートページ
└── 決済ページ
3.2 HTA(Hierarchical Task Analysis)による分解
# 階層的タスク分析に基づく目標分解
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class HTANode:
"""HTAツリーのノード"""
id: str
description: str
children: list["HTANode"] = field(default_factory=list)
plan: str = "" # このノードの実行計画
is_leaf: bool = False # 末端タスクか
@property
def depth(self) -> int:
if not self.children:
return 0
return 1 + max(c.depth for c in self.children)
def flatten(self) -> list["HTANode"]:
"""末端タスクをフラット化"""
if self.is_leaf or not self.children:
return [self]
result = []
for child in self.children:
result.extend(child.flatten())
return result
class HTAPlanner:
"""階層的タスク分析による計画生成"""
def __init__(self, client: anthropic.Anthropic):
self.client = client
def decompose(self, goal: str, max_depth: int = 3) -> HTANode:
"""目標を階層的に分解"""
root = HTANode(id="0", description=goal)
self._decompose_recursive(root, depth=0, max_depth=max_depth)
return root
def _decompose_recursive(
self, node: HTANode, depth: int, max_depth: int
):
"""再帰的に分解"""
if depth >= max_depth:
node.is_leaf = True
return
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1500,
messages=[{"role": "user", "content": f"""
タスク: {node.description}
このタスクを2-5個のサブタスクに分解してください。
各サブタスクは具体的で実行可能なものにしてください。
JSON配列で出力:
[
{{"id": "1", "description": "...", "is_leaf": true/false}},
...
]
is_leaf=true: これ以上分解不要な具体的アクション
is_leaf=false: さらに分解可能な抽象的タスク
"""}]
)
children_data = json.loads(response.content[0].text)
for child_data in children_data:
child = HTANode(
id=f"{node.id}.{child_data['id']}",
description=child_data["description"],
is_leaf=child_data.get("is_leaf", False)
)
node.children.append(child)
if not child.is_leaf:
self._decompose_recursive(child, depth + 1, max_depth)
def to_subtasks(self, root: HTANode) -> list[SubTask]:
"""HTAツリーをSubTaskリストに変換"""
leaves = root.flatten()
return [
SubTask(
id=i + 1,
description=leaf.description,
priority=i
)
for i, leaf in enumerate(leaves)
]
# 使用例
planner = HTAPlanner(anthropic.Anthropic())
tree = planner.decompose("ECサイトの決済機能を実装する", max_depth=2)
tasks = planner.to_subtasks(tree)3.3 適応的再計画
# 適応的再計画の実装
class AdaptivePlanner:
"""失敗と学びから計画を適応的に更新"""
def __init__(self, client: anthropic.Anthropic):
self.client = client
self.plan_history: list[list[SubTask]] = []
self.failure_patterns: list[dict] = []
def replan(
self,
goal: str,
current_state: dict,
failure_reason: str
) -> list[SubTask]:
"""失敗理由を考慮して計画を再作成"""
# 失敗パターンを記録
self.failure_patterns.append({
"reason": failure_reason,
"failed_tasks": current_state.get("failed", []),
"timestamp": time.time()
})
# 過去の失敗パターンから学ぶ
failure_summary = "\n".join(
f"- {p['reason']}" for p in self.failure_patterns[-5:]
)
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=2048,
messages=[{"role": "user", "content": f"""
目標: {goal}
現在の状態:
- 完了済み: {current_state['completed']}
- 失敗: {current_state['failed']}
- 失敗理由: {failure_reason}
過去の失敗パターン:
{failure_summary}
振り返りからの学び:
{current_state.get('reflections', [])}
以下の原則で再計画してください:
1. 過去に失敗したアプローチは避ける
2. 代替手段を積極的に検討する
3. 各タスクをより小さく具体的にする
4. リスクの高いタスクには前段階の確認を入れる
JSON配列で出力:
[{{"id": N, "description": "...", "dependencies": [], "priority": 0}}]
"""}]
)
tasks_data = json.loads(response.content[0].text)
new_plan = [
SubTask(
id=t["id"],
description=t["description"],
dependencies=t.get("dependencies", []),
priority=t.get("priority", 0)
)
for t in tasks_data
]
self.plan_history.append(new_plan)
return new_plan
def get_plan_evolution(self) -> str:
"""計画の変遷を取得"""
lines = []
for i, plan in enumerate(self.plan_history):
lines.append(f"=== 計画 v{i+1} ({len(plan)}タスク) ===")
for t in plan:
lines.append(f" [{t.id}] {t.description}")
return "\n".join(lines)4. メモリシステム
4.1 メモリの3層構造
自律エージェントのメモリアーキテクチャ| ワーキングメモリ (短期) |
|---|
| ・現在のタスクのコンテキスト |
| ・直近の会話履歴 |
| ・LLMのコンテキストウィンドウ内 |
| ・容量: 数千〜数万トークン |
| エピソード記憶 (中期) |
| ・過去の成功/失敗の記録 |
| ・学んだ教訓のリスト |
| ・タスク間で引き継ぐ情報 |
| ・容量: JSON/テキストファイル |
| セマンティック記憶 (長期) |
| ・ベクトルDBに格納 |
| ・類似経験の検索に使用 |
| ・ドメイン知識の蓄積 |
| ・容量: 数千〜数百万エントリ |
4.2 メモリシステムの実装
# 3層メモリシステム
from dataclasses import dataclass, field
from typing import Optional
import json
import time
import hashlib
@dataclass
class MemoryEntry:
"""メモリエントリ"""
content: str
type: str # "success", "failure", "learning", "fact"
tags: list[str] = field(default_factory=list)
importance: float = 0.5 # 0.0-1.0
timestamp: float = field(default_factory=time.time)
access_count: int = 0
class WorkingMemory:
"""ワーキングメモリ(短期)"""
def __init__(self, max_items: int = 20):
self.items: list[dict] = []
self.max_items = max_items
def add(self, content: str, type: str = "observation"):
self.items.append({
"content": content,
"type": type,
"timestamp": time.time()
})
# 容量超過時は古いものから削除
if len(self.items) > self.max_items:
self.items = self.items[-self.max_items:]
def get_context(self, max_tokens: int = 2000) -> str:
"""LLMに渡すコンテキストを生成"""
context_parts = []
total_chars = 0
for item in reversed(self.items):
text = f"[{item['type']}] {item['content']}"
if total_chars + len(text) > max_tokens * 4:
break
context_parts.insert(0, text)
total_chars += len(text)
return "\n".join(context_parts)
def clear(self):
self.items = []
class EpisodicMemory:
"""エピソード記憶(中期)"""
def __init__(self, filepath: str = "episodic_memory.json"):
self.filepath = filepath
self.episodes: list[MemoryEntry] = []
self._load()
def _load(self):
try:
with open(self.filepath, "r") as f:
data = json.load(f)
self.episodes = [
MemoryEntry(**ep) for ep in data
]
except FileNotFoundError:
self.episodes = []
def _save(self):
with open(self.filepath, "w") as f:
json.dump(
[
{
"content": ep.content,
"type": ep.type,
"tags": ep.tags,
"importance": ep.importance,
"timestamp": ep.timestamp,
"access_count": ep.access_count
}
for ep in self.episodes
],
f, ensure_ascii=False, indent=2
)
def record(self, content: str, type: str,
tags: list[str] = None, importance: float = 0.5):
"""エピソードを記録"""
entry = MemoryEntry(
content=content,
type=type,
tags=tags or [],
importance=importance
)
self.episodes.append(entry)
self._save()
def recall(self, query: str, top_k: int = 5) -> list[MemoryEntry]:
"""関連するエピソードを検索"""
# 簡易的なキーワードマッチング
query_words = set(query.lower().split())
scored = []
for ep in self.episodes:
content_words = set(ep.content.lower().split())
tag_words = set(w.lower() for w in ep.tags)
overlap = len(query_words & (content_words | tag_words))
score = overlap * ep.importance * (1 + ep.access_count * 0.1)
scored.append((score, ep))
scored.sort(key=lambda x: x[0], reverse=True)
results = [ep for _, ep in scored[:top_k] if _ > 0]
# アクセスカウント更新
for ep in results:
ep.access_count += 1
self._save()
return results
def get_learnings(self) -> list[str]:
"""学びのリストを取得"""
return [
ep.content for ep in self.episodes
if ep.type == "learning"
]
class SemanticMemory:
"""セマンティック記憶(長期)- ベクトルDB連携"""
def __init__(self, collection_name: str = "agent_memory"):
# ChromaDB等のベクトルDBを使用
try:
import chromadb
self.client = chromadb.PersistentClient(path="./chroma_db")
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
except ImportError:
logger.warning("chromadb未インストール: セマンティック記憶は無効")
self.collection = None
def store(self, content: str, metadata: dict = None):
"""知識を格納"""
if not self.collection:
return
doc_id = hashlib.md5(content.encode()).hexdigest()
self.collection.upsert(
documents=[content],
metadatas=[metadata or {}],
ids=[doc_id]
)
def search(self, query: str, top_k: int = 5) -> list[dict]:
"""類似知識を検索"""
if not self.collection:
return []
results = self.collection.query(
query_texts=[query],
n_results=top_k
)
return [
{
"content": doc,
"metadata": meta,
"distance": dist
}
for doc, meta, dist in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0]
)
]
class AgentMemorySystem:
"""統合メモリシステム"""
def __init__(self):
self.working = WorkingMemory()
self.episodic = EpisodicMemory()
self.semantic = SemanticMemory()
def remember(self, content: str, type: str, importance: float = 0.5):
"""記憶を統一的に格納"""
self.working.add(content, type)
self.episodic.record(content, type, importance=importance)
if importance > 0.7:
self.semantic.store(content, {"type": type})
def recall_relevant(self, query: str) -> dict:
"""関連する記憶を全層から取得"""
return {
"working": self.working.get_context(),
"episodic": [
ep.content for ep in self.episodic.recall(query)
],
"semantic": [
r["content"] for r in self.semantic.search(query)
]
}4.3 メモリ統合エージェント
# メモリ付き自律エージェント
class MemoryAugmentedAgent(AutonomousAgent):
"""メモリシステムを統合した自律エージェント"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.memory = AgentMemorySystem()
def _execute_task(self, task: SubTask) -> str:
# 関連記憶を検索
memories = self.memory.recall_relevant(task.description)
# メモリをコンテキストに追加
memory_context = ""
if memories["episodic"]:
memory_context += "\n関連する過去の経験:\n"
memory_context += "\n".join(f"- {m}" for m in memories["episodic"][:3])
if memories["semantic"]:
memory_context += "\n関連する知識:\n"
memory_context += "\n".join(f"- {m}" for m in memories["semantic"][:3])
messages = [{"role": "user", "content": f"""
サブタスク: {task.description}
{memory_context}
これまでの成果:
{json.dumps(self.completed_work[-3:], ensure_ascii=False)}
このサブタスクを完了してください。
"""}]
# 実行(親クラスのロジックと同様)
result = self._execute_with_messages(messages)
# 結果をメモリに記録
self.memory.remember(
f"タスク'{task.description}'を実行。結果: {result[:200]}",
type="success" if task.status == TaskStatus.COMPLETED else "failure",
importance=0.6
)
return result
def _reflect(self, goal: str, task: SubTask, result: str) -> dict:
reflection = super()._reflect(goal, task, result)
# 学びをメモリに記録
if reflection.get("learning"):
self.memory.remember(
reflection["learning"],
type="learning",
importance=0.8
)
return reflection5. 自己評価メカニズム
5.1 多角的自己評価
# 多角的な自己評価
class SelfEvaluator:
"""エージェントの出力を多角的に評価"""
def __init__(self, client: anthropic.Anthropic):
self.client = client
def evaluate(self, goal: str, output: str) -> dict:
"""出力を多角的に評価"""
# 観点1: 目標達成度
completeness_resp = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=100,
messages=[{"role": "user", "content": f"""
目標: {goal}
出力: {output[:2000]}
目標の達成度を0-100の数値のみで回答:"""}]
)
completeness = int(completeness_resp.content[0].text.strip())
# 観点2: 品質
quality_resp = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=100,
messages=[{"role": "user", "content": f"""
出力: {output[:2000]}
品質(正確性、完全性、明確性)を0-100の数値のみで回答:"""}]
)
quality = int(quality_resp.content[0].text.strip())
# 観点3: 改善余地
improvements_resp = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=500,
messages=[{"role": "user", "content": f"""
出力: {output[:2000]}
改善可能な点を3つ、箇条書きで挙げてください:"""}]
)
improvements = improvements_resp.content[0].text
return {
"completeness": completeness,
"quality": quality,
"improvements": improvements,
"should_improve": completeness < 80 or quality < 70,
"overall_score": (completeness + quality) / 2
}
def evaluate_with_rubric(self, output: str, rubric: dict) -> dict:
"""ルーブリックに基づく評価"""
results = {}
for criterion, description in rubric.items():
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=200,
messages=[{"role": "user", "content": f"""
出力:
{output[:1500]}
評価基準「{criterion}」: {description}
この基準に対するスコア(0-10)と根拠を以下の形式で:
スコア: N
根拠: ...
"""}]
)
text = response.content[0].text
score_line = text.split("\n")[0]
score = int(score_line.split(":")[-1].strip())
results[criterion] = {
"score": score,
"feedback": text
}
return results5.2 自己評価フロー
自己評価のフロー
[出力] → [完全性チェック] → 80%未満 → [再実行]
|
v 80%以上
[品質チェック] → 70点未満 → [改善ループ]
|
v 70点以上
[最終確認] → 承認 → [完了]
5.3 Reflexionパターンの実装
# Reflexion: 言語的自己強化学習
class ReflexionAgent(AutonomousAgent):
"""Reflexionパターンを組み込んだ自律エージェント"""
def __init__(self, *args, max_reflexion_rounds: int = 3, **kwargs):
super().__init__(*args, **kwargs)
self.max_reflexion_rounds = max_reflexion_rounds
self.reflexion_memory: list[str] = [] # 言語的な経験メモリ
def run(self, goal: str) -> str:
"""Reflexionループで実行"""
best_result = None
best_score = 0
for round_num in range(self.max_reflexion_rounds):
logger.info(f"Reflexion Round {round_num + 1}")
# 実行
result = self._execute_round(goal, round_num)
# 評価
evaluator = SelfEvaluator(self.client)
evaluation = evaluator.evaluate(goal, result)
score = evaluation["overall_score"]
logger.info(
f" スコア: {score:.1f} "
f"(完全性: {evaluation['completeness']}, "
f"品質: {evaluation['quality']})"
)
if score > best_score:
best_score = score
best_result = result
# 基準を満たしたら終了
if not evaluation["should_improve"]:
logger.info(" 品質基準達成")
break
# 振り返り(Reflexion)
reflexion = self._generate_reflexion(
goal, result, evaluation
)
self.reflexion_memory.append(reflexion)
logger.info(f" Reflexion: {reflexion[:100]}...")
# 計画をリセットして再試行
self.plan = []
self.completed_work = []
return best_result
def _execute_round(self, goal: str, round_num: int) -> str:
"""1ラウンドの実行"""
# 過去のReflexionをコンテキストに含める
reflexion_context = ""
if self.reflexion_memory:
reflexion_context = "\n過去の振り返り(同じ失敗を避けること):\n"
for i, r in enumerate(self.reflexion_memory):
reflexion_context += f"Round {i+1}: {r}\n"
augmented_goal = f"{goal}\n{reflexion_context}"
self.plan = self._create_plan(augmented_goal)
for step in range(self.max_steps):
if self._should_stop():
break
next_task = self._select_next_task()
if next_task is None:
break
self._execute_task(next_task)
return self._synthesize(goal)
def _generate_reflexion(
self, goal: str, result: str, evaluation: dict
) -> str:
"""失敗からの振り返りを生成"""
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=500,
messages=[{"role": "user", "content": f"""
目標: {goal}
実行結果: {result[:1000]}
評価:
- 完全性: {evaluation['completeness']}%
- 品質: {evaluation['quality']}点
- 改善点: {evaluation['improvements']}
なぜこの結果が不十分だったのか、
次回どうすれば改善できるか、
具体的で実行可能なアドバイスを2-3文で:
"""}]
)
return response.content[0].text6. ヒューマン・イン・ザ・ループ
6.1 介入ポイントの設計
ヒューマン・イン・ザ・ループの介入ポイント
[計画] ──確認──> [人間の承認] ──→ [実行] ──→ [振り返り]
|
重要判断 ──確認──> [人間の判断]
|
破壊的操作 ──確認──> [人間の承認]
6.2 実装パターン
# ヒューマン・イン・ザ・ループの実装
from enum import Enum
class ApprovalLevel(Enum):
NONE = "none" # 承認不要
INFO = "info" # 通知のみ
OPTIONAL = "optional" # 任意承認(タイムアウトで自動承認)
REQUIRED = "required" # 必須承認
BLOCKING = "blocking" # ブロッキング承認(絶対に人間の判断が必要)
class HumanInTheLoopAgent(AutonomousAgent):
"""ヒューマン・イン・ザ・ループ付き自律エージェント"""
def __init__(
self,
*args,
approval_rules: dict[str, ApprovalLevel] = None,
approval_timeout: float = 300, # 5分
notification_callback=None,
**kwargs
):
super().__init__(*args, **kwargs)
self.approval_rules = approval_rules or {
"delete": ApprovalLevel.REQUIRED,
"deploy": ApprovalLevel.BLOCKING,
"send": ApprovalLevel.REQUIRED,
"purchase": ApprovalLevel.BLOCKING,
"modify_production": ApprovalLevel.BLOCKING,
"install": ApprovalLevel.OPTIONAL,
"create": ApprovalLevel.INFO,
}
self.approval_timeout = approval_timeout
self.notification_callback = notification_callback
self.approval_log: list[dict] = []
def _get_approval_level(self, task: SubTask) -> ApprovalLevel:
"""タスクの承認レベルを判定"""
for keyword, level in self.approval_rules.items():
if keyword in task.description.lower():
return level
return ApprovalLevel.NONE
def _request_approval(
self, task: SubTask, level: ApprovalLevel
) -> tuple[bool, str]:
"""人間の承認を要求"""
log_entry = {
"task": task.description,
"level": level.value,
"timestamp": time.time(),
"decision": None
}
if level == ApprovalLevel.INFO:
# 通知のみ
if self.notification_callback:
self.notification_callback(
f"[INFO] タスク実行中: {task.description}"
)
log_entry["decision"] = "auto_approved"
self.approval_log.append(log_entry)
return True, ""
if level == ApprovalLevel.OPTIONAL:
# タイムアウト付き承認
print(f"\n[承認要求(任意)] タスク: {task.description}")
print(f" {self.approval_timeout}秒以内に入力がない場合、自動承認されます")
# 実際のUIではWebSocket/Slack等で通知
try:
import signal
signal.alarm(int(self.approval_timeout))
approval = input("承認しますか? (yes/no): ").strip().lower()
signal.alarm(0)
except Exception:
approval = "yes" # タイムアウト → 自動承認
approved = approval != "no"
log_entry["decision"] = "approved" if approved else "rejected"
self.approval_log.append(log_entry)
return approved, ""
# REQUIRED / BLOCKING
print(f"\n{'='*50}")
print(f"[承認必須] タスク: {task.description}")
print(f"承認レベル: {level.value}")
print(f"{'='*50}")
approval = input("承認しますか? (yes/no/modify): ").strip().lower()
if approval == "no":
log_entry["decision"] = "rejected"
self.approval_log.append(log_entry)
return False, "ユーザーにより拒否されました"
elif approval == "modify":
new_desc = input("修正後のタスク内容: ")
task.description = new_desc
log_entry["decision"] = "modified"
log_entry["modified_to"] = new_desc
self.approval_log.append(log_entry)
return True, ""
else:
log_entry["decision"] = "approved"
self.approval_log.append(log_entry)
return True, ""
def _execute_task(self, task: SubTask) -> str:
level = self._get_approval_level(task)
if level != ApprovalLevel.NONE:
approved, message = self._request_approval(task, level)
if not approved:
task.status = TaskStatus.BLOCKED
return message
return super()._execute_task(task)6.3 非同期承認(Slack/Web統合)
# Slack経由の承認フロー
import asyncio
from typing import Callable, Awaitable
class AsyncApprovalSystem:
"""非同期承認システム"""
def __init__(self):
self.pending_approvals: dict[str, asyncio.Future] = {}
async def request_approval(
self,
approval_id: str,
task_description: str,
timeout: float = 300
) -> bool:
"""承認を非同期で要求"""
future = asyncio.get_event_loop().create_future()
self.pending_approvals[approval_id] = future
# Slackに通知を送信
await self._send_slack_notification(
f"承認要求: {task_description}\n"
f"承認: `/approve {approval_id}`\n"
f"拒否: `/reject {approval_id}`"
)
try:
result = await asyncio.wait_for(future, timeout=timeout)
return result
except asyncio.TimeoutError:
logger.warning(f"承認タイムアウト: {approval_id}")
return False
finally:
self.pending_approvals.pop(approval_id, None)
def handle_approval_response(self, approval_id: str, approved: bool):
"""承認応答を処理(Slackコマンドから呼ばれる)"""
future = self.pending_approvals.get(approval_id)
if future and not future.done():
future.set_result(approved)
async def _send_slack_notification(self, message: str):
"""Slack通知を送信"""
# Slack API連携の実装
logger.info(f"Slack通知: {message}")7. 安全性ガードレール
7.1 多層防御
# 自律エージェントのガードレール
from dataclasses import dataclass
from typing import Callable
@dataclass
class GuardRail:
"""ガードレール定義"""
name: str
check: Callable[[dict], bool] # True=安全, False=危険
action: str # "block", "warn", "require_approval"
description: str
class GuardedAutonomousAgent(AutonomousAgent):
"""ガードレール付き自律エージェント"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.guardrails: list[GuardRail] = self._default_guardrails()
self.violations: list[dict] = []
def _default_guardrails(self) -> list[GuardRail]:
return [
GuardRail(
name="step_limit",
check=lambda ctx: ctx["step"] < self.max_steps,
action="block",
description="最大ステップ数の制限"
),
GuardRail(
name="cost_limit",
check=lambda ctx: ctx["cost"] < self.max_cost,
action="block",
description="コスト上限の制限"
),
GuardRail(
name="timeout",
check=lambda ctx: ctx["elapsed"] < self.timeout,
action="block",
description="実行時間の制限"
),
GuardRail(
name="forbidden_commands",
check=lambda ctx: not any(
cmd in ctx.get("action", "").lower()
for cmd in ["rm -rf", "drop table", "format", "shutdown"]
),
action="block",
description="危険なコマンドの禁止"
),
GuardRail(
name="pii_filter",
check=lambda ctx: not self._contains_pii(ctx.get("output", "")),
action="warn",
description="個人情報の出力防止"
),
GuardRail(
name="loop_detection",
check=lambda ctx: not self._detect_wandering(),
action="require_approval",
description="無限ループの検出"
),
GuardRail(
name="scope_check",
check=lambda ctx: self._is_within_scope(
ctx.get("action", ""), ctx.get("goal", "")
),
action="warn",
description="目標の範囲内かチェック"
),
]
def _contains_pii(self, text: str) -> bool:
"""個人情報を含むかチェック"""
import re
patterns = [
r'\b\d{3}-\d{4}-\d{4}\b', # 電話番号
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', # メール
r'\b\d{3}-\d{2}-\d{4}\b', # SSN
r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', # クレジットカード
]
return any(re.search(p, text) for p in patterns)
def _is_within_scope(self, action: str, goal: str) -> bool:
"""アクションが目標の範囲内かチェック"""
# 簡易的なスコープチェック
# 本番ではLLMによる判断を推奨
return True
def check_guardrails(self, context: dict) -> list[dict]:
"""全ガードレールをチェック"""
violations = []
for rail in self.guardrails:
try:
if not rail.check(context):
violation = {
"guardrail": rail.name,
"action": rail.action,
"description": rail.description,
"timestamp": time.time()
}
violations.append(violation)
self.violations.append(violation)
logger.warning(
f"ガードレール違反: {rail.name} - {rail.description}"
)
except Exception as e:
logger.error(f"ガードレールチェックエラー: {rail.name} - {e}")
return violations
def _execute_task(self, task: SubTask) -> str:
context = {
"step": len(self.traces),
"cost": self.total_cost,
"elapsed": time.time() - self.start_time,
"action": task.description,
"goal": "",
}
violations = self.check_guardrails(context)
for v in violations:
if v["action"] == "block":
task.status = TaskStatus.BLOCKED
return f"ガードレールにより停止: {v['description']}"
elif v["action"] == "require_approval":
print(f"\n[ガードレール警告] {v['description']}")
approval = input("続行しますか? (yes/no): ")
if approval != "yes":
task.status = TaskStatus.BLOCKED
return "ユーザーにより停止"
return super()._execute_task(task)7.2 迷走検出の詳細実装
# 高度な迷走検出
class WanderingDetector:
"""エージェントの迷走を検出するシステム"""
def __init__(self, window_size: int = 10):
self.window_size = window_size
self.action_history: list[dict] = []
def record_action(self, action: str, result: str, success: bool):
self.action_history.append({
"action": action,
"result_hash": hashlib.md5(result.encode()).hexdigest()[:8],
"success": success,
"timestamp": time.time()
})
def is_wandering(self) -> tuple[bool, str]:
"""迷走を検出し、理由を返す"""
if len(self.action_history) < self.window_size:
return False, ""
recent = self.action_history[-self.window_size:]
# パターン1: 同一アクションの繰り返し
actions = [a["action"] for a in recent]
most_common = max(set(actions), key=actions.count)
if actions.count(most_common) > self.window_size * 0.7:
return True, f"同一アクションの繰り返し: {most_common}"
# パターン2: 同一結果の繰り返し
results = [a["result_hash"] for a in recent]
if len(set(results)) < 3:
return True, "同一結果の繰り返し"
# パターン3: 連続失敗
failures = sum(1 for a in recent if not a["success"])
if failures > self.window_size * 0.8:
return True, f"連続失敗: {failures}/{self.window_size}"
# パターン4: 作成→削除の繰り返し
create_delete_pairs = 0
for i in range(len(recent) - 1):
if ("create" in recent[i]["action"].lower() and
"delete" in recent[i+1]["action"].lower()):
create_delete_pairs += 1
if create_delete_pairs >= 3:
return True, "作成→削除の繰り返し(矛盾する行動)"
return False, ""8. 自律性レベル比較
| レベル | 説明 | 人間の関与 | 適用場面 | リスク |
|---|---|---|---|---|
| L0 手動 | 全て手動 | 100% | - | 最低 |
| L1 アシスト | 1ステップ実行 | 80% | IDE補完 | 低 |
| L2 半自律 | 複数ステップ | 50% | チャットボット | 低-中 |
| L3 条件付き | 重要判断のみ | 10-20% | コーディングエージェント | 中 |
| L4 完全自律 | 目標のみ指定 | 0-5% | 自動デプロイ | 高 |
代表的プロダクトの自律性レベル
| プロダクト | レベル | 特徴 |
|---|---|---|
| GitHub Copilot | L1 | 行単位の補完 |
| ChatGPT | L1-L2 | ツール付き対話 |
| Claude Code | L3 | コーディング+ファイル操作 |
| Devin | L3-L4 | ソフトウェア開発の自律実行 |
| AutoGPT | L4 | 完全自律(実用性は課題) |
9. モニタリングとコスト管理
9.1 実行モニタリング
# 自律エージェントのモニタリング
from dataclasses import dataclass, field
from collections import defaultdict
import statistics
@dataclass
class AgentMonitor:
"""自律エージェントのモニタリングシステム"""
metrics: dict[str, list[float]] = field(
default_factory=lambda: defaultdict(list)
)
alerts: list[dict] = field(default_factory=list)
alert_thresholds: dict[str, float] = field(default_factory=lambda: {
"step_duration_p95": 30.0, # 秒
"cost_per_step": 0.05, # USD
"failure_rate": 0.3, # 30%
"wandering_score": 0.7, # 迷走スコア
})
def record_step(
self,
step: int,
duration: float,
cost: float,
success: bool,
tokens: int
):
"""ステップのメトリクスを記録"""
self.metrics["duration"].append(duration)
self.metrics["cost"].append(cost)
self.metrics["success"].append(1.0 if success else 0.0)
self.metrics["tokens"].append(tokens)
# アラートチェック
self._check_alerts(step)
def _check_alerts(self, step: int):
"""アラート条件をチェック"""
durations = self.metrics["duration"]
if len(durations) >= 5:
p95 = sorted(durations)[int(len(durations) * 0.95)]
if p95 > self.alert_thresholds["step_duration_p95"]:
self._add_alert("高レイテンシ", f"P95={p95:.1f}s", step)
costs = self.metrics["cost"]
if costs and costs[-1] > self.alert_thresholds["cost_per_step"]:
self._add_alert(
"高コストステップ",
f"${costs[-1]:.4f}",
step
)
successes = self.metrics["success"]
if len(successes) >= 5:
recent_rate = statistics.mean(successes[-5:])
if recent_rate < (1 - self.alert_thresholds["failure_rate"]):
self._add_alert(
"高失敗率",
f"直近5ステップ成功率={recent_rate*100:.0f}%",
step
)
def _add_alert(self, type: str, detail: str, step: int):
alert = {
"type": type,
"detail": detail,
"step": step,
"timestamp": time.time()
}
self.alerts.append(alert)
logger.warning(f"ALERT [{type}]: {detail} (step {step})")
def get_report(self) -> str:
"""モニタリングレポートを生成"""
if not self.metrics["duration"]:
return "データなし"
lines = [
"=== 自律エージェント実行レポート ===",
f"総ステップ数: {len(self.metrics['duration'])}",
f"総実行時間: {sum(self.metrics['duration']):.1f}s",
f"総コスト: ${sum(self.metrics['cost']):.4f}",
f"総トークン: {sum(self.metrics['tokens']):,}",
f"成功率: {statistics.mean(self.metrics['success'])*100:.1f}%",
f"",
f"--- ステップ別統計 ---",
f"実行時間: 平均={statistics.mean(self.metrics['duration']):.2f}s, "
f"最大={max(self.metrics['duration']):.2f}s",
f"コスト: 平均=${statistics.mean(self.metrics['cost']):.4f}, "
f"最大=${max(self.metrics['cost']):.4f}",
]
if self.alerts:
lines.append(f"\n--- アラート ({len(self.alerts)}件) ---")
for a in self.alerts[-10:]:
lines.append(f" [{a['type']}] {a['detail']} (step {a['step']})")
return "\n".join(lines)9.2 コスト最適化
# コスト最適化戦略
class CostOptimizer:
"""自律エージェントのコスト最適化"""
MODEL_TIERS = {
"fast": {
"model": "claude-haiku-3-20240307",
"input_cost": 0.25,
"output_cost": 1.25,
},
"balanced": {
"model": "claude-sonnet-4-20250514",
"input_cost": 3.0,
"output_cost": 15.0,
},
"best": {
"model": "claude-opus-4-20250514",
"input_cost": 15.0,
"output_cost": 75.0,
}
}
@staticmethod
def select_model_for_phase(phase: str) -> str:
"""フェーズに応じたモデルを選択"""
phase_model_map = {
"planning": "balanced", # 計画は中品質で十分
"execution": "balanced", # 実行は中品質
"reflection": "fast", # 振り返りは高速で十分
"evaluation": "balanced", # 評価は中品質
"synthesis": "best", # 最終統合は高品質
"classification": "fast", # 分類は高速で十分
}
tier = phase_model_map.get(phase, "balanced")
return CostOptimizer.MODEL_TIERS[tier]["model"]
@staticmethod
def estimate_cost(
plan: list[SubTask],
avg_tokens_per_task: int = 2000
) -> dict:
"""実行前にコストを見積もる"""
# 各フェーズの推定コスト
planning_cost = avg_tokens_per_task * 2 * 3.0 / 1_000_000 # 計画
execution_cost = (
len(plan) * avg_tokens_per_task * 2 * 3.0 / 1_000_000
)
reflection_cost = (
len(plan) * avg_tokens_per_task * 0.25 / 1_000_000 # Haiku
)
synthesis_cost = avg_tokens_per_task * 3 * 15.0 / 1_000_000 # Opus
total = planning_cost + execution_cost + reflection_cost + synthesis_cost
return {
"planning": f"${planning_cost:.4f}",
"execution": f"${execution_cost:.4f}",
"reflection": f"${reflection_cost:.4f}",
"synthesis": f"${synthesis_cost:.4f}",
"total_estimate": f"${total:.4f}",
"total_with_buffer": f"${total * 1.5:.4f}", # 50%バッファ
}10. テスト
10.1 単体テスト
# 自律エージェントのテスト
import pytest
from unittest.mock import patch, MagicMock, AsyncMock
class TestAutonomousAgent:
"""自律エージェントの単体テスト"""
@pytest.fixture
def mock_client(self):
with patch("anthropic.Anthropic") as mock:
yield mock.return_value
@pytest.fixture
def agent(self, mock_client):
return AutonomousAgent(
tools=[],
max_steps=10,
max_cost=1.0,
timeout=60
)
def test_create_plan(self, agent, mock_client):
"""計画生成のテスト"""
mock_response = MagicMock()
mock_response.content = [MagicMock(
text='[{"id": 1, "description": "タスク1"}, '
'{"id": 2, "description": "タスク2"}]'
)]
mock_response.usage = MagicMock(input_tokens=100, output_tokens=50)
mock_client.messages.create.return_value = mock_response
plan = agent._create_plan("テスト目標")
assert len(plan) == 2
assert plan[0].description == "タスク1"
def test_select_next_task_respects_dependencies(self, agent):
"""依存関係を考慮したタスク選択"""
agent.plan = [
SubTask(id=1, description="タスク1", status=TaskStatus.COMPLETED),
SubTask(id=2, description="タスク2", dependencies=[1]),
SubTask(id=3, description="タスク3", dependencies=[2]),
]
next_task = agent._select_next_task()
assert next_task.id == 2 # 依存先が完了しているタスク2
def test_select_next_task_blocks_unmet_deps(self, agent):
"""未完了の依存先がある場合はスキップ"""
agent.plan = [
SubTask(id=1, description="タスク1", status=TaskStatus.PENDING),
SubTask(id=2, description="タスク2", dependencies=[1]),
]
next_task = agent._select_next_task()
assert next_task.id == 1 # 依存なしのタスク1のみ選択可能
def test_should_stop_cost_limit(self, agent):
"""コスト上限での停止"""
agent.total_cost = 2.0 # max_cost=1.0を超過
agent.start_time = time.time()
assert agent._should_stop() is True
def test_should_stop_timeout(self, agent):
"""タイムアウトでの停止"""
agent.start_time = time.time() - 120 # timeout=60を超過
assert agent._should_stop() is True
class TestWanderingDetector:
"""迷走検出のテスト"""
def test_detect_repeated_action(self):
detector = WanderingDetector(window_size=5)
for _ in range(5):
detector.record_action("search", "result1", True)
is_wandering, reason = detector.is_wandering()
assert is_wandering
assert "同一アクション" in reason
def test_detect_consecutive_failures(self):
detector = WanderingDetector(window_size=5)
for _ in range(5):
detector.record_action("different_action", f"result_{_}", False)
is_wandering, reason = detector.is_wandering()
assert is_wandering
assert "連続失敗" in reason
def test_no_wandering_normal_operation(self):
detector = WanderingDetector(window_size=5)
for i in range(5):
detector.record_action(f"action_{i}", f"result_{i}", True)
is_wandering, _ = detector.is_wandering()
assert not is_wandering
class TestSelfEvaluator:
"""自己評価のテスト"""
@pytest.fixture
def mock_client(self):
with patch("anthropic.Anthropic") as mock:
yield mock.return_value
def test_evaluate_high_quality(self, mock_client):
evaluator = SelfEvaluator(mock_client)
# 高スコアの応答をモック
responses = [
MagicMock(content=[MagicMock(text="90")]), # completeness
MagicMock(content=[MagicMock(text="85")]), # quality
MagicMock(content=[MagicMock(text="改善点1\n改善点2\n改善点3")]),
]
mock_client.messages.create.side_effect = responses
result = evaluator.evaluate("目標", "出力テキスト")
assert result["completeness"] == 90
assert result["quality"] == 85
assert not result["should_improve"]
def test_evaluate_low_quality(self, mock_client):
evaluator = SelfEvaluator(mock_client)
responses = [
MagicMock(content=[MagicMock(text="50")]),
MagicMock(content=[MagicMock(text="40")]),
MagicMock(content=[MagicMock(text="改善必要")]),
]
mock_client.messages.create.side_effect = responses
result = evaluator.evaluate("目標", "出力テキスト")
assert result["should_improve"]
class TestGuardrails:
"""ガードレールのテスト"""
def test_pii_detection(self):
agent = GuardedAutonomousAgent(tools=[], max_steps=10)
# 電話番号
assert agent._contains_pii("080-1234-5678") is True
# メールアドレス
assert agent._contains_pii("user@example.com") is True
# 通常テキスト
assert agent._contains_pii("安全なテキスト") is False
def test_forbidden_commands(self):
agent = GuardedAutonomousAgent(tools=[], max_steps=10)
context = {
"step": 0,
"cost": 0,
"elapsed": 0,
"action": "rm -rf /important",
}
violations = agent.check_guardrails(context)
blocked = [v for v in violations if v["action"] == "block"]
assert len(blocked) > 010.2 統合テスト
# 統合テスト
class TestAutonomousAgentIntegration:
"""自律エージェントの統合テスト"""
@pytest.fixture
def mock_llm_sequence(self):
"""LLM呼び出しのシーケンスをモック"""
def create_response(text, stop="end_turn"):
resp = MagicMock()
resp.content = [MagicMock(text=text, type="text")]
resp.stop_reason = stop
resp.usage = MagicMock(input_tokens=100, output_tokens=50)
return resp
return create_response
def test_full_execution_flow(self, mock_llm_sequence):
"""完全な実行フローのテスト"""
with patch("anthropic.Anthropic") as mock_cls:
client = mock_cls.return_value
# 計画 → 実行 × 2 → 振り返り × 2 → 統合
client.messages.create.side_effect = [
# 計画
mock_llm_sequence(
'[{"id":1,"description":"分析"},{"id":2,"description":"レポート"}]'
),
# タスク1実行
mock_llm_sequence("分析結果"),
# タスク1振り返り
mock_llm_sequence(
'{"quality":"good","needs_replan":false,"reason":null,'
'"learning":"分析完了","confidence":0.9}'
),
# タスク2実行
mock_llm_sequence("レポート完成"),
# タスク2振り返り
mock_llm_sequence(
'{"quality":"good","needs_replan":false,"reason":null,'
'"learning":"レポート完了","confidence":0.95}'
),
# 統合
mock_llm_sequence("最終レポート"),
]
agent = AutonomousAgent(tools=[], max_steps=10)
result = agent.run("テスト分析を実行")
assert result == "最終レポート"
summary = agent.get_execution_summary()
assert summary["completed"] == 2
assert summary["failed"] == 011. アンチパターン
アンチパターン1: 振り返りなしの猪突猛進
# NG: 結果を評価せず次に進む
for task in plan:
result = execute(task)
# 結果が悪くてもそのまま続行...
# OK: 各ステップで振り返りと必要に応じた再計画
for task in plan:
result = execute(task)
evaluation = reflect(result)
if evaluation["quality"] == "poor":
plan = replan(goal, evaluation["reason"])アンチパターン2: 過度な自律性(ガードレールなし)
# NG: 制限なしの自律実行
agent.run("サーバーのパフォーマンスを最適化して")
# → 勝手に本番設定を変更、サービスダウン
# OK: 適切なガードレール
agent = GuardedAutonomousAgent(
tools=available_tools,
max_steps=30, # ステップ上限
max_cost=5.0, # コスト上限(ドル)
timeout=600, # タイムアウト(秒)
)アンチパターン3: メモリの不使用
# NG: 毎回ゼロから計画(過去の経験を活用しない)
agent = AutonomousAgent(tools=tools)
result = agent.run("バグを修正して")
# → 同じ失敗を何度も繰り返す
# OK: メモリを活用
agent = MemoryAugmentedAgent(tools=tools)
# 過去の成功/失敗パターンを自動的に活用
result = agent.run("バグを修正して")アンチパターン4: 単一モデルへの依存
# NG: 全フェーズで最高性能モデル(コスト爆発)
agent = AutonomousAgent(model="claude-opus-4-20250514")
# OK: フェーズごとのモデル最適化
class CostAwareAgent(AutonomousAgent):
def _create_plan(self, goal):
# 計画にはbalancedモデル
self.current_model = CostOptimizer.select_model_for_phase("planning")
return super()._create_plan(goal)
def _reflect(self, goal, task, result):
# 振り返りにはfastモデル
self.current_model = CostOptimizer.select_model_for_phase("reflection")
return super()._reflect(goal, task, result)
def _synthesize(self, goal):
# 最終統合にはbestモデル
self.current_model = CostOptimizer.select_model_for_phase("synthesis")
return super()._synthesize(goal)12. FAQ
Q1: 自律エージェントの実行時間の目安は?
タスクの複雑さに依存するが、現時点の目安:
- 単純タスク(ファイル操作、検索): 30秒〜2分
- 中程度(コーディング、分析): 2分〜10分
- 複雑(設計+実装+テスト): 10分〜1時間
- 大規模(プロジェクト全体): 1時間以上
長時間タスクは チェックポイント と 進捗通知 が必須。
Q2: 自律エージェントが「迷走」した場合の検出方法は?
- 同一ツールの連続呼び出し: 3回以上同じツールを同じ引数で呼び出し
- コスト急増: 予想コストの2倍を超えた時点
- 進捗の停滞: 最後の成功タスクから5ステップ以上経過
- 矛盾する行動: 作成したものをすぐ削除するなど
検出時は 自動的に一時停止し、ユーザーに判断を求める のが安全。
Q3: Reflexionパターンとは何か?
Reflexionは「実行→失敗→反省→再実行」のサイクルを明示的にモデル化したパターン。通常のリトライと異なり、失敗の原因を言語化してメモリに保存し、次の試行で同じ失敗を避ける 点が特徴。テスト駆動開発のような「赤→緑」のフィードバックループと相性が良い。
Q4: 自律エージェントの計画はどの程度詳細にすべきか?
初期計画は粗い粒度で作成し、各サブゴールの実行前に詳細化するのが推奨。理由:
- 全体像を早期に把握できる
- 実行中に得た情報で詳細計画を調整できる
- 過度に詳細な計画は変更コストが高い
Q5: メモリシステムはどこまで必要か?
自律性レベルにより異なる:
- L2(半自律): ワーキングメモリのみで十分
- L3(条件付き自律): ワーキングメモリ + エピソード記憶
- L4(完全自律): 3層全て(ワーキング + エピソード + セマンティック)
Q6: 複数の自律エージェントを協調させる場合は?
マルチエージェントパターン(01-multi-agent.md)と組み合わせる。典型的な構成:
- オーケストレータ(L3): 全体計画と進捗管理
- ワーカーエージェント(L2-L3): 個別タスクの実行
- レビューエージェント(L2): 成果物のチェック
FAQ
Q1: このトピックを学ぶ上で最も重要なポイントは何ですか?
実践的な経験を積むことが最も重要です。理論だけでなく、実際にコードを書いて動作を確認することで理解が深まります。
Q2: 初心者がよく陥る間違いは何ですか?
基礎を飛ばして応用に進むことです。このガイドで説明している基本概念をしっかり理解してから、次のステップに進むことをお勧めします。
Q3: 実務ではどのように活用されていますか?
このトピックの知識は、日常的な開発業務で頻繁に活用されます。特にコードレビューやアーキテクチャ設計の際に重要になります。
まとめ
| 項目 | 内容 |
|---|---|
| コアループ | 計画 → 実行 → 振り返り → (再計画) |
| 目標分解 | 階層的にサブゴール/タスクに分解(HTA) |
| 自己評価 | 完全性・品質・改善余地の多角評価 |
| Reflexion | 失敗を言語化し次の試行に活かす |
| メモリ | 短期(ワーキング)・中期(エピソード)・長期(セマンティック) |
| 再計画 | 失敗からの学習を次の計画に反映 |
| HITL | 重要判断ポイントで人間の承認を挟む |
| ガードレール | ステップ/コスト/時間の上限 + 禁止操作 + PII検出 |
| 迷走検出 | 繰り返し/連続失敗/矛盾行動のパターン検出 |
| コスト最適化 | フェーズ別モデル選択 + 事前見積り |
次に読むべきガイド
- ../02-implementation/03-claude-agent-sdk.md — Claude Agent SDKでの自律エージェント構築
- ../02-implementation/04-evaluation.md — エージェントの評価手法
- ../04-production/01-safety.md — 安全性とガードレール
参考文献
- Shinn, N. et al., "Reflexion: Language Agents with Verbal Reinforcement Learning" (2023) — https://arxiv.org/abs/2303.11366
- Yao, S. et al., "Tree of Thoughts: Deliberate Problem Solving with Large Language Models" (2023) — https://arxiv.org/abs/2305.10601
- Wang, G. et al., "Voyager: An Open-Ended Embodied Agent with Large Language Models" (2023) — https://arxiv.org/abs/2305.16291
- Park, J.S. et al., "Generative Agents: Interactive Simulacra of Human Behavior" (2023) — https://arxiv.org/abs/2304.03442
- Anthropic, "Building effective agents" (2024) — https://docs.anthropic.com/en/docs/build-with-claude/agentic