AgentsException
基类
MaxTurnsExceeded
超轮次
ModelBehaviorError
模型异常
ToolTimeoutError
工具超时
InputGuardrailTripwireTriggered
输入护栏
OutputGuardrailTripwireTriggered
输出护栏
从「调用 Runner.run() 之后发生了什么」切入,系统拆解 Agent Loop 的 while-true 执行机制、三种运行方式(run / run_sync / run_streamed)的选择逻辑、RunConfig 的四类配置用途,并通过完整代码示例逐一讲解 4 种对话管理策略和 6 种异常类型的处理方式。结尾附三条可立即落地的实践建议。
リサーチノート
Runner.run(agent, "帮我写首诗"),然后呢?从这行代码到最终输出,中间发生了什么?很多人用了很久 SDK 还是说不清。这篇把它拆开来看。final_output(纯文本输出且无工具调用)→ 退出循环,返回结果max_turns 上限 → 抛出 MaxTurnsExceededmax_turns=10 的 Agent,可能实际跑了 10 次 LLM + N 次工具。max_turns、执行 _run_single_turn,根据 NextStepFinalOutput 或 NextStepAction 决定下一步,超限触发 MaxTurnsExceeded。简洁强大。| 方法 | 返回 | 适用场景 |
|---|---|---|
Runner.run() | RunResult(异步) | 生产环境主力,FastAPI / asyncio 环境 |
Runner.run_sync() | RunResult(同步) | 脚本、命令行工具、快速验证 |
Runner.run_streamed() | RunResultStreaming(异步) | 需要逐字输出的聊天界面 |
from agents import Agent, Runner
agent = Agent(name="Assistant", instructions="You are a helpful assistant")
result = await Runner.run(agent, "Write a haiku about recursion in programming.")
print(result.final_output)run_sync 是对 run 的同步封装,内部直接调 .run()。官方文档明说了:如果当前环境已经有事件循环(Jupyter notebook、FastAPI、已运行的 asyncio context),直接用 run_sync 会报错,必须换成 await Runner.run()4。这个坑踩过的人不少。run_sync 入门,复杂场景(并行多 Agent、多工具、流式输出)推荐异步,以避免阻塞。
run_config,它能覆盖所有 Agent 的配置,优先级高于单个 Agent 的设置1。model / model_provider / model_settings — 全局切换模型或参数,比如让所有 Agent 都用 temperature=0 跑确定性测试input_guardrails / output_guardrails / handoff_input_filter / nest_handoff_history — nest_handoff_history 是 opt-in beta,开启后会在 handoff 前把对话历史折叠成一条 assistant 消息,减少下游 Agent 的 token 消耗tracing_disabled / workflow_name / trace_id / group_id / trace_metadata — 建议至少设置 workflow_name,方便在 OpenAI 控制台里区分不同业务流程call_model_input_filter — 在每次 LLM 调用前的最后一刻修改输入,典型用途是截断过长历史1:from agents import Agent, Runner, RunConfig
from agents.run import CallModelData, ModelInputData
def drop_old_messages(data: CallModelData[None]) -> ModelInputData:
# 只保留最近 5 条消息
trimmed = data.model_data.input[-5:]
return ModelInputData(input=trimmed, instructions=data.model_data.instructions)
result = Runner.run_sync(
agent,
"Explain quines",
run_config=RunConfig(call_model_input_filter=drop_old_messages),
)reasoning_item_id_policy。设为 "omit" 可以解决多轮对话中 reasoning item 的 Responses API 400 报错(错误信息类似 Item 'rs_...' of type 'reasoning' was provided without its required following item)4。这类错误平时很难排查,改一个参数就能规避。result.to_input_list() — 手动拼接agent = Agent(name="Assistant", instructions="Reply very concisely.")
result = await Runner.run(agent, "What city is the Golden Gate Bridge in?")
print(result.final_output) # San Francisco
# 手动把上轮输出 + 新问题拼在一起
new_input = result.to_input_list() + [{"role": "user", "content": "What state is it in?"}]
result = await Runner.run(agent, new_input)
print(result.final_output) # Californiasession — 自动历史管理from agents import Agent, Runner, SQLiteSession
agent = Agent(name="Assistant", instructions="Reply very concisely.")
session = SQLiteSession("conversation_123")
# 第一轮
result = await Runner.run(agent, "What city is the Golden Gate Bridge in?", session=session)
# 第二轮 — Agent 自动记住上下文
result = await Runner.run(agent, "What state is it in?", session=session)conversation_id — 服务端命名对话from openai import AsyncOpenAI
client = AsyncOpenAI()
conversation = await client.conversations.create()
conv_id = conversation.id
while True:
user_input = input("You: ")
result = await Runner.run(agent, user_input, conversation_id=conv_id)
print(f"Assistant: {result.final_output}")previous_response_id — 轻量链式对话agent = Agent(name="Assistant", instructions="Reply very concisely.")
previous_response_id = None
while True:
user_input = input("You: ")
result = await Runner.run(
agent,
user_input,
previous_response_id=previous_response_id,
auto_previous_response_id=True,
)
previous_response_id = result.last_response_id
print(f"Assistant: {result.final_output}")conversation_id 和 previous_response_id 互斥1。用前者你拥有一个可命名、可跨系统共享的对话资源;用后者只是最简的「上一轮 → 这一轮」链接。session(客户端管理)和这两个服务端方案也不能在同一次 run 里混用。run_streamed 的双层事件RawResponsesStreamEvent — LLM 原始输出,逐 token 流,event.type == "raw_response_event",适合需要「第一个字尽快出现」的场景:from openai.types.responses import ResponseTextDeltaEvent
result = Runner.run_streamed(agent, input="Tell me 5 jokes.")
async for event in result.stream_events():
if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
print(event.data.delta, end="", flush=True)from agents import Agent, ItemHelpers, Runner, function_tool
async for event in result.stream_events():
if event.type == "raw_response_event":
continue # 忽略低层 token 事件
elif event.type == "agent_updated_stream_event":
print(f"切换到 Agent: {event.new_agent.name}")
elif event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
print("-- 工具被调用")
elif event.item.type == "tool_call_output_item":
print(f"-- 工具输出: {event.item.output}")
elif event.item.type == "message_output_item":
print(f"-- 消息: {ItemHelpers.text_message_output(event.item)}")stream_events() 的 async iterator 跑完后,SDK 可能还在做 session 持久化、审批状态写入、历史压缩等后处理。只有 result.is_complete 为 True 时才算真正结束。event.type == "raw_response_event" 和 event.data.delta 实现实时逐字显示,是 Web 聊天界面「打字机效果」最简单的做法。MaxTurnsExceeded 支持自定义 error handler,返回一个优雅的 fallback 而不是直接抛错1:from agents import Agent, RunErrorHandlerInput, RunErrorHandlerResult, Runner
def on_max_turns(_data: RunErrorHandlerInput[None]) -> RunErrorHandlerResult:
return RunErrorHandlerResult(
final_output="任务太复杂了,请把请求拆得更细一些。",
include_in_history=False, # 不把这条 fallback 加入对话历史
)
result = Runner.run_sync(
agent,
"分析这份超长报告",
max_turns=3,
error_handlers={"max_turns": on_max_turns},
)
print(result.final_output)UserError 是你写错了代码——比如 call_model_input_filter 返回了非 ModelInputData 类型,或者配置了互斥的参数组合。看到这个异常先查代码,不是 SDK bug。run_sync 验证逻辑,再改成异步RunConfig 是统一管多 Agent 行为的最好入口model_settings,不如在 RunConfig 里统一设置一次。call_model_input_filter 截断历史也在这里做——比在每个 Agent 里加逻辑简洁得多11。to_input_list 最灵活但你要自己管;SQLiteSession 适合单机持久化;conversation_id 适合分布式多服务共享;previous_response_id 轻量但有 30 天有效期6。这四种策略改起来会牵动整个对话逻辑——早决定,省事。@function_tool 怎么工作、Pydantic 验证、工具超时处理,以及 built-in tools(网页搜索、文件搜索、计算机操作)的接入方式。
このコンテンツについて、さらに観点や背景を補足しましょう。