智能体持久化
智能体持久化是 Koog 框架中为 AI 智能体提供检查点功能的一项特性。 它允许您在执行过程中的特定点保存和恢复智能体的状态,从而实现以下能力:
- 从特定点恢复智能体执行
- 回滚到先前状态
- 跨会话持久化智能体状态
核心概念
检查点
检查点捕获智能体在其执行过程中特定点的完整状态,包括:
- 消息历史记录(用户、系统、助手和工具之间的所有交互)
- 当前正在执行的节点
- 当前节点的输入数据
- 创建时间戳
检查点通过唯一 ID 进行标识,并与特定的智能体相关联。
安装
要使用智能体持久化功能,请将其添加到您的智能体配置中:
AIAgent<String, String> agent = AIAgent.<String, String>builder()
.promptExecutor(SimplePromptExecutorsKt.simpleOllamaAIExecutor("http://localhost:11434"))
.llmModel(OllamaModels.Meta.LLAMA_3_2)
.install(Persistence.Feature, cfg -> {
// 使用内存存储快照
cfg.setStorage(new InMemoryPersistenceStorageProvider());
})
.build();
配置选项
智能体持久化功能有三个主要配置选项:
- 存储提供程序:用于保存和检索检查点的提供程序。
- 持续持久化:在每个节点运行后自动创建检查点。
- 回滚策略:确定回滚到检查点时要恢复的状态。
存储提供程序
设置用于保存和检索检查点的存储提供程序:
AIAgent<String, String> agent = AIAgent.<String, String>builder()
.promptExecutor(SimplePromptExecutorsKt.simpleOllamaAIExecutor("http://localhost:11434"))
.llmModel(OllamaModels.Meta.LLAMA_3_2)
.install(Persistence.Feature, cfg -> {
cfg.setStorage(new InMemoryPersistenceStorageProvider());
})
.build();
框架包含以下内置提供程序:
InMemoryPersistenceStorageProvider:将检查点存储在内存中(应用程序重启后丢失)。FilePersistenceStorageProvider:将检查点持久化到文件系统。NoPersistenceStorageProvider:一个不存储检查点的空操作实现。这是默认提供程序。您也可以通过实现PersistenceStorageProvider接口来实现自定义存储提供程序。 更多信息,请参阅自定义存储提供程序。
持续持久化
持续持久化意味着每个节点运行后都会自动创建一个检查点。 要禁用持续持久化,请使用以下代码:
如果禁用了持续持久化,您仍然可以手动创建检查点。
基本用法
创建检查点
要了解如何在智能体执行的特定点创建检查点,请参阅以下代码示例:
<!--- INCLUDE import ai.koog.agents.core.agent.context.AIAgentContext import ai.koog.agents.snapshot.feature.persistence import ai.koog.serialization.typeToken
const val outputData = "some-output-data"
val outputType = typeTokensuspend fun example(context: AIAgentContext) {
// 使用当前状态创建检查点
val checkpoint = context.persistence().createCheckpointAfterNode(
agentContext = context,
nodePath = context.executionInfo.path(),
lastOutput = outputData,
lastOutputType = outputType,
checkpointId = context.runId,
version = 0L
)
// 检查点 ID 可以存储以备后用
val checkpointId = checkpoint?.checkpointId
}
从检查点恢复
要从特定检查点恢复智能体的状态,请按照以下代码示例操作:
回滚工具产生的所有副作用
某些工具产生副作用是很常见的。具体来说,当您在后台运行智能体时,一些工具很可能会执行某些数据库事务。这使得您的智能体更难回到过去。
假设您有一个工具 createUser,可以在数据库中创建新用户。并且您的智能体随着时间的推移已经调用了多个工具:
tool call: createUser "Alex"
->>>> checkpoint-1 <<<<-
tool call: createUser "Daniel"
tool call: createUser "Maria"
现在您希望回滚到某个检查点。仅恢复代理的状态(包括消息历史和策略图节点)不足以完全还原检查点之前的世界状态。您还需要恢复工具调用产生的副作用。在我们的示例中,这意味着从数据库中移除 Maria 和 Daniel。
通过 Koog 持久化功能,您可以通过为 Persistence 功能配置提供 RollbackToolRegistry 来实现这一点:
install(Persistence) {
enableAutomaticPersistence = true
rollbackToolRegistry = RollbackToolRegistry {
// 对于每个 `createUser` 工具调用,在回滚到目标执行点时,
// 都会按相反顺序调用一次 `removeUser`。
// 注意:`removeUser` 工具应接受与 `createUser` 完全相同的参数。
// 开发者需确保 `removeUser` 调用能回滚 `createUser` 的所有副作用:
registerRollback(::createUser, ::removeUser)
}
}
使用扩展函数
代理持久化功能提供了便捷的扩展函数来处理检查点:
suspend fun example(context: AIAgentContext) {
// 访问检查点功能
val checkpointFeature = context.persistence()
// 或使用检查点功能执行操作
context.withPersistence { ctx ->
// 'this' 指向检查点功能
createCheckpointAfterNode(
agentContext = ctx,
nodePath = ctx.executionInfo.path(),
lastOutput = outputData,
lastOutputType = outputType,
checkpointId = ctx.runId,
version = 0L
)
}
}
高级用法
自定义存储提供程序
您可以通过实现 PersistenceStorageProvider 接口来自定义存储提供程序:
class MyCustomStorageProvider<MyFilterType> : PersistenceStorageProvider<MyFilterType> {
override suspend fun getCheckpoints(sessionId: String, filter: MyFilterType?): List<AgentCheckpointData> {
TODO("Not yet implemented")
}
override suspend fun saveCheckpoint(sessionId: String, agentCheckpointData: AgentCheckpointData) {
TODO("Not yet implemented")
}
override suspend fun getLatestCheckpoint(sessionId: String, filter: MyFilterType?): AgentCheckpointData? {
TODO("Not yet implemented")
}
}
设置执行点
如需进行高级控制,您可以直接设置代理的执行点:
<!--- INCLUDE import ai.koog.agents.core.agent.context.AIAgentContext import ai.koog.agents.snapshot.feature.persistence import ai.koog.prompt.message.Message.User import ai.koog.serialization.JSONPrimitive
val customInput = JSONPrimitive("custom-input")
val customOutput = JSONPrimitive("custom-output")
val customMessageHistory = emptyListfun example(context: AIAgentContext) {
// 您可以在某个节点之前设置执行点,并为其提供输入:
context.persistence().setExecutionPoint(
agentContext = context,
nodePath = context.executionInfo.path(),
messageHistory = customMessageHistory,
input = customInput
)
// 或在某个节点之后设置执行点,并提供该节点的输出:
context.persistence().setExecutionPointAfterNode(
agentContext = context,
nodePath = context.executionInfo.path(),
messageHistory = customMessageHistory,
output = customOutput
)
}
这允许对代理状态进行更精细的控制,而不仅限于从检查点恢复。