跳转至

智能体持久化

智能体持久化是 Koog 框架中为 AI 智能体提供检查点功能的一项特性。 它允许您在执行过程中的特定点保存和恢复智能体的状态,从而实现以下能力:

  • 从特定点恢复智能体执行
  • 回滚到先前状态
  • 跨会话持久化智能体状态

核心概念

检查点

检查点捕获智能体在其执行过程中特定点的完整状态,包括:

  • 消息历史记录(用户、系统、助手和工具之间的所有交互)
  • 当前正在执行的节点
  • 当前节点的输入数据
  • 创建时间戳

检查点通过唯一 ID 进行标识,并与特定的智能体相关联。

安装

要使用智能体持久化功能,请将其添加到您的智能体配置中:

val agent = AIAgent(
    promptExecutor = executor,
    llmModel = OllamaModels.Meta.LLAMA_3_2,
) {
    install(Persistence) {
        // 使用内存存储快照
        storage = InMemoryPersistenceStorageProvider()
    }
}

AIAgent<String, String> agent = AIAgent.<String, String>builder()
    .promptExecutor(SimplePromptExecutorsKt.simpleOllamaAIExecutor("http://localhost:11434"))
    .llmModel(OllamaModels.Meta.LLAMA_3_2)
    .install(Persistence.Feature, cfg -> {
        // 使用内存存储快照
        cfg.setStorage(new InMemoryPersistenceStorageProvider());
    })
.build();

配置选项

智能体持久化功能有三个主要配置选项:

  • 存储提供程序:用于保存和检索检查点的提供程序。
  • 持续持久化:在每个节点运行后自动创建检查点。
  • 回滚策略:确定回滚到检查点时要恢复的状态。

存储提供程序

设置用于保存和检索检查点的存储提供程序:

install(Persistence) {
    storage = InMemoryPersistenceStorageProvider()
}

AIAgent<String, String> agent = AIAgent.<String, String>builder()
    .promptExecutor(SimplePromptExecutorsKt.simpleOllamaAIExecutor("http://localhost:11434"))
    .llmModel(OllamaModels.Meta.LLAMA_3_2)
    .install(Persistence.Feature, cfg -> {
        cfg.setStorage(new InMemoryPersistenceStorageProvider());
    })
    .build();

框架包含以下内置提供程序:

  • InMemoryPersistenceStorageProvider:将检查点存储在内存中(应用程序重启后丢失)。
  • FilePersistenceStorageProvider:将检查点持久化到文件系统。
  • NoPersistenceStorageProvider:一个不存储检查点的空操作实现。这是默认提供程序。您也可以通过实现 PersistenceStorageProvider 接口来实现自定义存储提供程序。 更多信息,请参阅自定义存储提供程序

持续持久化

持续持久化意味着每个节点运行后都会自动创建一个检查点。 要禁用持续持久化,请使用以下代码:

install(Persistence) {
    enableAutomaticPersistence = false
}

AIAgent<String, String> agent = AIAgent.<String, String>builder()
    .promptExecutor(SimplePromptExecutorsKt.simpleOllamaAIExecutor("http://localhost:11434"))
    .llmModel(OllamaModels.Meta.LLAMA_3_2)
    .install(Persistence.Feature, cfg -> {
        cfg.setEnableAutomaticPersistence(true);
    })
    .build();

如果禁用了持续持久化,您仍然可以手动创建检查点。

基本用法

创建检查点

要了解如何在智能体执行的特定点创建检查点,请参阅以下代码示例:

<!--- INCLUDE import ai.koog.agents.core.agent.context.AIAgentContext import ai.koog.agents.snapshot.feature.persistence import ai.koog.serialization.typeToken

const val outputData = "some-output-data" val outputType = typeToken() -->

suspend fun example(context: AIAgentContext) {
    // 使用当前状态创建检查点
    val checkpoint = context.persistence().createCheckpointAfterNode(
        agentContext = context,
        nodePath = context.executionInfo.path(),
        lastOutput = outputData,
        lastOutputType = outputType,
        checkpointId = context.runId,
        version = 0L
    )

    // 检查点 ID 可以存储以备后用
    val checkpointId = checkpoint?.checkpointId
}


从检查点恢复

要从特定检查点恢复智能体的状态,请按照以下代码示例操作:

suspend fun example(context: AIAgentContext, checkpointId: String) {
    // 回滚到特定检查点
    context.persistence().rollbackToCheckpoint(checkpointId, context)

    // 或回滚到最新检查点
    context.persistence().rollbackToLatestCheckpoint(context)
}


回滚工具产生的所有副作用

某些工具产生副作用是很常见的。具体来说,当您在后台运行智能体时,一些工具很可能会执行某些数据库事务。这使得您的智能体更难回到过去。

假设您有一个工具 createUser,可以在数据库中创建新用户。并且您的智能体随着时间的推移已经调用了多个工具:

tool call: createUser "Alex"

->>>> checkpoint-1 <<<<-

tool call: createUser "Daniel"
tool call: createUser "Maria"

现在您希望回滚到某个检查点。仅恢复代理的状态(包括消息历史和策略图节点)不足以完全还原检查点之前的世界状态。您还需要恢复工具调用产生的副作用。在我们的示例中,这意味着从数据库中移除 MariaDaniel

通过 Koog 持久化功能,您可以通过为 Persistence 功能配置提供 RollbackToolRegistry 来实现这一点:

install(Persistence) {
    enableAutomaticPersistence = true
    rollbackToolRegistry = RollbackToolRegistry {
        // 对于每个 `createUser` 工具调用,在回滚到目标执行点时,
        // 都会按相反顺序调用一次 `removeUser`。
        // 注意:`removeUser` 工具应接受与 `createUser` 完全相同的参数。
        // 开发者需确保 `removeUser` 调用能回滚 `createUser` 的所有副作用:
        registerRollback(::createUser, ::removeUser)
    }
}


使用扩展函数

代理持久化功能提供了便捷的扩展函数来处理检查点:

suspend fun example(context: AIAgentContext) {
    // 访问检查点功能
    val checkpointFeature = context.persistence()

    // 或使用检查点功能执行操作
    context.withPersistence { ctx ->
        // 'this' 指向检查点功能
        createCheckpointAfterNode(
            agentContext = ctx,
            nodePath = ctx.executionInfo.path(),
            lastOutput = outputData,
            lastOutputType = outputType,
            checkpointId = ctx.runId,
            version = 0L
        )
    }
}


高级用法

自定义存储提供程序

您可以通过实现 PersistenceStorageProvider 接口来自定义存储提供程序:

class MyCustomStorageProvider<MyFilterType> : PersistenceStorageProvider<MyFilterType> {
    override suspend fun getCheckpoints(sessionId: String, filter: MyFilterType?): List<AgentCheckpointData> {
        TODO("Not yet implemented")
    }

    override suspend fun saveCheckpoint(sessionId: String, agentCheckpointData: AgentCheckpointData) {
        TODO("Not yet implemented")
    }

    override suspend fun getLatestCheckpoint(sessionId: String, filter: MyFilterType?): AgentCheckpointData? {
        TODO("Not yet implemented")
    }
}


要在功能配置中使用您的自定义提供程序,请在代理中配置代理持久化功能时将其设置为存储。

install(Persistence) {
    storage = MyCustomStorageProvider<Any>()
}


设置执行点

如需进行高级控制,您可以直接设置代理的执行点:

<!--- INCLUDE import ai.koog.agents.core.agent.context.AIAgentContext import ai.koog.agents.snapshot.feature.persistence import ai.koog.prompt.message.Message.User import ai.koog.serialization.JSONPrimitive

val customInput = JSONPrimitive("custom-input") val customOutput = JSONPrimitive("custom-output") val customMessageHistory = emptyList() -->

fun example(context: AIAgentContext) {
    // 您可以在某个节点之前设置执行点,并为其提供输入:
    context.persistence().setExecutionPoint(
        agentContext = context,
        nodePath = context.executionInfo.path(),
        messageHistory = customMessageHistory,
        input = customInput
    )

    // 或在某个节点之后设置执行点,并提供该节点的输出:
    context.persistence().setExecutionPointAfterNode(
        agentContext = context,
        nodePath = context.executionInfo.path(),
        messageHistory = customMessageHistory,
        output = customOutput
    )
}


这允许对代理状态进行更精细的控制,而不仅限于从检查点恢复。