Skilore

AI エージェントの安全性 -- ガードレール・人間監視・制限

自律的に行動する AI エージェントが暴走せず、安全かつ制御可能に動作するための技術的ガードレール、人間によるオーバーサイト、権限制限の設計パターンを体系的に学ぶ。

89 分で読めます44,181 文字

AI エージェントの安全性 -- ガードレール・人間監視・制限

自律的に行動する AI エージェントが暴走せず、安全かつ制御可能に動作するための技術的ガードレール、人間によるオーバーサイト、権限制限の設計パターンを体系的に学ぶ。


この章で学ぶこと

  1. ガードレール設計 -- 入力検証、出力フィルタリング、アクション制限による多層防御の実装
  2. 人間監視 (Human-in-the-Loop) -- 承認ワークフロー、エスカレーション、介入メカニズムの設計
  3. 権限制限とサンドボックス -- 最小権限の原則、リソース制限、実行環境の隔離
  4. プロンプトインジェクション対策 -- 攻撃パターンの理解と防御手法の実装
  5. 監査とコンプライアンス -- 全操作の記録、追跡可能性の確保、規制対応

前提知識

このガイドを読む前に、以下の知識があると理解が深まります:

  • 基本的なプログラミングの知識
  • 関連する基礎概念の理解
  • デプロイ の内容を理解していること

1. エージェント安全性の全体像

1.1 安全性の多層防御モデル

+------------------------------------------------------------------+
|                    多層防御アーキテクチャ                           |
+------------------------------------------------------------------+
|                                                                    |
|  Layer 1: 入力ガード                                               |
|  +------------------------------------------------------------+  |
|  | プロンプトインジェクション検出 | 入力バリデーション | レート制限 |  |
|  +------------------------------------------------------------+  |
|                              |                                     |
|  Layer 2: エージェントコア                                         |
|  +------------------------------------------------------------+  |
|  | システムプロンプト | ツール権限制御 | コンテキスト制限          |  |
|  +------------------------------------------------------------+  |
|                              |                                     |
|  Layer 3: アクションガード                                         |
|  +------------------------------------------------------------+  |
|  | ツール呼び出し検証 | 破壊的操作の承認 | 実行前確認             |  |
|  +------------------------------------------------------------+  |
|                              |                                     |
|  Layer 4: 出力ガード                                               |
|  +------------------------------------------------------------+  |
|  | 有害性チェック | PII検出 | 品質検証 | 一貫性チェック            |  |
|  +------------------------------------------------------------+  |
|                              |                                     |
|  Layer 5: 実行環境                                                 |
|  +------------------------------------------------------------+  |
|  | サンドボックス | リソース制限 | ネットワーク制限 | タイムアウト  |  |
|  +------------------------------------------------------------+  |
|                                                                    |
+------------------------------------------------------------------+

1.2 エージェントのリスクマトリクス

影響度
  高 |  監視必須     承認必須     禁止
     |  (メール送信)  (課金操作)   (データ削除)
     |
  中 |  ログ記録     監視必須     承認必須
     |  (検索)       (ファイル作成) (外部API)
     |
  低 |  制限なし     ログ記録     監視必須
     |  (計算)       (読み取り)    (設定変更)
     +----------------------------------------
        低           中           高
                   可逆性の低さ (非可逆度)

1.3 安全性設計のフレームワーク

安全性設計の4原則

1. Defense in Depth(多層防御)
   - 単一の防御手段に依存しない
   - 各レイヤーが独立して機能する
   - 1つのレイヤーが突破されても他が防御

2. Least Privilege(最小権限)
   - 必要最小限の権限のみ付与
   - セッションごとに動的に権限を調整
   - デフォルトは拒否(deny by default)

3. Fail Safe(安全側への障害)
   - 判断に迷う場合は安全な選択をする
   - システム障害時はエージェントを停止
   - 不明な入力は拒否する

4. Auditability(監査可能性)
   - 全操作をログに記録
   - 意思決定の根拠を追跡可能にする
   - 事後分析が可能な粒度で記録

2. ガードレール設計

2.1 ツールコール検証システム

# コード例 1: ツールコールのガードレール実装
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Any
import re
 
class ActionLevel(Enum):
    ALLOW = "allow"          # 自動許可
    LOG = "log"              # ログ記録して許可
    CONFIRM = "confirm"      # 人間の確認が必要
    DENY = "deny"            # 拒否
 
@dataclass
class ToolPolicy:
    """各ツールのセキュリティポリシー"""
    tool_name: str
    default_level: ActionLevel
    max_calls_per_session: int
    allowed_parameters: dict | None  # パラメータの許容値
    validators: list[Callable]       # カスタムバリデータ
 
class ToolGuardrail:
    """ツール呼び出しのガードレール"""
 
    def __init__(self, policies: list[ToolPolicy]):
        self.policies = {p.tool_name: p for p in policies}
        self.call_counts: dict[str, int] = {}
 
    async def check(self, tool_name: str, parameters: dict,
                     context: dict) -> tuple[ActionLevel, str]:
        """ツール呼び出しを検証し、アクションレベルを返す"""
 
        policy = self.policies.get(tool_name)
        if policy is None:
            return ActionLevel.DENY, f"未登録のツール: {tool_name}"
 
        # 1. 呼び出し回数チェック
        count = self.call_counts.get(tool_name, 0)
        if count >= policy.max_calls_per_session:
            return ActionLevel.DENY, (
                f"セッション上限超過: {tool_name} "
                f"({count}/{policy.max_calls_per_session})"
            )
 
        # 2. パラメータ検証
        if policy.allowed_parameters:
            for param, allowed in policy.allowed_parameters.items():
                value = parameters.get(param)
                if value is not None and value not in allowed:
                    return ActionLevel.DENY, (
                        f"パラメータ '{param}' の値 '{value}' は"
                        f"許可されていません"
                    )
 
        # 3. カスタムバリデータ
        for validator in policy.validators:
            result = await validator(parameters, context)
            if not result["ok"]:
                return ActionLevel.DENY, result["reason"]
 
        # 4. 呼び出しカウント更新
        self.call_counts[tool_name] = count + 1
 
        return policy.default_level, "OK"
 
 
# ポリシーの定義例
policies = [
    ToolPolicy(
        tool_name="web_search",
        default_level=ActionLevel.ALLOW,
        max_calls_per_session=50,
        allowed_parameters=None,
        validators=[],
    ),
    ToolPolicy(
        tool_name="send_email",
        default_level=ActionLevel.CONFIRM,
        max_calls_per_session=5,
        allowed_parameters={
            "to_domain": ["@company.com", "@partner.com"],
        },
        validators=[check_email_content_safety],
    ),
    ToolPolicy(
        tool_name="execute_code",
        default_level=ActionLevel.LOG,
        max_calls_per_session=20,
        allowed_parameters=None,
        validators=[check_code_safety, check_no_network_access],
    ),
    ToolPolicy(
        tool_name="delete_file",
        default_level=ActionLevel.CONFIRM,
        max_calls_per_session=3,
        allowed_parameters=None,
        validators=[check_not_system_file, check_backup_exists],
    ),
]

2.2 出力フィルタリング

# コード例 2: エージェント出力の安全性フィルタリング
class OutputGuardrail:
    """エージェントの出力をフィルタリングする"""
 
    # PII パターン
    PII_PATTERNS = {
        "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
        "phone_jp": r"0\d{1,4}-?\d{1,4}-?\d{3,4}",
        "credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
        "my_number": r"\b\d{4}\s?\d{4}\s?\d{4}\b",  # マイナンバー
    }
 
    async def filter_output(self, output: str, context: dict) -> dict:
        """出力を検証し、必要に応じてフィルタリングする"""
        issues = []
 
        # 1. PII 検出
        pii_found = self._detect_pii(output)
        if pii_found:
            output = self._mask_pii(output, pii_found)
            issues.append({
                "type": "pii_detected",
                "count": len(pii_found),
                "action": "masked"
            })
 
        # 2. 有害性チェック
        toxicity = await self._check_toxicity(output)
        if toxicity["score"] > 0.8:
            issues.append({
                "type": "toxic_content",
                "score": toxicity["score"],
                "action": "blocked"
            })
            return {
                "output": "[安全性フィルタにより出力がブロックされました]",
                "issues": issues,
                "blocked": True
            }
 
        # 3. 機密情報の漏洩チェック
        if self._contains_system_prompt(output, context):
            issues.append({
                "type": "system_prompt_leak",
                "action": "blocked"
            })
            return {
                "output": "[システム情報の漏洩を検出しブロックしました]",
                "issues": issues,
                "blocked": True
            }
 
        return {
            "output": output,
            "issues": issues,
            "blocked": False
        }
 
    def _detect_pii(self, text: str) -> list[dict]:
        findings = []
        for pii_type, pattern in self.PII_PATTERNS.items():
            matches = re.findall(pattern, text)
            for match in matches:
                findings.append({"type": pii_type, "value": match})
        return findings
 
    def _mask_pii(self, text: str, findings: list[dict]) -> str:
        for finding in findings:
            text = text.replace(
                finding["value"],
                f"[{finding['type'].upper()}_MASKED]"
            )
        return text

2.3 入力検証とインジェクション対策

class InputGuardrail:
    """エージェントへの入力を検証し、攻撃を防ぐ"""
 
    # インジェクション検出パターン
    INJECTION_PATTERNS = [
        r"ignore\s+(previous|above|all)\s+(instructions?|prompts?)",
        r"you\s+are\s+now\s+a",
        r"pretend\s+(to\s+be|you\s+are)",
        r"system\s*:\s*",
        r"<\|?(system|assistant|user)\|?>",
        r"forget\s+(everything|all|your)",
        r"new\s+instructions?\s*:",
        r"override\s+(previous|system|all)",
        r"jailbreak",
        r"DAN\s+(mode|prompt)",
    ]
 
    def __init__(self):
        self._compiled_patterns = [
            re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS
        ]
 
    async def validate_input(self, user_input: str,
                             context: dict) -> dict:
        """入力を検証し、安全性を評価する"""
        issues = []
 
        # 1. 基本的な長さチェック
        if len(user_input) > 100_000:
            return {
                "valid": False,
                "reason": "入力が長すぎます(100,000文字以上)",
                "issues": [{"type": "input_too_long"}]
            }
 
        # 2. パターンベースのインジェクション検出
        for pattern in self._compiled_patterns:
            if pattern.search(user_input):
                issues.append({
                    "type": "injection_pattern_detected",
                    "pattern": pattern.pattern,
                    "severity": "high"
                })
 
        # 3. エンコーディング攻撃の検出
        if self._detect_encoding_attack(user_input):
            issues.append({
                "type": "encoding_attack",
                "severity": "high"
            })
 
        # 4. 不可視文字の検出
        invisible_chars = self._detect_invisible_characters(user_input)
        if invisible_chars:
            issues.append({
                "type": "invisible_characters",
                "count": len(invisible_chars),
                "severity": "medium"
            })
 
        # 5. LLMベースのインジェクション検出(高精度・低速)
        if issues:
            llm_check = await self._llm_injection_check(user_input)
            issues.append({
                "type": "llm_injection_analysis",
                "is_injection": llm_check["is_injection"],
                "confidence": llm_check["confidence"]
            })
 
        high_severity = any(
            i.get("severity") == "high" for i in issues
        )
        return {
            "valid": not high_severity,
            "issues": issues,
            "sanitized_input": self._sanitize(user_input)
                if not high_severity else None
        }
 
    def _detect_encoding_attack(self, text: str) -> bool:
        """Base64、Unicode等のエンコーディング攻撃を検出"""
        import base64
        # Base64でエンコードされた命令の検出
        for word in text.split():
            try:
                decoded = base64.b64decode(word).decode("utf-8")
                for pattern in self._compiled_patterns:
                    if pattern.search(decoded):
                        return True
            except Exception:
                continue
        return False
 
    def _detect_invisible_characters(self, text: str) -> list[int]:
        """ゼロ幅文字などの不可視文字を検出"""
        invisible = []
        invisible_ranges = [
            (0x200B, 0x200F),  # ゼロ幅スペース等
            (0x2028, 0x202F),  # 行/段落区切り等
            (0xFEFF, 0xFEFF),  # BOM
        ]
        for i, char in enumerate(text):
            code = ord(char)
            for start, end in invisible_ranges:
                if start <= code <= end:
                    invisible.append(i)
        return invisible
 
    def _sanitize(self, text: str) -> str:
        """入力をサニタイズして安全な形に変換"""
        # 不可視文字の除去
        cleaned = ""
        for char in text:
            code = ord(char)
            if code < 0x20 and code not in (0x0A, 0x0D, 0x09):
                continue
            if 0x200B <= code <= 0x200F:
                continue
            cleaned += char
        return cleaned
 
    async def _llm_injection_check(self, text: str) -> dict:
        """LLMを使った高精度なインジェクション検出"""
        prompt = f"""以下のテキストがプロンプトインジェクション攻撃を
含むかどうかを判定してください。
 
テキスト:
---
{text[:2000]}
---
 
JSON形式で回答してください:
{{"is_injection": true/false, "confidence": 0.0-1.0, "reason": "..."}}"""
 
        response = await self.classifier_llm.create(
            model="claude-haiku-4-20250514",
            max_tokens=200,
            messages=[{"role": "user", "content": prompt}]
        )
        return json.loads(response.content[0].text)

3. 人間監視 (Human-in-the-Loop)

3.1 承認ワークフロー

+------------------------------------------------------------------+
|                    Human-in-the-Loop パターン                      |
+------------------------------------------------------------------+
|                                                                    |
|  パターン A: 事前承認 (Pre-Approval)                               |
|  エージェント → [計画提示] → 人間承認 → 実行                       |
|  用途: 高リスク操作、不可逆な変更                                  |
|                                                                    |
|  パターン B: 事後確認 (Post-Verification)                          |
|  エージェント → 実行 → [結果提示] → 人間確認 → 確定/取消           |
|  用途: 中リスク操作、取り消し可能な変更                            |
|                                                                    |
|  パターン C: 監視 (Monitoring)                                     |
|  エージェント → 実行 → ログ記録 → [異常時のみ] → 人間アラート      |
|  用途: 低リスク操作、大量の定型処理                                |
|                                                                    |
|  パターン D: エスカレーション (Escalation)                         |
|  エージェント → 自信度判定 → [低自信度] → 人間に委譲               |
|                           → [高自信度] → 自動実行                  |
|  用途: 判断の確実性が変動する場面                                  |
+------------------------------------------------------------------+

3.2 承認ワークフローの実装

# コード例 3: Human-in-the-Loop 承認フロー
import asyncio
from enum import Enum
 
class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    TIMEOUT = "timeout"
 
class HumanApprovalGate:
    """人間の承認を要求するゲート"""
 
    def __init__(self, notification_service, timeout_seconds=300):
        self.notification_service = notification_service
        self.timeout = timeout_seconds
        self.pending_requests: dict[str, asyncio.Future] = {}
 
    async def request_approval(self, action_description: str,
                                parameters: dict,
                                risk_level: str,
                                reviewer_id: str) -> ApprovalStatus:
        """人間の承認をリクエストし、結果を待つ"""
 
        request_id = str(uuid.uuid4())
 
        # 承認リクエストを通知
        await self.notification_service.send(
            to=reviewer_id,
            message={
                "type": "approval_request",
                "request_id": request_id,
                "action": action_description,
                "parameters": parameters,
                "risk_level": risk_level,
                "expires_at": time.time() + self.timeout,
            }
        )
 
        # 承認待ちの Future を作成
        future = asyncio.get_event_loop().create_future()
        self.pending_requests[request_id] = future
 
        try:
            # タイムアウト付きで承認を待つ
            result = await asyncio.wait_for(future, timeout=self.timeout)
            return result
        except asyncio.TimeoutError:
            return ApprovalStatus.TIMEOUT
        finally:
            self.pending_requests.pop(request_id, None)
 
    async def submit_decision(self, request_id: str,
                               status: ApprovalStatus,
                               comment: str = ""):
        """レビューアーが承認/拒否を提出する"""
        future = self.pending_requests.get(request_id)
        if future and not future.done():
            future.set_result(status)
 
 
class SafeAgent:
    """安全なエージェント実行フレームワーク"""
 
    def __init__(self, llm, tools, guardrail, approval_gate):
        self.llm = llm
        self.tools = tools
        self.guardrail = guardrail
        self.approval_gate = approval_gate
 
    async def execute_action(self, tool_name: str,
                              parameters: dict, context: dict):
        """ガードレール付きでアクションを実行する"""
 
        # Step 1: ガードレールチェック
        level, reason = await self.guardrail.check(
            tool_name, parameters, context
        )
 
        if level == ActionLevel.DENY:
            return {"status": "denied", "reason": reason}
 
        if level == ActionLevel.CONFIRM:
            # Step 2: 人間の承認を要求
            status = await self.approval_gate.request_approval(
                action_description=f"{tool_name}({parameters})",
                parameters=parameters,
                risk_level="high",
                reviewer_id=context["owner_id"],
            )
 
            if status != ApprovalStatus.APPROVED:
                return {"status": "rejected", "approval": status.value}
 
        # Step 3: 実行
        try:
            result = await self.tools[tool_name].execute(parameters)
            return {"status": "success", "result": result}
        except Exception as e:
            return {"status": "error", "error": str(e)}

3.3 自信度ベースのエスカレーション

class ConfidenceBasedEscalation:
    """エージェントの自信度に基づくエスカレーション判定"""
 
    def __init__(
        self,
        auto_threshold: float = 0.9,
        confirm_threshold: float = 0.6,
        escalate_threshold: float = 0.3
    ):
        self.auto_threshold = auto_threshold
        self.confirm_threshold = confirm_threshold
        self.escalate_threshold = escalate_threshold
 
    async def evaluate_and_route(
        self,
        agent_response: dict,
        context: dict
    ) -> dict:
        """レスポンスの自信度を評価し、ルーティングを決定"""
 
        confidence = await self._assess_confidence(agent_response)
 
        if confidence >= self.auto_threshold:
            # 高自信度: 自動実行
            return {
                "action": "auto_execute",
                "confidence": confidence,
                "response": agent_response
            }
 
        elif confidence >= self.confirm_threshold:
            # 中自信度: 簡易確認
            return {
                "action": "quick_confirm",
                "confidence": confidence,
                "response": agent_response,
                "message": "エージェントの回答を確認してください"
            }
 
        elif confidence >= self.escalate_threshold:
            # 低自信度: 詳細確認
            return {
                "action": "detailed_review",
                "confidence": confidence,
                "response": agent_response,
                "alternatives": await self._generate_alternatives(
                    agent_response, context
                ),
                "message": "複数の選択肢から適切なものを選んでください"
            }
 
        else:
            # 非常に低い自信度: 人間に完全委譲
            return {
                "action": "full_escalation",
                "confidence": confidence,
                "context_summary": self._summarize_context(context),
                "message": "エージェントが適切に処理できません。"
                          "人間の対応が必要です。"
            }
 
    async def _assess_confidence(self, response: dict) -> float:
        """レスポンスの自信度を評価"""
        factors = []
 
        # 1. LLMの自己評価
        if "confidence" in response:
            factors.append(response["confidence"])
 
        # 2. 複数回答の一致度
        if "alternatives" in response:
            consistency = self._measure_consistency(
                response["primary"], response["alternatives"]
            )
            factors.append(consistency)
 
        # 3. ツール呼び出し結果の信頼性
        if "tool_results" in response:
            tool_reliability = self._assess_tool_results(
                response["tool_results"]
            )
            factors.append(tool_reliability)
 
        return sum(factors) / len(factors) if factors else 0.5

4. 権限制限とサンドボックス

4.1 最小権限設計

# コード例 4: ロールベースのエージェント権限管理
from dataclasses import dataclass, field
 
@dataclass
class AgentPermissions:
    """エージェントの権限定義"""
 
    # ファイルシステム
    allowed_read_paths: list[str] = field(default_factory=list)
    allowed_write_paths: list[str] = field(default_factory=list)
    max_file_size_mb: int = 10
 
    # ネットワーク
    allowed_domains: list[str] = field(default_factory=list)
    blocked_domains: list[str] = field(
        default_factory=lambda: ["*.internal", "localhost"]
    )
    max_requests_per_minute: int = 30
 
    # コード実行
    allow_code_execution: bool = False
    allowed_languages: list[str] = field(default_factory=list)
    max_execution_time_seconds: int = 30
    max_memory_mb: int = 256
 
    # 外部サービス
    allowed_tools: list[str] = field(default_factory=list)
    denied_tools: list[str] = field(default_factory=list)
 
    # リソース制限
    max_tokens_per_session: int = 100_000
    max_tool_calls_per_session: int = 50
    session_timeout_minutes: int = 60
 
 
# ロール別のプリセット
ROLE_PRESETS = {
    "researcher": AgentPermissions(
        allowed_read_paths=["/data/public/", "/data/research/"],
        allowed_write_paths=["/output/research/"],
        allowed_domains=["*.arxiv.org", "*.wikipedia.org", "*.github.com"],
        allow_code_execution=True,
        allowed_languages=["python"],
        allowed_tools=["web_search", "read_file", "write_file",
                       "execute_python"],
        max_tool_calls_per_session=100,
    ),
    "customer_support": AgentPermissions(
        allowed_read_paths=["/data/faq/", "/data/products/"],
        allowed_write_paths=[],
        allowed_domains=["api.company.com"],
        allow_code_execution=False,
        allowed_tools=["search_faq", "lookup_order", "create_ticket"],
        max_tool_calls_per_session=30,
    ),
    "admin": AgentPermissions(
        allowed_read_paths=["/"],
        allowed_write_paths=["/data/", "/config/"],
        allowed_domains=["*"],
        allow_code_execution=True,
        allowed_languages=["python", "bash"],
        allowed_tools=["*"],
        max_tool_calls_per_session=200,
    ),
}

4.2 サンドボックス実行環境

# コード例 5: Docker ベースのサンドボックスでコードを実行する
import docker
import tempfile
import os
 
class CodeSandbox:
    """Docker コンテナ内でエージェントのコードを安全に実行する"""
 
    def __init__(self, permissions: AgentPermissions):
        self.client = docker.from_env()
        self.permissions = permissions
 
    async def execute(self, code: str, language: str = "python") -> dict:
        """サンドボックス内でコードを実行する"""
 
        if language not in self.permissions.allowed_languages:
            return {"error": f"言語 '{language}' は許可されていません"}
 
        # 一時ファイルにコードを書き出す
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.py', delete=False
        ) as f:
            f.write(code)
            code_path = f.name
 
        try:
            container = self.client.containers.run(
                image="python:3.11-slim",
                command=f"python /code/script.py",
                volumes={
                    code_path: {"bind": "/code/script.py", "mode": "ro"}
                },
                # セキュリティ制限
                mem_limit=f"{self.permissions.max_memory_mb}m",
                cpu_period=100000,
                cpu_quota=50000,     # CPU 50%制限
                network_disabled=True,  # ネットワーク無効
                read_only=True,      # ファイルシステム読み取り専用
                security_opt=["no-new-privileges"],
                # タイムアウト
                detach=True,
            )
 
            # 実行完了を待つ(タイムアウト付き)
            result = container.wait(
                timeout=self.permissions.max_execution_time_seconds
            )
 
            logs = container.logs().decode("utf-8")
            exit_code = result["StatusCode"]
 
            return {
                "stdout": logs,
                "exit_code": exit_code,
                "success": exit_code == 0,
            }
 
        except docker.errors.ContainerError as e:
            return {"error": f"実行エラー: {e}"}
        except Exception as e:
            return {"error": f"サンドボックスエラー: {e}"}
        finally:
            os.unlink(code_path)
            try:
                container.remove(force=True)
            except:
                pass

4.3 動的権限制御

class DynamicPermissionManager:
    """セッション中にエージェントの権限を動的に調整"""
 
    def __init__(self, base_permissions: AgentPermissions):
        self.base = base_permissions
        self.current = AgentPermissions(**vars(base_permissions))
        self.escalation_history: list[dict] = []
        self.violation_count: int = 0
 
    def request_elevated_permission(
        self,
        resource_type: str,
        resource_name: str,
        justification: str
    ) -> dict:
        """権限昇格をリクエスト"""
        request = {
            "type": resource_type,
            "resource": resource_name,
            "justification": justification,
            "timestamp": time.time(),
            "status": "pending"
        }
        self.escalation_history.append(request)
        return request
 
    def grant_temporary_permission(
        self,
        resource_type: str,
        resource_name: str,
        duration_minutes: int = 10
    ):
        """一時的な権限付与"""
        if resource_type == "tool":
            if resource_name not in self.current.allowed_tools:
                self.current.allowed_tools.append(resource_name)
        elif resource_type == "read_path":
            if resource_name not in self.current.allowed_read_paths:
                self.current.allowed_read_paths.append(resource_name)
        elif resource_type == "write_path":
            if resource_name not in self.current.allowed_write_paths:
                self.current.allowed_write_paths.append(resource_name)
 
        # 一定時間後に権限を自動回収
        asyncio.get_event_loop().call_later(
            duration_minutes * 60,
            self._revoke_permission,
            resource_type,
            resource_name
        )
 
    def record_violation(self, violation_type: str, details: str):
        """権限違反を記録し、必要に応じて権限を縮小"""
        self.violation_count += 1
 
        if self.violation_count >= 3:
            # 3回以上の違反で権限を縮小
            self._reduce_permissions()
 
        if self.violation_count >= 5:
            # 5回以上でセッション停止
            raise SecurityViolationError(
                f"権限違反が{self.violation_count}回に達しました。"
                "セッションを終了します。"
            )
 
    def _reduce_permissions(self):
        """権限を段階的に縮小"""
        self.current.max_tool_calls_per_session = max(
            10,
            self.current.max_tool_calls_per_session // 2
        )
        self.current.allow_code_execution = False
        self.current.allowed_write_paths = []
 
    def _revoke_permission(self, resource_type: str, resource_name: str):
        """一時権限を回収"""
        if resource_type == "tool":
            if (resource_name in self.current.allowed_tools
                    and resource_name not in self.base.allowed_tools):
                self.current.allowed_tools.remove(resource_name)

5. 監査とコンプライアンス

5.1 監査ログの実装

import json
from datetime import datetime
from enum import Enum
 
class AuditEventType(Enum):
    AGENT_START = "agent_start"
    AGENT_END = "agent_end"
    TOOL_CALL = "tool_call"
    TOOL_RESULT = "tool_result"
    APPROVAL_REQUESTED = "approval_requested"
    APPROVAL_GRANTED = "approval_granted"
    APPROVAL_DENIED = "approval_denied"
    GUARDRAIL_TRIGGERED = "guardrail_triggered"
    PERMISSION_VIOLATION = "permission_violation"
    OUTPUT_FILTERED = "output_filtered"
    ESCALATION = "escalation"
 
class AuditLogger:
    """全エージェント操作の監査ログを記録"""
 
    def __init__(self, storage_backend):
        self.storage = storage_backend
 
    async def log_event(
        self,
        event_type: AuditEventType,
        session_id: str,
        user_id: str,
        agent_id: str,
        details: dict,
        metadata: dict | None = None
    ):
        """監査イベントを記録"""
        event = {
            "event_id": str(uuid.uuid4()),
            "event_type": event_type.value,
            "timestamp": datetime.utcnow().isoformat(),
            "session_id": session_id,
            "user_id": user_id,
            "agent_id": agent_id,
            "details": details,
            "metadata": metadata or {},
        }
 
        # 改ざん防止のためハッシュチェーンを使用
        event["hash"] = self._compute_hash(event)
        event["previous_hash"] = await self._get_last_hash(session_id)
 
        await self.storage.append(
            key=f"audit:{session_id}",
            value=json.dumps(event)
        )
 
        # 重大イベントは即時通知
        if event_type in (
            AuditEventType.PERMISSION_VIOLATION,
            AuditEventType.GUARDRAIL_TRIGGERED,
        ):
            await self._notify_security_team(event)
 
    def _compute_hash(self, event: dict) -> str:
        """イベントのハッシュを計算"""
        import hashlib
        content = json.dumps(
            {k: v for k, v in event.items() if k != "hash"},
            sort_keys=True
        )
        return hashlib.sha256(content.encode()).hexdigest()
 
    async def _get_last_hash(self, session_id: str) -> str:
        """直前のイベントハッシュを取得"""
        last_event = await self.storage.get_last(f"audit:{session_id}")
        if last_event:
            return json.loads(last_event)["hash"]
        return "genesis"
 
    async def generate_audit_report(
        self,
        session_id: str
    ) -> dict:
        """セッションの監査レポートを生成"""
        events = await self.storage.get_all(f"audit:{session_id}")
        parsed = [json.loads(e) for e in events]
 
        report = {
            "session_id": session_id,
            "total_events": len(parsed),
            "start_time": parsed[0]["timestamp"] if parsed else None,
            "end_time": parsed[-1]["timestamp"] if parsed else None,
            "tool_calls": [
                e for e in parsed
                if e["event_type"] == "tool_call"
            ],
            "guardrail_triggers": [
                e for e in parsed
                if e["event_type"] == "guardrail_triggered"
            ],
            "violations": [
                e for e in parsed
                if e["event_type"] == "permission_violation"
            ],
            "approvals": {
                "requested": len([
                    e for e in parsed
                    if e["event_type"] == "approval_requested"
                ]),
                "granted": len([
                    e for e in parsed
                    if e["event_type"] == "approval_granted"
                ]),
                "denied": len([
                    e for e in parsed
                    if e["event_type"] == "approval_denied"
                ]),
            },
            "integrity_valid": self._verify_hash_chain(parsed),
        }
 
        return report
 
    def _verify_hash_chain(self, events: list[dict]) -> bool:
        """ハッシュチェーンの整合性を検証"""
        for i, event in enumerate(events):
            expected_hash = self._compute_hash(event)
            if event["hash"] != expected_hash:
                return False
            if i > 0 and event["previous_hash"] != events[i-1]["hash"]:
                return False
        return True

5.2 コンプライアンスチェッカー

class ComplianceChecker:
    """規制要件に基づくコンプライアンスチェック"""
 
    def __init__(self, regulations: list[str]):
        self.regulations = regulations
        self.rules = self._load_rules(regulations)
 
    def _load_rules(self, regulations: list[str]) -> dict:
        """規制ごとのルールを読み込み"""
        all_rules = {
            "GDPR": {
                "data_retention_days": 30,
                "requires_consent": True,
                "right_to_erasure": True,
                "data_portability": True,
                "pii_categories": [
                    "name", "email", "phone", "address",
                    "ip_address", "location"
                ],
            },
            "HIPAA": {
                "phi_categories": [
                    "medical_record", "diagnosis", "treatment",
                    "insurance_info", "patient_id"
                ],
                "requires_encryption": True,
                "audit_trail_required": True,
                "minimum_access_controls": True,
            },
            "SOC2": {
                "requires_access_logs": True,
                "requires_encryption_at_rest": True,
                "requires_encryption_in_transit": True,
                "change_management_required": True,
            },
            "APPI": {  # 個人情報保護法(日本)
                "requires_purpose_specification": True,
                "requires_consent_for_third_party": True,
                "data_breach_notification_required": True,
                "cross_border_transfer_restrictions": True,
            },
        }
        return {r: all_rules[r] for r in regulations if r in all_rules}
 
    async def check_action(
        self,
        action: str,
        data_categories: list[str],
        context: dict
    ) -> dict:
        """アクションがコンプライアンス要件を満たすか検証"""
        violations = []
 
        for reg_name, rules in self.rules.items():
            # PII/PHI カテゴリのチェック
            sensitive_cats = rules.get(
                "pii_categories",
                rules.get("phi_categories", [])
            )
            overlap = set(data_categories) & set(sensitive_cats)
            if overlap:
                if rules.get("requires_consent") and not context.get("has_consent"):
                    violations.append({
                        "regulation": reg_name,
                        "violation": "consent_missing",
                        "categories": list(overlap),
                        "severity": "high"
                    })
 
            # 暗号化チェック
            if rules.get("requires_encryption"):
                if not context.get("encryption_enabled"):
                    violations.append({
                        "regulation": reg_name,
                        "violation": "encryption_missing",
                        "severity": "high"
                    })
 
        return {
            "compliant": len(violations) == 0,
            "violations": violations,
            "regulations_checked": list(self.rules.keys())
        }

6. 監視とアラート

6.1 エージェント行動の監視ダッシュボード

+------------------------------------------------------------------+
|                エージェント監視ダッシュボード                       |
+------------------------------------------------------------------+
|                                                                    |
|  リアルタイム指標:                                                 |
|  +------------------+  +------------------+  +------------------+ |
|  | アクティブ       |  | ツール呼び出し   |  | エラー率         | |
|  | セッション: 142  |  | QPS: 2,340      |  | 0.3%             | |
|  +------------------+  +------------------+  +------------------+ |
|                                                                    |
|  +------------------+  +------------------+  +------------------+ |
|  | 承認待ち: 7      |  | 拒否された       |  | 平均セッション   | |
|  |                  |  | アクション: 23   |  | 時間: 4.2min     | |
|  +------------------+  +------------------+  +------------------+ |
|                                                                    |
|  アラート条件:                                                     |
|  [!] 同一ツールの連続呼び出し > 10回/分                            |
|  [!] エラー率 > 5%                                                 |
|  [!] セッション時間 > 30分                                         |
|  [!] 拒否されたアクション率 > 10%                                  |
|  [!] トークン消費量 > 80% of limit                                 |
+------------------------------------------------------------------+

6.2 異常検知パターン

class AnomalyDetector:
    """エージェントの異常行動を検知"""
 
    def __init__(self):
        self.baselines: dict[str, dict] = {}
        self.alerts: list[dict] = []
 
    async def check_for_anomalies(
        self,
        session_id: str,
        metrics: dict
    ) -> list[dict]:
        """セッションメトリクスの異常を検知"""
        anomalies = []
 
        # 1. ループ検出: 同一ツールの連続呼び出し
        if metrics.get("consecutive_same_tool", 0) > 5:
            anomalies.append({
                "type": "tool_loop_detected",
                "tool": metrics["last_tool"],
                "count": metrics["consecutive_same_tool"],
                "severity": "high",
                "action": "pause_agent"
            })
 
        # 2. 異常なトークン消費
        avg_tokens = self.baselines.get(
            "avg_tokens_per_step", 500
        )
        if metrics.get("tokens_this_step", 0) > avg_tokens * 5:
            anomalies.append({
                "type": "token_spike",
                "current": metrics["tokens_this_step"],
                "expected": avg_tokens,
                "severity": "medium",
                "action": "log_and_continue"
            })
 
        # 3. 失敗率の急上昇
        if metrics.get("recent_error_rate", 0) > 0.5:
            anomalies.append({
                "type": "high_error_rate",
                "rate": metrics["recent_error_rate"],
                "severity": "high",
                "action": "escalate_to_human"
            })
 
        # 4. 権限外のリソースアクセス試行
        if metrics.get("permission_denied_count", 0) > 3:
            anomalies.append({
                "type": "repeated_permission_violation",
                "count": metrics["permission_denied_count"],
                "severity": "critical",
                "action": "terminate_session"
            })
 
        # 5. 応答時間の異常
        if metrics.get("step_duration_seconds", 0) > 120:
            anomalies.append({
                "type": "long_running_step",
                "duration": metrics["step_duration_seconds"],
                "severity": "medium",
                "action": "check_for_hang"
            })
 
        return anomalies

7. パターン比較

7.1 安全性パターンの比較

パターン 安全性 ユーザー体験 実装コスト 適用場面
全操作事前承認 極高 低 (遅延大) 金融、医療
リスクベース承認 一般業務
事後監視 + 取消 社内ツール
自信度ベースエスカレーション 中-高 カスタマーサポート
サンドボックス + ログ 開発・研究

7.2 エージェント権限モデルの比較

モデル 粒度 柔軟性 管理コスト 適用場面
ホワイトリスト ツール単位 シンプルなエージェント
RBAC ロール単位 組織内エージェント
ABAC 属性ベース 複雑な権限要件
Capability-based 能力トークン マルチエージェント

7.3 インジェクション対策手法の比較

手法 検出精度 レイテンシ コスト 誤検知率
パターンマッチング 極低 なし
LLMベース分類 API費用
入力サニタイズ 極低 なし なし
プロンプト分離 なし なし なし
出力検証 なし
多層組み合わせ 最高 最低

8. アンチパターン

アンチパターン 1: 「全権限を付与した汎用エージェント」

[誤り] エージェントに全ツール・全リソースへのアクセスを許可する

  「何でもできるAIアシスタント」
  → ファイル削除、メール送信、API呼び出しが全て自動実行
  → プロンプトインジェクションで全権限が悪用される

[正解] 最小権限の原則を適用する
  1. タスクに必要なツールのみを許可
  2. アクセス可能なファイル/ディレクトリを制限
  3. ネットワークアクセスをホワイトリストで管理
  4. セッションごとに権限を動的に付与

アンチパターン 2: 「エージェントのループを放置」

[誤り] エージェントが無限ループや反復的な無意味な操作を
       検出・停止する仕組みがない

実際に起きた問題:
  - 同じ API を数千回呼び出してレート制限に到達
  - ファイルの読み書きを繰り返してディスクを圧迫
  - トークンを大量消費してコストが爆発

[正解] ループ検出とリソース制限を実装する
  - 同一ツールの連続呼び出し検出 (> N回で停止)
  - セッションあたりのトークン上限
  - ツール呼び出し回数の上限
  - タイムアウトの設定
  - コスト上限の設定($XX/セッション)

アンチパターン 3: 「出力を信頼して検証しない」

[誤り] エージェントの出力をフィルタリングせずにユーザーに提示

問題:
  - PII(個人情報)の漏洩
  - システムプロンプトの漏洩
  - 有害なコンテンツの出力
  - ハルシネーションを事実として提示

[正解] 出力ガードレールを必ず実装する
  - PII検出 + マスキング
  - 有害性スコアリング
  - システムプロンプト漏洩チェック
  - 事実検証(可能な場合)

アンチパターン 4: 「監査ログなしの運用」

[誤り] エージェントの行動を記録せず、事後分析ができない

問題:
  - インシデント発生時に原因特定ができない
  - コンプライアンス監査に対応できない
  - 改善のためのデータが蓄積されない

[正解] 全操作の監査ログを記録する
  - ツール呼び出し: 名前、パラメータ、結果
  - 意思決定: 自信度、選択理由
  - ガードレール: トリガー回数、ブロック内容
  - ユーザーインタラクション: 承認/拒否の履歴

9. 実装チェックリスト

9.1 安全性実装チェックリスト

必須レベル(Must Have):
[ ] 入力バリデーション(長さ、形式)
[ ] プロンプトインジェクション検出(パターンベース)
[ ] ツール呼び出し権限制御(ホワイトリスト)
[ ] 出力フィルタリング(PII検出)
[ ] セッションタイムアウト
[ ] トークン使用量の上限
[ ] ツール呼び出し回数の上限
[ ] 全操作の監査ログ
[ ] エラーハンドリングとフォールバック

推奨レベル(Should Have):
[ ] LLMベースのインジェクション検出
[ ] リスクベースの承認ワークフロー
[ ] サンドボックス実行環境
[ ] ループ検出と自動停止
[ ] コスト制限と予算管理
[ ] 異常検知アラート
[ ] ロールベースの権限管理

高度レベル(Nice to Have):
[ ] 自信度ベースのエスカレーション
[ ] 動的権限制御
[ ] コンプライアンスチェッカー
[ ] ハッシュチェーンによるログ改ざん防止
[ ] A/Bテスト付き安全性評価
[ ] 多言語対応のPII検出

10. FAQ

Q1: エージェントにどの程度の自律性を与えるべきですか?

A: リスクと業務効率のバランスで段階的に決定します。

  • Level 1 (推薦): エージェントが提案し、人間が全て実行する
  • Level 2 (承認付き実行): エージェントが実行計画を立て、人間が承認後に実行する
  • Level 3 (監視付き自律): エージェントが自律実行し、人間は監視のみ。異常時に介入
  • Level 4 (完全自律): エージェントが完全に自律実行。定期的な事後レビューのみ

本番環境では Level 2-3 から始め、信頼性が実証された後に Level を上げることを推奨します。

Q2: マルチエージェント環境での安全性はどう設計しますか?

A: 以下の原則を適用します。

  1. エージェント間の権限分離: 各エージェントは独自の権限セットを持ち、他のエージェントの権限を継承しない
  2. 通信チャネルの制限: エージェント間の通信は定義済みのプロトコルに限定し、直接的なプロンプト注入を防止
  3. オーケストレーターの監視: 中央のオーケストレーターが全エージェントのアクションを監視し、異常な協調動作を検出
  4. 最終決定権の集約: 重要な意思決定は単一のエージェント(またはオーケストレーター)に集約

Q3: コスト爆発を防ぐにはどうすればよいですか?

A: 多層的なコスト制限を設けます。

  • セッション単位: 1セッションあたりの最大トークン数と最大ツール呼び出し数を設定
  • ユーザー単位: 1日/月あたりの利用上限を設定
  • 組織単位: 月間予算の上限を設定し、閾値でアラート(80%, 90%, 95%)
  • リアルタイム監視: コスト監視ダッシュボードで異常な消費パターンを検出
  • 自動停止: 予算の 100% に達したら全エージェントを自動停止

Q4: プロンプトインジェクション対策の優先順位は?

A: 以下の順に実装を推奨します。

  1. システムプロンプトの分離: ユーザー入力とシステム命令を明確に分離(コストゼロで効果大)
  2. パターンマッチング: 既知のインジェクションパターンを検出(低コスト・即時実装可能)
  3. 入力サニタイズ: 不可視文字やエンコーディング攻撃の除去
  4. 出力検証: システムプロンプトの漏洩やPII漏洩をチェック
  5. LLMベース分類: 高精度だがコストがかかるため、疑わしい入力のみに適用

Q5: セキュリティインシデント発生時の対応手順は?

A: 以下のフローに従います。

  1. 即時対応: 該当セッションを即座に停止し、エージェントの操作を凍結
  2. 影響範囲の特定: 監査ログから影響を受けたユーザー・データを特定
  3. 証拠保全: 関連ログ・メトリクス・スナップショットを保全
  4. 通知: 影響を受けたユーザーおよび関係者に通知(規制要件に応じて)
  5. 根本原因分析: インシデントの原因を特定し、再発防止策を策定
  6. 改善実施: ガードレール・権限設定・検出ロジックを更新

FAQ

Q1: このトピックを学ぶ上で最も重要なポイントは何ですか?

実践的な経験を積むことが最も重要です。理論だけでなく、実際にコードを書いて動作を確認することで理解が深まります。

Q2: 初心者がよく陥る間違いは何ですか?

基礎を飛ばして応用に進むことです。このガイドで説明している基本概念をしっかり理解してから、次のステップに進むことをお勧めします。

Q3: 実務ではどのように活用されていますか?

このトピックの知識は、日常的な開発業務で頻繁に活用されます。特にコードレビューやアーキテクチャ設計の際に重要になります。


11. まとめ

安全対策 実装方法 目的 優先度
ツール権限制御 ホワイトリスト + ポリシー 不正操作の防止 必須
入力ガードレール パターン + ML 検出 インジェクション防止 必須
出力フィルタリング PII検出 + 有害性チェック 情報漏洩・有害出力防止 必須
人間承認ゲート 非同期ワークフロー 高リスク操作の制御 推奨
サンドボックス Docker + リソース制限 実行環境の隔離 推奨
ループ検出 呼び出しパターン監視 リソース浪費の防止 推奨
監査ログ 全操作の記録 + ハッシュチェーン 追跡可能性の確保 必須
コスト制限 多層的な予算制御 コスト爆発の防止 推奨
コンプライアンス 規制ルール自動チェック 法規制への準拠 業界依存
動的権限 セッション中の権限調整 きめ細かい制御 高度

次に読むべきガイド


参考文献

  1. Wunderwuzzi. (2024). "Prompt Injection and AI Agents: Threats, Defenses, and Real-world Scenarios." Embracethered. https://embracethered.com/blog/
  2. Yao, S. et al. (2023). "ReAct: Synergizing Reasoning and Acting in Language Models." ICLR 2023. https://arxiv.org/abs/2210.03629
  3. OWASP. (2025). "OWASP Top 10 for Large Language Model Applications." OWASP Foundation. https://owasp.org/www-project-top-10-for-large-language-model-applications/
  4. Greshake, K. et al. (2023). "Not what you've signed up for: Compromising Real-World LLM-Integrated Applications with Indirect Prompt Injection." arXiv. https://arxiv.org/abs/2302.12173
  5. NIST. (2024). "AI Risk Management Framework." NIST. https://www.nist.gov/artificial-intelligence