Agent Harness Engineering: A Practical Guide

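Overview

In 2026, a new technical concept, Agent Harness Engineering, quickly became one of the most discussed topics in the industry. Deloitte's Tech Trends 2026 report notes that although AI agent technology has matured, only 11% of organizations have successfully deployed agents to production. That gap gave rise to Agent Harness Engineering: an engineering discipline concerned with building reliable, observable, and evaluable production infrastructure for AI agents.

If the AI agent is the "brain", the harness is the "body": it provides the runtime environment, security boundaries, monitoring, the evaluation framework, and rollback mechanisms. This article walks through the core concepts, architecture, and practical implementation of an agent harness.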

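Prerequisites

  • Familiarity with the basic AI agent concept (the perceive → think → act loop)
  • Working knowledge of Python
  • Basic understanding of Docker and microservices
  • Basic experience calling LLM APIs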

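I. Why Do We Need an Agent Harness in 2026?

1.1 Three Challenges of Taking Agents to Production

| Challenge | Description | Consequence |
|---|---|---|
| Unpredictability | LLM output is not deterministic; the same input can produce different behavior | Production behavior is hard to guarantee |
| Tool safety | An agent can call shells, databases, and APIs, so the risk of runaway privileges is high | Data leaks, system damage |
| Hard to evaluate | Traditional unit tests cannot cover an agent's multi-step decision paths | Quality cannot be quantified |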

1.2 Core Responsibilities of the Harness

┌──────────────────────────────────────────────────┐
│                  Agent Harness                   │
│                                                  │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐  │
│  │  Sandbox   │  │  Monitor   │  │    Eval    │  │
│  └────────────┘  └────────────┘  └────────────┘  │
│                                                  │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐  │
│  │  Tracing   │  │   Cache    │  │ Rate Limit │  │
│  └────────────┘  └────────────┘  └────────────┘  │
│                                                  │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐  │
│  │  Rollback  │  │   Audit    │  │  A/B Test  │  │
│  └────────────┘  └────────────┘  └────────────┘  │
└──────────────────────────────────────────────────┘
              │                      │
              ▼                      ▼
      ┌────────────────┐    ┌────────────────┐
      │    LLM API     │    │     Tools      │
      │(multi-provider)│    │  (sandboxed)   │
      └────────────────┘    └────────────────┘

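II. Implementing the Core Harness Components

2.1 Sandboxed Execution Environment

An agent's most dangerous capability is executing code and commands. The sandbox is the harness's first line of defense.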

# harness/sandbox.py
import resource  # Unix-only
import shlex
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Optional


class AgentSandbox:
    """Restricted execution environment for agent tool calls.

    This is an allowlist plus resource limits, not a full isolation boundary;
    pair it with a container or VM for untrusted workloads.
    """

    def __init__(self, work_dir: Optional[str] = None):
        self.work_dir = Path(work_dir or tempfile.mkdtemp(prefix="agent_sandbox_")).resolve()
        self.work_dir.mkdir(parents=True, exist_ok=True)
        self.allowed_commands = {
            "ls", "cat", "head", "tail", "wc", "date",
            "pwd", "echo", "grep", "sort", "uniq", "cut",
        }
        self.max_output_size = 1024 * 100  # 100 KB
        self.max_execution_time = 30  # seconds

    def _resolve_inside(self, path: str) -> Optional[Path]:
        """Resolve `path` against the sandbox; return None if it escapes."""
        full_path = (self.work_dir / path).resolve()
        # Compare path components. A plain string-prefix test would also
        # accept sibling directories such as "<work_dir>_other".
        return full_path if full_path.is_relative_to(self.work_dir) else None

    def run_command(self, command: str) -> dict:
        """Run an allowlisted command inside the sandbox directory."""
        try:
            parts = shlex.split(command)  # honors quoting, unlike str.split()
        except ValueError as e:
            return {"success": False, "error": f"Could not parse command: {e}"}
        if not parts:
            return {"success": False, "error": "Empty command"}

        cmd = parts[0]
        if cmd not in self.allowed_commands:
            return {"success": False, "error": f"Command '{cmd}' is not in the allowlist"}

        # Path safety check. Every non-option argument is treated as a possible
        # path, so relative escapes such as "../../etc/passwd" are rejected too.
        # Options that embed a path (e.g. --file=/etc/passwd) are not inspected;
        # this is a heuristic, not a guarantee.
        for part in parts[1:]:
            if not part.startswith("-") and self._resolve_inside(part) is None:
                return {"success": False, "error": f"Path '{part}' is outside the allowed area"}

        def set_limits():
            # Runs in the child before exec: cap CPU seconds and the size of
            # any file the command writes. Output captured through pipes is
            # not covered by RLIMIT_FSIZE, so it is truncated below instead.
            resource.setrlimit(resource.RLIMIT_CPU, (self.max_execution_time, self.max_execution_time))
            resource.setrlimit(resource.RLIMIT_FSIZE, (self.max_output_size, self.max_output_size))

        try:
            result = subprocess.run(
                parts,
                capture_output=True,
                text=True,
                timeout=self.max_execution_time,
                cwd=self.work_dir,
                # Minimal environment: inheriting os.environ would expose
                # secrets such as API keys to the child process.
                env={"PATH": "/usr/local/bin:/usr/bin:/bin"},
                preexec_fn=set_limits,  # not safe in multithreaded parents
            )
        except subprocess.TimeoutExpired:
            return {"success": False, "error": "Command timed out"}
        except Exception as e:
            return {"success": False, "error": str(e)}

        # Keep only the last max_output_size characters of each stream.
        return {
            "success": result.returncode == 0,
            "output": result.stdout[-self.max_output_size:],
            "error": result.stderr[-self.max_output_size:],
            "returncode": result.returncode,
        }

    def read_file(self, path: str) -> dict:
        """Read a file, refusing paths that resolve outside the sandbox."""
        full_path = self._resolve_inside(path)
        if full_path is None:
            return {"success": False, "error": "Path escapes the sandbox"}
        if not full_path.exists():
            return {"success": False, "error": "File does not exist"}
        try:
            return {"success": True, "content": full_path.read_text(encoding="utf-8")}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def write_file(self, path: str, content: str) -> dict:
        """Write a file, refusing paths that resolve outside the sandbox."""
        full_path = self._resolve_inside(path)
        if full_path is None:
            return {"success": False, "error": "Path escapes the sandbox"}
        try:
            full_path.parent.mkdir(parents=True, exist_ok=True)
            full_path.write_text(content, encoding="utf-8")
            return {"success": True, "path": str(full_path)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def cleanup(self):
        """Delete the sandbox working directory."""
        shutil.rmtree(self.work_dir, ignore_errors=True)
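
The file and command checks in a sandbox like this hinge on one question: does a user-supplied path stay inside the working directory once `..` segments and symlinks are resolved? The following is a minimal standalone sketch of that check. The helper name `is_inside` and the demo directories are made up for this example, and it assumes Python 3.9+ for `Path.is_relative_to`. It also shows why a plain string-prefix comparison is not enough.

```python
import tempfile
from pathlib import Path


def is_inside(base: Path, user_path: str) -> bool:
    """Return True if user_path, resolved against base, stays under base."""
    base = base.resolve()
    candidate = (base / user_path).resolve()
    return candidate.is_relative_to(base)


root = Path(tempfile.mkdtemp(prefix="sandbox_demo_"))
sibling = Path(str(root) + "_other")  # shares the string prefix of root

checks = {
    "notes/todo.txt": is_inside(root, "notes/todo.txt"),
    "../outside.txt": is_inside(root, "../outside.txt"),
    "/etc/passwd": is_inside(root, "/etc/passwd"),
    "sibling dir": is_inside(root, str(sibling / "x.txt")),
}
for name, ok in checks.items():
    print(f"{name}: {'allowed' if ok else 'rejected'}")

# A naive prefix test wrongly accepts the sibling directory.
naive = str((sibling / "x.txt").resolve()).startswith(str(root.resolve()))
print("naive prefix check accepts sibling:", naive)
```

Only the first path is allowed. The sibling directory is the interesting case: its string form begins with the sandbox path, so a `startswith` test lets it through, while the component-wise comparison rejects it.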

2.2 Tracing and Observability

An agent's multi-step decision process must be fully traceable.

# harness/tracing.py
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Optional


def _utc_now() -> str:
    """Current UTC time in ISO 8601 (datetime.utcnow() is deprecated since Python 3.12)."""
    return datetime.now(timezone.utc).isoformat()


class AgentTracer:
    """Execution tracer for a single agent run.

    Only one trace is active per instance, so give each concurrent run its
    own tracer.
    """

    def __init__(self, storage_path: str = "./traces"):
        self.storage_path = storage_path
        self.current_trace: Optional[dict] = None

    def start_trace(self, session_id: str, user_input: str) -> str:
        """Start a new trace and return its ID."""
        trace_id = str(uuid.uuid4())
        self.current_trace = {
            "trace_id": trace_id,
            "session_id": session_id,
            "user_input": user_input,
            "started_at": _utc_now(),
            "steps": [],
            "total_tokens": 0,
            "total_cost": 0.0,
            "status": "running",
        }
        return trace_id

    def add_step(self, step_type: str, details: dict) -> None:
        """Record one execution step."""
        if not self.current_trace:
            return

        step = {
            "step_id": len(self.current_trace["steps"]) + 1,
            "type": step_type,  # thought | tool_call | tool_result | final_answer
            "timestamp": _utc_now(),
            "duration_ms": 0,  # placeholder; pass "duration_ms" in details to override
            **details,
        }
        self.current_trace["steps"].append(step)

    def end_trace(self, status: str = "completed") -> Optional[dict]:
        """Finish the trace, write it to disk, and return it."""
        if not self.current_trace:
            return None

        self.current_trace["status"] = status
        self.current_trace["ended_at"] = _utc_now()
        self.current_trace["total_steps"] = len(self.current_trace["steps"])

        # Persist as one JSON file per trace.
        os.makedirs(self.storage_path, exist_ok=True)
        filepath = os.path.join(self.storage_path, f"{self.current_trace['trace_id']}.json")
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(self.current_trace, f, indent=2, ensure_ascii=False)

        trace = self.current_trace
        self.current_trace = None
        return trace

    def to_llm_log(self) -> list[dict]:
        """Convert the active trace into an LLM-readable message log."""
        if not self.current_trace:
            return []

        messages = [{"role": "user", "content": self.current_trace["user_input"]}]
        for step in self.current_trace["steps"]:
            if step["type"] == "thought":
                messages.append({"role": "assistant", "content": step.get("content", "")})
            elif step["type"] == "tool_call":
                messages.append({
                    "role": "assistant",
                    "content": f"[tool call] {step.get('tool_name')}: {step.get('arguments', {})}",
                })
            elif step["type"] == "tool_result":
                messages.append({
                    "role": "user",
                    "content": f"[tool result] {step.get('result', '')}",
                })
        return messages
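
The tracer records `duration_ms` as 0 unless the caller supplies a value. One way to fill it in is to time each step with a context manager. The following is a minimal sketch and not part of the tracer itself: `timed_step` and the module-level `steps` list are hypothetical names, and a plain list stands in for a trace's `steps` so the example runs on its own.

```python
import time
from contextlib import contextmanager

steps: list[dict] = []  # stand-in for a trace's "steps" list


@contextmanager
def timed_step(step_type: str, **details):
    """Append a step record whose duration_ms covers the with-block."""
    start = time.perf_counter()
    step = {"step_id": len(steps) + 1, "type": step_type, **details}
    try:
        yield step  # the block may add fields, e.g. step["result"] = ...
    finally:
        # Recorded even if the block raises, so failed steps are timed too.
        step["duration_ms"] = round((time.perf_counter() - start) * 1000, 2)
        steps.append(step)


with timed_step("tool_call", tool_name="search_knowledge") as step:
    time.sleep(0.05)  # pretend the tool takes about 50 ms
    step["result"] = "3 results"

print(steps[0])
```

In a real harness the same pattern could wrap each tool invocation and pass the measured value to `add_step` through its `details` argument.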

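2.3 Evaluation Framework (Eval Harness)

Evaluating an agent is considerably more complex than testing traditional software and calls for quantification along multiple dimensions.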

# harness/eval.py
import logging
from dataclasses import dataclass, field
from typing import Callable

logger = logging.getLogger(__name__)


@dataclass
class EvalCase:
    """A single evaluation case."""
    name: str
    input: str
    expected_behaviors: list[str]  # descriptions of expected behavior (not scored automatically here)
    expected_tools: list[str] = field(default_factory=list)  # tools the agent is expected to call
    max_steps: int = 10
    tags: list[str] = field(default_factory=list)


@dataclass
class EvalResult:
    """Outcome of one evaluation case."""
    case_name: str
    passed: bool
    score: float  # 0.0 to 1.0
    details: dict = field(default_factory=dict)
    trace: list = field(default_factory=list)


class EvalHarness:
    """Agent evaluation framework."""

    def __init__(self, pass_threshold: float = 0.7):
        self.cases: list[EvalCase] = []
        self.metrics: dict[str, Callable] = {}
        self.pass_threshold = pass_threshold  # minimum score for a case to pass

    def add_case(self, case: EvalCase):
        self.cases.append(case)

    def register_metric(self, name: str, fn: Callable):
        """Register a custom metric: fn(case, trace) -> score in [0, 1]."""
        self.metrics[name] = fn

    async def evaluate(self, agent_fn: Callable) -> list[EvalResult]:
        """Run every case through agent_fn and score the resulting traces."""
        results = []

        for case in self.cases:
            print(f"Evaluating: {case.name}")

            # Run the agent; it must return a trace dict with a "steps" list.
            trace = await agent_fn(case.input, max_steps=case.max_steps)

            score = self._compute_score(case, trace)
            passed = score >= self.pass_threshold

            results.append(EvalResult(
                case_name=case.name,
                passed=passed,
                score=score,
                details={
                    "steps_used": len(trace.get("steps", [])),
                    "tools_called": sorted(self._extract_tools(trace)),
                    "expected_tools_hit": self._check_tools(case, trace),
                },
                trace=trace.get("steps", []),
            ))

        return results

    def _compute_score(self, case: EvalCase, trace: dict) -> float:
        """Weighted average of the component scores, in [0, 1]."""
        scores = []

        # 1. Tool-call accuracy (weight 0.4)
        scores.append(("tool_accuracy", self._tool_accuracy(case, trace), 0.4))

        # 2. Step efficiency (weight 0.2)
        step_budget = max(case.max_steps, 1) * 2
        efficiency = max(0.0, 1 - len(trace.get("steps", [])) / step_budget)
        scores.append(("step_efficiency", efficiency, 0.2))

        # 3. Custom metrics (weight 0.4, split evenly)
        for name, fn in self.metrics.items():
            try:
                metric_score = fn(case, trace)
            except Exception:
                # Log and skip instead of silently swallowing the failure.
                logger.exception("Metric %r failed on case %r; skipping it", name, case.name)
                continue
            scores.append((name, metric_score, 0.4 / len(self.metrics)))

        # Normalize by the weights actually used. Without this, a harness with
        # no custom metrics could never score above 0.6, so every case would
        # fail the default 0.7 threshold.
        total_weight = sum(weight for _, _, weight in scores)
        total = sum(score * weight for _, score, weight in scores) / total_weight
        return min(1.0, max(0.0, total))

    def _tool_accuracy(self, case: EvalCase, trace: dict) -> float:
        """Fraction of expected tools that were actually called."""
        if not case.expected_tools:
            return 1.0  # nothing expected, so full marks

        called_tools = self._extract_tools(trace)
        if not called_tools:
            return 0.0

        hits = sum(1 for t in case.expected_tools if t in called_tools)
        return hits / len(case.expected_tools)

    def _extract_tools(self, trace: dict) -> set:
        """Names of all tools called in the trace."""
        tools = set()
        for step in trace.get("steps", []):
            if step.get("type") == "tool_call" and step.get("tool_name"):
                tools.add(step["tool_name"])
        return tools

    def _check_tools(self, case: EvalCase, trace: dict) -> dict:
        called = self._extract_tools(trace)
        return {
            "expected": case.expected_tools,
            "called": sorted(called),
            "missed": [t for t in case.expected_tools if t not in called],
        }

    def report(self, results: list[EvalResult]) -> str:
        """Build a plain-text evaluation report."""
        total = len(results)
        passed = sum(1 for r in results if r.passed)
        # Guard both ratios against an empty result list.
        avg_score = sum(r.score for r in results) / total if total > 0 else 0.0
        pass_rate = passed / total * 100 if total > 0 else 0.0

        lines = [
            "=" * 60,
            "Agent Eval Report",
            "=" * 60,
            f"Total cases: {total}",
            f"Passed: {passed}/{total} ({pass_rate:.1f}%)",
            f"Average score: {avg_score:.3f}",
            "",
            "Details:",
            "-" * 60,
        ]

        for r in results:
            icon = "✅" if r.passed else "❌"
            lines.append(f"{icon} {r.case_name} (score: {r.score:.3f})")
            lines.append(f"   steps: {r.details['steps_used']} | "
                         f"tools: {r.details['tools_called']}")

        return "\n".join(lines)
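
To make the weighting concrete, here is a small worked example of the scoring arithmetic, written as a standalone sketch rather than a call into `EvalHarness`. The function name `weighted_score` and the sample numbers are illustrative assumptions; the weights (0.4 for tool accuracy, 0.2 for step efficiency, 0.4 shared by custom metrics) are the ones used in the scoring code. It compares a plain weighted sum with a weight-normalized average for a harness that has no custom metrics registered.

```python
def weighted_score(components: list[tuple[str, float, float]], normalize: bool) -> float:
    """Combine (name, score, weight) triples into one score in [0, 1]."""
    total = sum(score * weight for _, score, weight in components)
    if normalize:
        total /= sum(weight for _, _, weight in components)
    return round(min(1.0, max(0.0, total)), 3)


# A case with max_steps=10 where the agent used 4 steps and hit 1 of 2 expected tools.
tool_accuracy = 1 / 2
step_efficiency = max(0.0, 1 - 4 / (10 * 2))  # 0.8
builtin = [("tool_accuracy", tool_accuracy, 0.4), ("step_efficiency", step_efficiency, 0.2)]

plain = weighted_score(builtin, normalize=False)      # 0.5*0.4 + 0.8*0.2 = 0.36
normalized = weighted_score(builtin, normalize=True)  # 0.36 / 0.6 = 0.6
print(plain, normalized)

# A perfect run with no custom metrics: the unused 0.4 share caps the plain sum.
perfect = [("tool_accuracy", 1.0, 0.4), ("step_efficiency", 1.0, 0.2)]
perfect_plain = weighted_score(perfect, normalize=False)      # 0.6
perfect_normalized = weighted_score(perfect, normalize=True)  # 1.0
print(perfect_plain, perfect_normalized)
```

The second pair is the one to watch: with a pass threshold of 0.7, a plain weighted sum would fail even a perfect run whenever no custom metric is registered.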

2.4 Caching and Rate Limiting

# harness/cache.py
import hashlib
import json
import threading
import time


class SemanticCache:
    """Response cache for LLM calls.

    Despite the name, this version matches on an exact hash of the messages;
    a real project could use embeddings for semantic matching.
    """

    def __init__(self, ttl_seconds: int = 3600):
        self.cache: dict[str, dict] = {}
        self.ttl = ttl_seconds

    def _make_key(self, messages: list[dict]) -> str:
        """Build the cache key from a hash of the full message list."""
        # Hashing only the last message would return an answer cached for a
        # different conversation that happens to end with the same text.
        content = json.dumps(messages, sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(content.encode("utf-8")).hexdigest()

    def get(self, messages: list[dict]) -> str | None:
        key = self._make_key(messages)
        entry = self.cache.get(key)
        if entry is None:
            return None
        if time.time() - entry["timestamp"] < self.ttl:
            return entry["response"]
        del self.cache[key]  # evict the expired entry
        return None

    def set(self, messages: list[dict], response: str):
        key = self._make_key(messages)
        self.cache[key] = {"response": response, "timestamp": time.time()}

    def clear(self):
        self.cache.clear()


class RateLimiter:
    """Token-bucket rate limiter (requests per minute)."""

    def __init__(self, rpm: int = 60):
        self.rpm = rpm
        self.tokens = float(rpm)
        self.last_refill = time.monotonic()  # unaffected by wall-clock changes
        self._lock = threading.Lock()

    def _refill(self):
        """Add the tokens earned since the last refill, capped at rpm."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.rpm, self.tokens + elapsed * (self.rpm / 60))
        self.last_refill = now

    def acquire(self) -> bool:
        """Take one token if available; never blocks."""
        with self._lock:
            self._refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

    def wait_and_acquire(self, timeout: float = 30) -> bool:
        """Poll for a token until `timeout` seconds pass.

        Uses time.sleep, so call it from threads, not from an event loop.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self.acquire():
                return True
            time.sleep(0.1)
        return False
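
The cache code notes that a real project could match on embeddings instead of an exact hash. The sketch below illustrates that idea under loud assumptions: `embed` is a toy bag-of-words stand-in for a real embedding model, and `SimilarityCache`, the 0.8 threshold, and the sample queries are all made up for the example. It does a linear scan, which is fine for a demo but would need a vector index at scale.

```python
import math
from collections import Counter
from typing import Optional


def embed(text: str) -> Counter:
    """Toy stand-in for an embedding model: lowercase word counts."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[w] * b[w] for w in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class SimilarityCache:
    """Return a cached response when a stored query is similar enough."""

    def __init__(self, threshold: float = 0.8):
        self.threshold = threshold
        self.entries: list[tuple[Counter, str]] = []

    def set(self, query: str, response: str) -> None:
        self.entries.append((embed(query), response))

    def get(self, query: str) -> Optional[str]:
        vec = embed(query)
        # Pick the most similar stored query, then apply the threshold.
        best = max(self.entries, key=lambda e: cosine(vec, e[0]), default=None)
        if best is not None and cosine(vec, best[0]) >= self.threshold:
            return best[1]
        return None


cache = SimilarityCache(threshold=0.8)
cache.set("how do I reset my password", "Use the account settings page.")

exact = cache.get("How do I reset my password")            # same words, different case
near = cache.get("how do I reset my password please")      # one extra word
unrelated = cache.get("what is the refund policy")         # no shared words
print(exact, near, unrelated, sep="\n")
```

The threshold is the main design lever: set it too low and users get answers to questions they did not ask, too high and the cache degrades to exact matching.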

III. Putting the Harness Together

3.1 Production-Grade Agent Runner

# harness/runner.py
from typing import Optional

from .cache import RateLimiter, SemanticCache
from .sandbox import AgentSandbox
from .tracing import AgentTracer


class AgentRunner:
    """Production-grade agent runner."""

    def __init__(
        self,
        sandbox: Optional[AgentSandbox] = None,
        tracer: Optional[AgentTracer] = None,
        cache: Optional[SemanticCache] = None,
        rate_limiter: Optional[RateLimiter] = None,
        max_steps: int = 20,
    ):
        self.sandbox = sandbox or AgentSandbox()
        self.tracer = tracer or AgentTracer()
        self.cache = cache or SemanticCache()
        self.rate_limiter = rate_limiter or RateLimiter()
        self.max_steps = max_steps

    async def run(self, user_input: str, session_id: str = "default") -> dict:
        """Run the agent with the full harness around it."""
        messages = [{"role": "user", "content": user_input}]

        # 1. Rate-limit check
        if not self.rate_limiter.acquire():
            return {
                "success": False,
                "error": "Too many requests; please try again later",
                "status_code": 429,
            }

        # 2. Cache lookup. Only appropriate when the agent's actions have no
        #    side effects; an empty string is still a valid cached response.
        cached = self.cache.get(messages)
        if cached is not None:
            return {"success": True, "response": cached, "from_cache": True}

        # 3. Start tracing
        trace_id = self.tracer.start_trace(session_id, user_input)

        # cleanup() deletes the working directory after each run, so make
        # sure it exists again before this one starts.
        self.sandbox.work_dir.mkdir(parents=True, exist_ok=True)

        try:
            # 4. Run the agent loop (plug in your own agent implementation here)
            response = await self._run_agent_loop(user_input)

            # 5. Write to the cache
            self.cache.set(messages, response)

            # 6. End the trace
            self.tracer.end_trace("completed")

            return {"success": True, "response": response, "trace_id": trace_id}

        except Exception as e:
            self.tracer.end_trace("failed")
            return {"success": False, "error": str(e), "trace_id": trace_id}

        finally:
            # 7. Clean up the sandbox
            self.sandbox.cleanup()

    async def _run_agent_loop(self, user_input: str) -> str:
        """Agent main loop (illustrative placeholder)."""
        # A real project plugs its agent implementation in here.
        # This stub only demonstrates how the tracer is used.
        self.tracer.add_step("thought", {"content": f"Analyzing user input: {user_input[:50]}..."})

        # Simulated tool call
        self.tracer.add_step("tool_call", {
            "tool_name": "search_knowledge",
            "arguments": {"query": user_input},
        })
        self.tracer.add_step("tool_result", {
            "result": "Found 3 relevant results",
        })

        return f"Processed: {user_input}"

3.2 Configuration Management

# harness/config.py
# Pydantic v2 moved BaseSettings into the separate pydantic-settings package.
from pydantic_settings import BaseSettings, SettingsConfigDict


class HarnessConfig(BaseSettings):
    """Global harness configuration, overridable via HARNESS_* environment variables."""

    model_config = SettingsConfigDict(env_prefix="HARNESS_")

    # Sandbox
    sandbox_work_dir: str = "/tmp/agent_sandbox"
    sandbox_max_output: int = 102_400  # bytes
    sandbox_timeout: int = 30  # seconds

    # Rate limiting
    rate_limit_rpm: int = 60
    rate_limit_concurrent: int = 10

    # Cache
    cache_ttl: int = 3600  # seconds
    cache_max_size: int = 1000

    # Tracing
    trace_storage: str = "./traces"
    trace_retention_days: int = 30

    # Evaluation
    eval_threshold: float = 0.7
    eval_max_steps: int = 20

    # LLM
    llm_provider: str = "anthropic"
    llm_model: str = "claude-sonnet-4"
    llm_api_key: str = ""  # supplied via HARNESS_LLM_API_KEY; never hard-code it
    llm_max_retries: int = 3
    llm_timeout: int = 60  # seconds

    # Security
    allowed_commands: list[str] = [
        "ls", "cat", "head", "tail", "wc", "date",
        "pwd", "echo", "grep", "sort", "uniq",
    ]
    allowed_extensions: list[str] = [".txt", ".md", ".json", ".csv", ".yaml", ".toml"]
IV. Production Deployment Checklist

4.1 Pre-Deployment Checks

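## Agent Harness Deployment Checklist

### Security
- [ ] Sandbox directory is isolated (no access to critical system paths)
- [ ] Command allowlist is configured
- [ ] File reads and writes are confined to the sandbox directory
- [ ] Network access is restricted (domain allowlist)
- [ ] LLM API keys come from environment variables or a secrets manager
- [ ] Sensitive data is redacted (API keys and passwords filtered from logs)

### Observability
- [ ] Every agent step is traced
- [ ] Key metrics are wired into monitoring (success rate, latency, token consumption)
- [ ] Alert rules are configured (error rate > 5%, latency > 30s)
- [ ] Logs are shipped to a centralized logging system

### Evaluation
- [ ] Eval cases are written for the core scenarios
- [ ] Regression tests are configured (run after every model update)
- [ ] Score threshold is set
- [ ] A/B testing framework is ready

### Operations
- [ ] Rate-limiting policy is configured
- [ ] Caching policy is configured
- [ ] Fallback policy is configured (fall back to a direct LLM call when the agent is unavailable)
- [ ] Budget cap is set (monthly token consumption limit)
- [ ] Rollback mechanism is ready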
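
The alert thresholds in the checklist (error rate above 5%, latency above 30 s) can be checked directly against stored traces. The sketch below is one hedged way to do that. It assumes trace records shaped like the ones the tracer in section 2.2 writes (`status`, `started_at`, `ended_at`); the function name `check_alerts`, the choice of p95 as the latency statistic, and the sample data are illustrative, not something the checklist prescribes.

```python
import math
from datetime import datetime, timedelta, timezone


def check_alerts(traces: list[dict], max_error_rate: float = 0.05,
                 max_latency_s: float = 30.0) -> list[str]:
    """Return alert messages for a batch of finished traces."""
    if not traces:
        return []
    alerts = []

    failed = sum(1 for t in traces if t["status"] != "completed")
    error_rate = failed / len(traces)
    if error_rate > max_error_rate:
        alerts.append(f"error rate {error_rate:.1%} exceeds {max_error_rate:.0%}")

    # Latency per trace, in seconds, from the ISO 8601 timestamps.
    latencies = sorted(
        (datetime.fromisoformat(t["ended_at"]) - datetime.fromisoformat(t["started_at"])).total_seconds()
        for t in traces
    )
    # Nearest-rank p95: the smallest value with at least 95% of samples at or below it.
    p95 = latencies[math.ceil(len(latencies) * 95 / 100) - 1]
    if p95 > max_latency_s:
        alerts.append(f"p95 latency {p95:.1f}s exceeds {max_latency_s:.0f}s")
    return alerts


def make_trace(status: str, seconds: float) -> dict:
    """Build a sample trace record (hypothetical data for the demo)."""
    start = datetime(2026, 1, 1, tzinfo=timezone.utc)
    return {"status": status, "started_at": start.isoformat(),
            "ended_at": (start + timedelta(seconds=seconds)).isoformat()}


# 18 healthy runs at 2 s each, plus 2 failed runs that took 45 s.
sample = [make_trace("completed", 2)] * 18 + [make_trace("failed", 45)] * 2
alerts = check_alerts(sample)
for alert in alerts:
    print(alert)
```

With this sample, both rules fire: 2 of 20 runs failed (10%), and the p95 latency lands on one of the 45-second runs. In production the same check would more likely live in the monitoring system's own rule language; the point here is only to pin down what the two thresholds measure.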

4.2 Deploying with Docker Compose

# docker-compose.yml
version: "3.8"  # ignored by Compose v2; harmless to keep for older tooling

services:
  agent-api:
    build: .
    ports:
      # Publish a host-port range: three replicas on one machine cannot all
      # bind host port 8080. A reverse proxy in front is the alternative.
      - "8080-8082:8080"
    environment:
      - HARNESS_LLM_API_KEY=${LLM_API_KEY}
      - HARNESS_RATE_LIMIT_RPM=60
      - HARNESS_TRACE_STORAGE=/data/traces
    volumes:
      - agent_data:/data
      - /tmp/agent_sandbox:/tmp/agent_sandbox
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
    healthcheck:
      # Requires curl to be installed in the image.
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    ports:
      # Bound to loopback so Redis is not exposed on every host interface.
      - "127.0.0.1:6379:6379"

  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

volumes:
  agent_data:
  redis_data:
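
The healthcheck in the Compose file expects the `agent-api` container to answer `GET /health` on port 8080. The article does not show that endpoint, so here is a minimal standard-library sketch of what it could look like. The handler name, the JSON body, and the use of `http.server` (rather than whatever web framework the service actually uses) are assumptions made for illustration.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    """Answer GET /health with 200 and a small JSON body; 404 otherwise."""

    def do_GET(self):
        if self.path == "/health":
            body = json.dumps({"status": "ok"}).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_error(404)

    def log_message(self, format, *args):
        pass  # keep the demo output quiet


# Port 0 asks the OS for a free port; the real service would bind 8080.
server = ThreadingHTTPServer(("127.0.0.1", 0), HealthHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

# Probe the endpoint the same way the container healthcheck would.
conn = http.client.HTTPConnection("127.0.0.1", server.server_port, timeout=5)
conn.request("GET", "/health")
resp = conn.getresponse()
status, payload = resp.status, json.loads(resp.read())
conn.close()
print(status, payload)

server.shutdown()
server.server_close()
```

A production endpoint would usually also verify its dependencies (for example, that Redis is reachable) before reporting healthy, so that the orchestrator restarts or drains instances that are up but unable to serve.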

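V. Frequently Asked Questions

Q: How is Agent Harness Engineering different from traditional MLOps?

A: MLOps focuses on the model lifecycle (training, deployment, monitoring), whereas an agent harness focuses on the infrastructure an agent runs in (sandboxing, tracing, evaluation, security). Compared with a model, an agent has an additional "action layer": it calls tools, executes code, and operates on external systems, which brings new security and observability challenges.

Q: Does a small team need the full harness?

A: Not all at once. Build it up in priority order: sandbox (day one) → tracing (first week) → evaluation (first month) → caching and rate limiting (as needed). A minimum viable harness needs only a sandbox plus basic tracing.

Q: How do I evaluate the quality of an agent's output?

A: Along several dimensions: (1) task completion rate (was the goal achieved?); (2) tool-call accuracy (were the right tools called?); (3) step efficiency (was the task done in as few steps as possible?); (4) safety (did the agent attempt any out-of-bounds operation?). Aim for 10 to 20 eval cases per core scenario.

Q: How do I roll back an agent?

A: There are two strategies: (1) model version rollback, keeping the previous LLM version available; and (2) behavior version rollback, keeping a history of the agent's system prompt and tool configuration. Using both is recommended, because an agent's behavior is determined jointly by the model, the prompt, and the tools.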
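
Since behavior depends on the model, the prompt, and the tools together, one way to make rollback practical is to version all three as a single immutable snapshot. The following is a minimal in-memory sketch of that idea. `AgentVersion`, `VersionRegistry`, the model name `model-a`, and the sample prompts are hypothetical; a real system would persist the history instead of keeping it in a list.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AgentVersion:
    """One immutable snapshot of everything that shapes agent behavior."""
    model: str
    system_prompt: str
    tools: tuple[str, ...]


class VersionRegistry:
    """Keep every released snapshot so any earlier one can be restored."""

    def __init__(self, initial: AgentVersion):
        self._history: list[AgentVersion] = [initial]
        self._active = 0  # index into _history

    @property
    def active(self) -> AgentVersion:
        return self._history[self._active]

    def release(self, version: AgentVersion) -> int:
        """Record a new snapshot, make it active, and return its version number."""
        self._history.append(version)
        self._active = len(self._history) - 1
        return self._active

    def rollback(self, to: Optional[int] = None) -> AgentVersion:
        """Reactivate version `to`, or the one before the active version."""
        target = self._active - 1 if to is None else to
        if not 0 <= target < len(self._history):
            raise ValueError(f"no such version: {target}")
        self._active = target
        return self.active


registry = VersionRegistry(
    AgentVersion("model-a", "You are a support agent.", ("search_knowledge",))
)
registry.release(
    AgentVersion("model-a", "You are a concise support agent.", ("search_knowledge", "create_ticket"))
)
print(registry.active.system_prompt)  # the newly released prompt

registry.rollback()
print(registry.active.system_prompt)  # back to the initial prompt
print(registry.active.tools)          # and the initial tool set
```

Rolling back moves a pointer rather than deleting history, so a bad rollback can itself be undone by calling `rollback(to=...)` with the later version number.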

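Q: How much latency does the harness add?

A: Sandbox and tracing overhead is typically within 50-200 ms, depending mainly on sandbox initialization and serialization. Caching can cut latency significantly (by 50-80% on a hit). Rate limiting itself adds almost nothing. Overall, the harness overhead is far smaller than the latency of the LLM call itself, which is typically 2-10 s.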
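
A quick back-of-the-envelope model shows how these figures combine. The sketch below is only illustrative: the function name `expected_latency_s`, the 30% hit rate, and the way a cache hit is modeled (as removing a fixed fraction of the total latency) are assumptions made for this example. The 2-10 s LLM latency, the 50-200 ms harness overhead, and the 50% saving on a hit are taken from the ranges quoted in the answer.

```python
def expected_latency_s(llm_s: float, overhead_s: float,
                       hit_rate: float = 0.0, hit_saving: float = 0.0) -> float:
    """Average end-to-end latency per request, in seconds.

    A cache hit is modeled as removing `hit_saving` of the total latency.
    """
    miss = llm_s + overhead_s
    hit = miss * (1 - hit_saving)
    return hit_rate * hit + (1 - hit_rate) * miss


# Best and worst cases from the quoted ranges, with no caching.
fast = expected_latency_s(llm_s=2.0, overhead_s=0.05)
slow = expected_latency_s(llm_s=10.0, overhead_s=0.2)

# Slow case again, assuming 30% of requests hit the cache and a hit saves 50%.
cached = expected_latency_s(llm_s=10.0, overhead_s=0.2, hit_rate=0.3, hit_saving=0.5)

print(f"no cache, fast case: {fast:.2f} s")
print(f"no cache, slow case: {slow:.2f} s")
print(f"30% hits saving 50% each: {cached:.2f} s")
print(f"harness share of the fast case: {0.05 / fast:.1%}")
```

Under these assumptions the harness accounts for a few percent of total latency, and even a modest hit rate saves more time than the harness costs.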


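VI. Summary

| Component | Priority | Complexity | Key benefit |
|---|---|---|---|
| Sandbox | P0 | ⭐⭐ | Security isolation; keeps the agent within its privileges |
| Tracing | P0 | | Observability, troubleshooting |
| Evaluation | P1 | ⭐⭐⭐ | Quantified quality, regression protection |
| Caching | P1 | ⭐⭐ | Lower cost, lower latency |
| Rate limiting | P1 | | Protects the backend, prevents abuse |
| A/B testing | P2 | ⭐⭐⭐⭐ | Gradual rollout, risk control |

In one sentence: Agent Harness Engineering is the key engineering discipline of 2026 for taking AI agents from "it runs" to "it is reliable". The sandbox provides safety, tracing provides observability, and evaluation provides quality; none of the three can be left out.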