跳转至

流式传输 API

Koog 的 流式传输 API 允许你将 LLM 输出 作为 Flow<StreamFrame> 增量式消费。你的代码无需等待完整响应,即可:

  • 在助手文本到达时实时渲染,
  • 实时检测 工具调用 并采取行动,
  • 知晓流式传输 何时结束 及其原因。

流式传输携带 类型化帧,分为两类:

增量帧(增量/部分内容): - StreamFrame.TextDelta(text: String, index: Int?) — 增量助手文本 - StreamFrame.ReasoningDelta(text: String?, summary: String?, index: Int?) — 增量推理文本和摘要 - StreamFrame.ToolCallDelta(id: String?, name: String?, content: String?, index: Int?) — 部分工具调用

完整帧(完整内容): - StreamFrame.TextComplete(text: String) — 完整助手文本 - StreamFrame.ReasoningComplete(text: List<String>, summary: List<String>?) — 完整推理(含可选摘要) - StreamFrame.ToolCallComplete(id: String?, name: String, content: String) — 完整工具调用

结束标记: - StreamFrame.End(finishReason: String?) — 流式传输结束标记

提供辅助函数用于提取纯文本、将帧转换为 Message.Response 对象,并安全地 合并分块工具调用

API 概述

通过流式传输,你可以:

  • 在数据到达时即时处理(提升 UI 响应性)
  • 实时解析结构化信息(Markdown/JSON 等)
  • 在对象完成时立即发出
  • 实时触发工具调用
  • 实时访问模型推理(适用于支持的模型)

你可以直接操作 本身,也可以操作从帧派生的 纯文本

增量帧与完整帧

流式传输 API 区分两种类型的帧:

  • 增量帧 (DeltaFrame) — 以分块形式到达的增量/部分内容。这些帧非常适合内容流入时的实时显示。例如:TextDeltaReasoningDeltaToolCallDelta

  • 完整帧 (CompleteFrame) — 在接收完该内容类型的所有增量后发出的完整内容。这些帧适用于最终处理及转换为 Message.Response 对象。例如:TextCompleteReasoningCompleteToolCallComplete

通常,你会使用增量帧进行 UI 更新,使用完整帧提取最终的结构化数据。


使用方法

直接操作帧

这是最通用的方法:响应每种帧类型。

<!--- INCLUDE import ai.koog.agents.core.dsl.builder.strategy import ai.koog.agents.core.dsl.builder.node import ai.koog.prompt.streaming.StreamFrame

val strategy = strategy("strategy_name") { val node by node { -->

llm.writeSession {
    appendPrompt { user("Tell me a joke, then call a tool with JSON args.") }

    val stream = requestLLMStreaming() // Flow<StreamFrame>

    stream.collect { frame ->
        when (frame) {
            is StreamFrame.TextDelta -> print(frame.text)
            is StreamFrame.ReasoningDelta -> print("[Reasoning] text=${frame.text} summary=${frame.summary}")
            is StreamFrame.ToolCallComplete -> {
                println("\n🔧 Tool call: ${frame.name} args=${frame.content}")
                // 可选延迟解析:
                // val json = frame.contentJson
            }
            is StreamFrame.End -> println("\n[END] reason=${frame.finishReason}")
            else -> {} // 处理其他帧类型(TextComplete、ToolCallDelta 等)
        }
    }
}


需要注意的是,你可以通过直接处理原始字符串流来解析输出。

这种方法让你对解析过程拥有更高的灵活性和控制力。

以下是一个包含输出结构 Markdown 定义的原始字符串流:

fun markdownBookDefinition(): MarkdownStructureDefinition {
    return MarkdownStructureDefinition("name", schema = { /*...*/ })
}

val mdDefinition = markdownBookDefinition()

llm.writeSession {
    val stream = requestLLMStreaming(mdDefinition)
    // 直接访问原始字符串块
    stream.collect { chunk ->
        // 处理每个到达的文本块
        println("Received chunk: $chunk") // 这些块将共同构成遵循 mdDefinition 模式的文本
    }
}


处理推理帧

支持推理的模型(例如 Claude Sonnet 4.5 或 GPT-o1)在流式传输过程中会发出推理帧。你可以同时访问推理过程及其摘要:

llm.writeSession {
    appendPrompt { user("Solve this complex problem: ...") }

    val stream = requestLLMStreaming()
    val reasoningSteps = mutableListOf<String>()
    val summarySteps = mutableListOf<String>()

    stream.collect { frame ->
        when (frame) {
            is StreamFrame.ReasoningDelta -> {
                frame.text?.let { 
                    reasoningSteps.add(it)
                    print(frame.text) // Display reasoning as it arrives
                }
                frame.summary?.let {
                    summarySteps.add(it)
                    print(frame.summary) // Display reasoning summary as it arrives
                }
            }
            is StreamFrame.ReasoningComplete -> {
                // Access complete reasoning
                println("\nComplete reasoning: ${frame.text.joinToString("")}")
                println("Summary: ${frame.summary?.joinToString("") ?: "N/A"}")
            }
            is StreamFrame.TextDelta -> print(frame.text)
            is StreamFrame.End -> println("\n[END]")
            else -> {}
        }
    }
}

处理原始文本流(派生)

如果你已有的流式解析器期望 Flow<String>, 可以通过 filterTextOnly() 派生文本块,或使用 collectText() 收集它们。

llm.writeSession {
    val frames = requestLLMStreaming()

    // 流式传输到达的文本块:
    frames.filterTextOnly().collect { chunk -> print(chunk) }

    // 或者,在 End 事件后收集所有文本到一个字符串中:
    val fullText = frames.collectText()
    println("\n---\n$fullText")
}


在事件处理器中监听流事件

你可以在 代理事件处理器 中监听流事件。

<!--- INCLUDE import ai.koog.agents.core.dsl.builder.strategy import ai.koog.agents.core.dsl.builder.node import ai.koog.agents.core.agent.GraphAIAgent import ai.koog.agents.features.eventHandler.feature.handleEvents import ai.koog.prompt.streaming.StreamFrame import ai.koog.prompt.structure.markdown.MarkdownStructureDefinition

fun GraphAIAgent.FeatureContext.installStreamingApi() { -->

handleEvents {
    onToolCallStarting { context ->
        println("\n🔧 Using ${context.toolName} with ${context.toolArgs}... ")
    }
    onLLMStreamingFrameReceived { context ->
        when (val frame = context.streamFrame) {
            is StreamFrame.TextDelta -> print(frame.text)
            is StreamFrame.ReasoningDelta -> print("[Reasoning] text=${frame.text} summary=${frame.summary}")
            else -> {} // 根据需要处理其他帧类型
        }
    }
    onLLMStreamingFailed { context ->
        println("❌ Error: ${context.error}")
    }
    onLLMStreamingCompleted {
        println("🏁 Done")
    }
}


将帧转换为 Message.Response您可以将收集到的帧列表转换为标准消息对象:

  • toAssistantMessageOrNull() — 从文本帧中提取 Message.Assistant
  • toReasoningMessageOrNull() — 从推理帧中提取 Message.Reasoning
  • toToolCallMessages() — 从工具调用帧中提取 Message.Tool.Call
  • toMessageResponses() — 将所有完整帧转换为其对应的 Message.Response 对象

示例

流式处理中的结构化数据(Markdown 示例)

虽然可以处理原始字符串流,但通常更方便处理结构化数据

结构化数据方法包含以下关键组件:

  1. MarkdownStructureDefinition:一个帮助您定义 Markdown 格式结构化数据模式和示例的类。
  2. markdownStreamingParser:一个用于创建处理 Markdown 块流并发出事件的解析器的函数。

以下部分提供了处理结构化数据流的分步说明和代码示例。

1. 定义您的数据结构

首先,定义一个数据类来表示您的结构化数据:

@Serializable
data class Book(
    val title: String,
    val author: String,
    val description: String
)

// 一个简单的 Java POJO,相当于 Kotlin 的 @Serializable 数据类。
public class Book {
    public final String title;
    public final String author;
    public final String description;

    public Book(String title, String author, String description) {
        this.title = title;
        this.author = author;
        this.description = description;
    }
}

2. 定义 Markdown 结构

使用 MarkdownStructureDefinition 类创建一个定义,指定您的数据在 Markdown 中应如何结构化:

fun markdownBookDefinition(): MarkdownStructureDefinition {
    return MarkdownStructureDefinition("bookList", schema = {
        markdown {
            header(1, "title")
            bulleted {
                item("author")
                item("description")
            }
        }
    }, examples = {
        markdown {
            header(1, "The Great Gatsby")
            bulleted {
                item("F. Scott Fitzgerald")
                item("A novel set in the Jazz Age that tells the story of Jay Gatsby's unrequited love for Daisy Buchanan.")
            }
        }
    })
}


3. 为您的数据结构创建解析器

markdownStreamingParser 为不同的 Markdown 元素提供了多个处理程序:

=== "Kotlin"

markdownStreamingParser {
    // 处理一级标题(标题级别范围为1到6)
    onHeader(1) { headerText -> }
    // 处理项目符号列表项
    onBullet { bulletText -> }
    // 处理代码块
    onCodeBlock { codeBlockContent -> }
    // 处理匹配正则表达式的行
    onLineMatching(Regex("pattern")) { line -> }
    // 处理流结束事件
    onFinishStream { remainingText -> }
}


通过已定义的处理程序,您可以实现一个解析Markdown流并利用markdownStreamingParser函数输出数据对象的函数。

fun parseMarkdownStreamToBooks(markdownStream: Flow<StreamFrame>): Flow<Book> {
   return flow {
      markdownStreamingParser {
         var currentBookTitle = ""
         val bulletPoints = mutableListOf<String>()

         // 处理响应流中接收到Markdown标题的事件
         onHeader(1) { headerText ->
            // 若存在前一本图书,则输出
            if (currentBookTitle.isNotEmpty() && bulletPoints.isNotEmpty()) {
               val author = bulletPoints.getOrNull(0) ?: ""
               val description = bulletPoints.getOrNull(1) ?: ""
               emit(Book(currentBookTitle, author, description))
            }

            currentBookTitle = headerText
            bulletPoints.clear()
         }

         // 处理响应流中接收到Markdown项目符号列表的事件
         onBullet { bulletText ->
            bulletPoints.add(bulletText)
         }

         // 处理响应流结束事件
         onFinishStream {
            // 若存在最后一本图书,则输出
            if (currentBookTitle.isNotEmpty() && bulletPoints.isNotEmpty()) {
               val author = bulletPoints.getOrNull(0) ?: ""
               val description = bulletPoints.getOrNull(1) ?: ""
               emit(Book(currentBookTitle, author, description))
            }
         }
      }.parseStream(markdownStream.filterTextOnly())
   }
}


4. 在您的智能体策略中使用解析器

```kotlin val agentStrategy = strategy>("library-assistant") { // 描述包含输出流解析的节点 val getMdOutput by node> { booksDescription -> val books = mutableListOf() val mdDefinition = markdownBookDefinition()

`````kotlin llm.writeSession { appendPrompt { user(booksDescription) } // 以定义mdDefinition` 的形式发起响应流 val markdownStream = requestLLMStreaming(mdDefinition) // 使用响应流的结果调用解析器,并对结果执行操作 parseMarkdownStreamToBooks(markdownStream).collect { book -> books.add(book) println("解析的书籍: ${book.title} 作者: ${book.author}") } }

books

// 描述智能体的图结构,确保节点可访问
edge(nodeStart forwardTo getMdOutput)
edge(getMdOutput forwardTo nodeFinish)


高级用法:结合工具进行流式处理

您也可以将流式 API 与工具结合使用,以便在数据到达时即时处理。 以下部分简要介绍了如何定义工具并将其与流式数据结合使用的分步指南。

1. 为您的数据结构定义工具

@Serializable
data class Book(
    val title: String,
    val author: String,
    val description: String
)

class BookTool(): SimpleTool<Book>(
    argsType = typeToken<Book>(),
    name = NAME,
    description = "用于从 Markdown 解析书籍信息的工具"
) {

    companion object { const val NAME = "book" }

    override suspend fun execute(args: Book): String {
        println("${args.title} 作者 ${args.author}:\n ${args.description}")
        return "完成"
    }
}


2. 将工具与流式数据结合使用

val agentStrategy = strategy<String, Unit>("library-assistant") {
    val getMdOutput by node<String, Unit> { input ->
        val mdDefinition = markdownBookDefinition()

        llm.writeSession {
            appendPrompt { user(input) }
            val markdownStream = requestLLMStreaming(mdDefinition)

            parseMarkdownStreamToBooks(markdownStream).collect { book ->
                callToolRaw(BookTool.NAME, book)
                /* 其他可选方式:
                    callTool(BookTool::class, book)
                    callTool<BookTool>(book)
                    findTool(BookTool::class).execute(book)
                */
            }

            // 我们可以并行调用工具
            parseMarkdownStreamToBooks(markdownStream).toParallelToolCallsRaw(toolClass=BookTool::class).collect {
                println("工具调用结果: $it")
            }
        }
    }

    edge(nodeStart forwardTo getMdOutput)
    edge(getMdOutput forwardTo nodeFinish)
}


3. 在智能体配置中注册工具

val toolRegistry = ToolRegistry {
    tool(BookTool())
}

val runner = AIAgent(
    promptExecutor = simpleOpenAIExecutor("OPENAI_API_KEY"),
    llmModel = OpenAIModels.Chat.GPT4o,
    toolRegistry = toolRegistry
)

最佳实践1. 定义清晰的结构:为你的数据创建清晰且无歧义的 Markdown 结构。

  1. 提供优质示例:在 MarkdownStructureDefinition 中包含全面的示例,以指导 LLM

  2. 处理不完整数据:解析流中的数据时,始终检查空值或缺失值。

  3. 清理资源:使用 onFinishStream 处理器来清理资源并处理剩余数据。

  4. 处理错误:针对格式错误的 Markdown 或意外数据,实施适当的错误处理机制。

  5. 测试:使用多种输入场景测试你的解析器,包括部分数据块和格式错误的输入。

  6. 并行处理:对于独立的数据项,考虑使用并行工具调用以提升性能。