第 5 章 · 查询引擎
如果说工具系统是智能体的"双手",命令 系统是用户的"遥控器",那么查询引擎就是整个系统的"大脑"——它负责与 LLM 对话、解析流式响应、编排工具调用、管理上下文窗口、追踪成本消耗。本章将带你深入理解查询引擎的完整实现:从
QueryEngine类的会话管理,到query.ts中的核心循环,再到上下文收集和成本追踪系统。
5.1 概述:查询引擎的角色
查询引擎是连接用户输入与 LLM 响应的核心管道。每当用户发送一条消息(无论是自然语言还是 prompt 类型的斜杠命令),最终都会流经查询引擎。它的职责包括:
- 会话管理:维护对话历史、文件状态缓存和 Token 使用量
- 流式响应处理:逐块接收 LLM 返回的数据流,实时渲染给用户
- 工具调用循环:当 LLM 决定调用工具时,执行工具并将结果回传
- 上下文优化:自动压缩、微压缩、上下文折叠等多种策略管理上下文窗口
- 重试与容错:处理 API 错误、速率限制、模型回退等异常场景
- 成本追踪:记录每次 API 调用的 Token 消耗和费用
查询引擎的核心文件分布如下:
| 文件 | 职责 |
|---|---|
src/QueryEngine.ts | 查询引擎类,管理会话生命周期和消息流 |
src/query.ts | 核心查询循环,编排 API 调用、工具执行和上下文管理 |
src/query/config.ts | 查询配置快照,不可变的运行时参数 |
src/query/deps.ts | 依赖注入接口,便于测试 |
src/query/stopHooks.ts | 停止钩子处理,轮次结束后的后处理逻辑 |
src/query/tokenBudget.ts | Token 预算追踪,控制自动续写 |
src/context.ts | 上下文收集,Git 状态和用户配置 |
src/cost-tracker.ts | 成本追踪系统,记录费用和 Token 使用量 |
5.2 QueryEngine 类:会话的守护者
设计理念
QueryEngine 是整个查询生命周期的入口。它的核心设计理念是:一个 QueryEngine 实例对应一个对话会话。每次调用 submitMessage() 代表会话中的一个新轮次,而状态(消息历史、文件缓存、Token 使用量等)在轮次之间持久化。
/**
* QueryEngine owns the query lifecycle and session state for a conversation.
* It extracts the core logic from ask() into a standalone class that can be
* used by both the headless/SDK path and (in a future phase) the REPL.
*
* One QueryEngine per conversation. Each submitMessage() call starts a new
* turn within the same conversation. State (messages, file cache, usage, etc.)
* persists across turns.
*/
export class QueryEngine {
private config: QueryEngineConfig
private mutableMessages: Message[]
private abortController: AbortController
private permissionDenials: SDKPermissionDenial[]
private totalUsage: NonNullableUsage
private hasHandledOrphanedPermission = false
private readFileState: FileStateCache
private discoveredSkillNames = new Set<string>()
private loadedNestedMemoryPaths = new Set<string>()
constructor(config: QueryEngineConfig) {
this.config = config
this.mutableMessages = config.initialMessages ?? []
this.abortController = config.abortController ?? createAbortController()
this.permissionDenials = []
this.readFileState = config.readFileCache
this.totalUsage = EMPTY_USAGE
}
// ...
}
QueryEngineConfig:配置全景
QueryEngineConfig 定义了创建查询引擎所需的全部配置。它涵盖了从基础设施到业务逻辑的各个层面:
export type QueryEngineConfig = {
cwd: string
tools: Tools
commands: Command[]
mcpClients: MCPServerConnection[]
agents: AgentDefinition[]
canUseTool: CanUseToolFn
getAppState: () => AppState
setAppState: (f: (prev: AppState) => AppState) => void
initialMessages?: Message[]
readFileCache: FileStateCache
customSystemPrompt?: string
appendSystemPrompt?: string
userSpecifiedModel?: string
fallbackModel?: string
thinkingConfig?: ThinkingConfig
maxTurns?: number
maxBudgetUsd?: number
taskBudget?: { total: number }
jsonSchema?: Record<string, unknown>
verbose?: boolean
replayUserMessages?: boolean
handleElicitation?: ToolUseContext['handleElicitation']
includePartialMessages?: boolean
setSDKStatus?: (status: SDKStatus) => void
abortController?: AbortController
orphanedPermission?: OrphanedPermission
snipReplay?: (
yieldedSystemMsg: Message,
store: Message[],
) => { messages: Message[]; executed: boolean } | undefined
}
几个关键配置项值得注意:
| 配置项 | 用途 |
|---|---|
maxTurns | 限制最大轮次数,防止无限循环 |
maxBudgetUsd | 美元预算上限,超出后终止查询 |
taskBudget | API 级别的任务预算(output_config.task_budget) |
thinkingConfig | 思考模式配置:adaptive / enabled / disabled |
fallbackModel | 主模型不可用时的回退模型 |
snipReplay | 历史裁剪回放处理器(HISTORY_SNIP 特性) |
submitMessage:轮次的完整生命周期
submitMessage 是 QueryEngine 的核心方法。它是一个 AsyncGenerator——通过 yield 逐步产出 SDK 消息 ,调用方可以实时消费这些消息。
一个完整的 submitMessage 调用经历以下阶段:
阶段 1:构建系统提示词
// 获取系统提示词的各个组成部分
const {
defaultSystemPrompt,
userContext: baseUserContext,
systemContext,
} = await fetchSystemPromptParts({
tools,
mainLoopModel: initialMainLoopModel,
additionalWorkingDirectories: Array.from(
initialAppState.toolPermissionContext.additionalWorkingDirectories.keys(),
),
mcpClients,
customSystemPrompt: customPrompt,
})
// 组装最终系统提示词:默认提示词 + 记忆机制提示词 + 追加提示词
const systemPrompt = asSystemPrompt([
...(customPrompt !== undefined ? [customPrompt] : defaultSystemPrompt),
...(memoryMechanicsPrompt ? [memoryMechanicsPrompt] : []),
...(appendSystemPrompt ? [appendSystemPrompt] : []),
])
系统提示词由三部分组成:
- 默认提示词(或自定义提示词):定义 LLM 的角色和行为规范
- 记忆机制提示词:当 SDK 调用方配置了记忆目录时注入,教 LLM 如何使用 MEMORY.md
- 追加提示词:调用方可以附加额外的策略文本
阶段 2:处理用户输入
const {
messages: messagesFromUserInput,
shouldQuery,
allowedTools,
model: modelFromUserInput,
resultText,
} = await processUserInput({
input: prompt,
mode: 'prompt',
setToolJSX: () => {},
context: {
...processUserInputContext,
messages: this.mutableMessages,
},
messages: this.mutableMessages,
uuid: options?.uuid,
isMeta: options?.isMeta,
querySource: 'sdk',
})
processUserInput 是用户输入的总入口(在第 4 章中已介绍)。它返回的 shouldQuery 决定了后续走向:
true:需要调用 LLM(自然语言或 prompt 命令)false:本地命令已处理完毕,直接返回结果
阶段 3-5:进入查询循环
当 shouldQuery 为 true 时,进入核心查询循环。这是整个查询引擎最复杂的部分,我们将在 5.3 节详细展开。
阶段 6:预算检查
查询循环的每次迭代后,QueryEngine 会检查多种预算限制:
// 检查美元预算是否超限
if (maxBudgetUsd !== undefined && getTotalCost() >= maxBudgetUsd) {
yield {
type: 'result',
subtype: 'error_max_budget_usd',
// ...
errors: [`Reached maximum budget (${maxBudgetUsd})`],
}
return
}
// 检查结构化输出重试次数是否超限
if (message.type === 'user' && jsonSchema) {
const currentCalls = countToolCalls(
this.mutableMessages, SYNTHETIC_OUTPUT_TOOL_NAME,
)
const callsThisQuery = currentCalls - initialStructuredOutputCalls
const maxRetries = parseInt(
process.env.MAX_STRUCTURED_OUTPUT_RETRIES || '5', 10,
)
if (callsThisQuery >= maxRetries) {
yield { type: 'result', subtype: 'error_max_structured_output_retries', /* ... */ }
return
}
}
ask():一次性查询的便捷包装
对于不需要多轮对话的场景,ask() 函数提供了一个便捷的包装器:
// 便捷包装器:创建 QueryEngine,提交一条消息,返回结果
export async function* ask({
commands, prompt, cwd, tools, mcpClients, canUseTool,
mutableMessages = [], getReadFileCache, setReadFileCache,
// ... 更多参数
}: { /* ... */ }): AsyncGenerator<SDKMessage, void, unknown> {
const engine = new QueryEngine({
cwd, tools, commands, mcpClients, agents, canUseTool,
getAppState, setAppState,
initialMessages: mutableMessages,
readFileCache: cloneFileStateCache(getReadFileCache()),
// ... 更多配置
// HISTORY_SNIP 特性:注入裁剪回放处理器
...(feature('HISTORY_SNIP')
? {
snipReplay: (yielded: Message, store: Message[]) => {
if (!snipProjection!.isSnipBoundaryMessage(yielded))
return undefined
return snipModule!.snipCompactIfNeeded(store, { force: true })
},
}
: {}),
})
try {
yield* engine.submitMessage(prompt, { uuid: promptUuid, isMeta })
} finally {
// 确保文件状态缓存被回写
setReadFileCache(engine.getReadFileState())
}
}
submitMessage 和 ask 都使用了 AsyncGenerator 而非 Promise。这个选择至关重要——它允许调用方在 LLM 响应的 过程中实时消费消息(流式输出),而不必等待整个响应完成。这对于长时间运行的工具调用尤其重要:用户可以看到实时进度,而不是面对一个无响应的界面。
5.3 查询循环:引擎的心跳
src/query.ts 中的 queryLoop 函数是整个查询引擎的核心——一个 while(true) 循环,每次迭代代表一个"轮次"(turn)。每个轮次包含:调用 LLM → 解析响应 → 执行工具 → 准备下一轮。
查询-响应循环序列图
循环状态管理
查询循环维护一个可变的 State 对象,在每次迭代之间传递:
// 可变的跨迭代状态
type State = {
messages: Message[]
toolUseContext: ToolUseContext
autoCompactTracking: AutoCompactTrackingState | undefined
maxOutputTokensRecoveryCount: number
hasAttemptedReactiveCompact: boolean
maxOutputTokensOverride: number | undefined
pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
stopHookActive: boolean | undefined
turnCount: number
// 上一次迭代为什么继续。用于测试断言恢复路径
transition: Continue | undefined
}
transition 字段记录了上一次迭代继续的原因,这对调试和测试非常有价值。可能的原因包括:
| transition.reason | 含义 |
|---|---|
next_turn | 正常的工具调用后继续 |
reactive_compact_retry | 响应式压缩后重试 |
collapse_drain_retry | 上下文折叠排空后重试 |
max_output_tokens_recovery | 输出 Token 超限恢复 |
max_output_tokens_escalate | 输出 Token 限制升级(8k → 64k) |
stop_hook_blocking | 停止钩子阻塞后重试 |
token_budget_continuation | Token 预算续写 |
每次迭代的完整流程
每次循环迭代按以下顺序执行: