
A2A Server

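The A2A server lets you expose AI agents through the standardized A2A (Agent-to-Agent) protocol. It provides a complete implementation of the A2A protocol specification: it handles client requests, executes agent logic, manages complex task lifecycles, and supports real-time streaming responses.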

Dependencies

To use the A2A server in your project, add the following dependencies to your build.gradle.kts:

dependencies {
    // Core A2A server library
    implementation("ai.koog:a2a-server:$koogVersion")

    // HTTP JSON-RPC transport (most common)
    implementation("ai.koog:a2a-transport-server-jsonrpc-http:$koogVersion")

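    // Ktor server engine (choose one that fits your needs).
    // The examples on this page use CIO, which requires io.ktor:ktor-server-cio;
    // if you keep Netty, pass Netty as engineFactory instead.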
    implementation("io.ktor:ktor-server-netty:$ktorVersion")
}

Overview

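The A2A server acts as a bridge between the A2A protocol transport layer and your custom agent logic. It orchestrates the entire request lifecycle while maintaining protocol compliance and providing robust session management.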

Core components

A2AServer

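The main server class that implements the full A2A protocol. It acts as the central coordinator and is responsible for:

  • Validating that incoming requests conform to the protocol specification
  • Managing concurrent sessions and task lifecycles
  • Orchestrating communication between the transport, storage, and business logic layers
  • Handling all protocol operations: sending messages, querying tasks, cancellation, and push notifications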
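To make the four groups of protocol operations more concrete, here is a minimal sketch that maps JSON-RPC method names onto them. The method names are taken from the A2A 0.3 specification; the A2AOperation enum and the classifyMethod function are hypothetical and do not describe how A2AServer dispatches requests internally.

```kotlin
// Hypothetical classification of incoming JSON-RPC method names into the
// operation groups handled by an A2A server. Illustrative only, not Koog's API.
enum class A2AOperation { SEND_MESSAGE, QUERY_TASK, CANCEL_TASK, PUSH_NOTIFICATION }

fun classifyMethod(method: String): A2AOperation? = when {
    method == "message/send" || method == "message/stream" -> A2AOperation.SEND_MESSAGE
    method == "tasks/get" -> A2AOperation.QUERY_TASK
    method == "tasks/cancel" -> A2AOperation.CANCEL_TASK
    method.startsWith("tasks/pushNotificationConfig/") -> A2AOperation.PUSH_NOTIFICATION
    // Unknown method: a JSON-RPC 2.0 server would reply with "Method not found" (-32601)
    else -> null
}
```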

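A2AServer takes two required parameters:

  • AgentExecutor: defines the agent's business logic
  • AgentCard: defines the agent's capabilities and metadata

It also accepts several optional parameters for customizing its storage and transport behavior.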

AgentExecutor

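The AgentExecutor interface is where you implement your agent's core business logic. It acts as the bridge between the A2A protocol and the capabilities of your specific AI agent. To run the agent, implement the execute method and define the agent's logic there. To support cancellation, you can also override the cancel method; as the comment in the example notes, this is optional.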

class MyAgentExecutor : AgentExecutor {
    override suspend fun execute(
        context: RequestContext<MessageSendParams>,
        eventProcessor: SessionEventProcessor
    ) {
        // Agent logic here
    }

    override suspend fun cancel(
        context: RequestContext<TaskIdParams>,
        eventProcessor: SessionEventProcessor,
        agentJob: Deferred<Unit>?
    ) {
        // Cancel agent here, optional
    }
}

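RequestContext provides detailed information about the current request, including the contextId and taskId of the current session, the message that was sent, and the request params.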

SessionEventProcessor communicates with the client:

  • sendMessage(message): sends an immediate response (chat-style interaction)
  • sendTaskEvent(event): sends task-related updates (long-running operations)

// For immediate responses (like chatbots)
eventProcessor.sendMessage(
    Message(
        messageId = UUID.randomUUID().toString(),  // any unique ID works
        role = Role.Agent,
        parts = listOf(TextPart("Here's your answer!")),
        contextId = context.contextId
    )
)

// For task-based operations
eventProcessor.sendTaskEvent(
    TaskStatusUpdateEvent(
        contextId = context.contextId,
        taskId = context.taskId,
        status = TaskStatus(
            state = TaskState.Working,
            message = Message(/* progress update */),
            timestamp = Clock.System.now()
        ),
        final = false  // More updates to come
    )
)
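The two methods correspond to two response styles. The sketch below models them with hypothetical stand-in types (DirectMessage, StatusUpdate) rather than Koog's classes, under the simplifying assumption that a response is either a single direct message or a series of task status updates in which only the last one is marked final.

```kotlin
// Hypothetical stand-ins for what an executor emits; these are not Koog types.
sealed interface Emitted
data class DirectMessage(val text: String) : Emitted
data class StatusUpdate(val state: String, val final: Boolean) : Emitted

// Returns true if the sequence follows one of the two modeled styles:
// exactly one direct message, or status updates where only the last is final.
fun isWellFormed(events: List<Emitted>): Boolean {
    if (events.isEmpty()) return false
    if (events.first() is DirectMessage) return events.size == 1
    val updates = events.filterIsInstance<StatusUpdate>()
    if (updates.size != events.size) return false // Mixing both styles is rejected in this model
    return updates.dropLast(1).none { it.final } && updates.last().final
}
```

In this model, a Working update followed by a final Completed update is accepted, while any event sent after a final update is rejected.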

AgentCard

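AgentCard serves as your agent's self-describing manifest. It tells clients what your agent can do, how to communicate with it, and what its security requirements are.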

val agentCard = AgentCard(
    // Basic Identity
    name = "Advanced Recipe Assistant",
    description = "AI agent specialized in cooking advice, recipe generation, and meal planning",
    version = "2.1.0",
    protocolVersion = "0.3.0",

    // Communication Settings
    url = "https://api.example.com/a2a",
    preferredTransport = TransportProtocol.JSONRPC,

    // Optional: Multiple transport support
    additionalInterfaces = listOf(
        AgentInterface("https://api.example.com/a2a", TransportProtocol.JSONRPC),
    ),

    // Capabilities Declaration
    capabilities = AgentCapabilities(
        streaming = true,              // Support real-time responses
        pushNotifications = true,      // Send async notifications
        stateTransitionHistory = true  // Maintain task history
    ),

    // Content Type Support
    defaultInputModes = listOf("text/plain", "text/markdown", "image/jpeg"),
    defaultOutputModes = listOf("text/plain", "text/markdown", "application/json"),

    // Define available security schemes
    securitySchemes = mapOf(
        "bearer" to HTTPAuthSecurityScheme(
            scheme = "Bearer",
            bearerFormat = "JWT",
            description = "JWT token authentication"
        ),
        "api-key" to APIKeySecurityScheme(
            `in` = In.Header,
            name = "X-API-Key",
            description = "API key for service authentication"
        )
    ),

    // Specify security requirements (logical OR of requirements)
    security = listOf(
        mapOf("bearer" to listOf("read", "write")),  // Option 1: JWT with read/write scopes
        mapOf("api-key" to emptyList())              // Option 2: API key
    ),

    // Enable extended card for authenticated users
    supportsAuthenticatedExtendedCard = true,

    // Skills/Capabilities
    skills = listOf(
        AgentSkill(
            id = "recipe-generation",
            name = "Recipe Generation",
            description = "Generate custom recipes based on ingredients, dietary restrictions, and preferences",
            tags = listOf("cooking", "recipes", "nutrition"),
            examples = listOf(
                "Create a vegan pasta recipe with mushrooms",
                "I have chicken, rice, and vegetables. What can I make?"
            )
        ),
        AgentSkill(
            id = "meal-planning",
            name = "Meal Planning",
            description = "Plan weekly meals and generate shopping lists",
            tags = listOf("meal-planning", "nutrition", "shopping")
        )
    ),

    // Optional: Branding
    iconUrl = "https://example.com/agent-icon.png",
    documentationUrl = "https://docs.example.com/recipe-agent",
    provider = AgentProvider(
        organization = "CookingAI Inc.",
        url = "https://cookingai.com"
    )
)
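The security field combines requirements as an OR of ANDs: a caller is accepted if it satisfies at least one map in the list, and satisfying a map means holding every scheme it names with all of the listed scopes (the same convention OpenAPI uses for security requirement objects). A minimal sketch of that check, using a hypothetical isAuthorized helper that is not part of Koog:

```kotlin
// scheme name -> scopes that scheme must grant
typealias SecurityRequirement = Map<String, List<String>>

// Hypothetical authorization check over an AgentCard-style `security` list.
// `granted` maps each scheme the caller authenticated with to the scopes it holds.
fun isAuthorized(
    requirements: List<SecurityRequirement>,
    granted: Map<String, Set<String>>
): Boolean {
    if (requirements.isEmpty()) return true // Nothing declared: open access in this model
    return requirements.any { requirement ->          // OR across the list
        requirement.all { (scheme, scopes) ->         // AND within one map
            val held = granted[scheme] ?: return@all false
            held.containsAll(scopes)
        }
    }
}
```

With the card above, a caller holding a JWT with both the read and write scopes, or any valid API key, passes; a JWT with only the read scope does not.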

Transport layer

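A2A itself supports multiple transport protocols for communicating with clients. Currently, Koog provides a JSON-RPC server transport implementation over HTTP.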

HTTP JSON-RPC transport

val transport = HttpJSONRPCServerTransport(server)
transport.start(
    engineFactory = CIO,           // Ktor engine (CIO, Netty, Jetty)
    port = 8080,                   // Server port
    path = "/a2a",                 // API endpoint path
    wait = true                    // Block until server stops
)
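Clients talk to this transport by POSTing JSON-RPC 2.0 requests to the configured path. Here is a sketch of a message/send request body, assuming the A2A 0.3 message shape (role, messageId, and parts carrying a kind field); the messageSendRequest helper is hypothetical and its escaping is deliberately minimal.

```kotlin
// Hypothetical client-side helper that builds a JSON-RPC 2.0 `message/send` body.
// Only backslashes and double quotes in `text` are escaped; a real client
// should use a JSON library instead.
fun messageSendRequest(requestId: Int, messageId: String, text: String): String {
    val escaped = text.replace("\\", "\\\\").replace("\"", "\\\"")
    return """{"jsonrpc":"2.0","id":$requestId,"method":"message/send",""" +
        """"params":{"message":{"role":"user","messageId":"$messageId",""" +
        """"parts":[{"kind":"text","text":"$escaped"}]}}}"""
}
```

For example, messageSendRequest(1, "msg-1", "Hello") produces a single-line body that could be sent to http://localhost:8080/a2a with the header Content-Type: application/json.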

Storage

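The A2A server uses a pluggable storage architecture that separates different kinds of data. All storage implementations are optional; by default, in-memory variants intended for development are used.

  • TaskStorage: task lifecycle management. Stores and manages task state, history, and artifacts.
  • MessageStorage: conversation history management. Manages message history within a session context.
  • PushNotificationConfigStorage: webhook management. Manages webhook configurations for asynchronous notifications.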
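To illustrate the kind of data a task store keeps, here is a minimal in-memory sketch. InMemoryTaskStore and StoredTask are hypothetical names and do not reflect the actual TaskStorage interface; the backing map is not thread-safe, so a real implementation would need synchronization or a concurrent map.

```kotlin
// Hypothetical task record: the task's context plus its ordered status history.
data class StoredTask(val taskId: String, val contextId: String, val history: List<String>) {
    val currentState: String get() = history.last()
}

// Hypothetical in-memory store; not Koog's TaskStorage and not thread-safe.
class InMemoryTaskStore {
    private val tasks = mutableMapOf<String, StoredTask>()

    // Appends a new state, creating the task on its first update.
    fun update(taskId: String, contextId: String, state: String) {
        val existing = tasks[taskId]
        tasks[taskId] = if (existing == null) {
            StoredTask(taskId, contextId, listOf(state))
        } else {
            existing.copy(history = existing.history + state)
        }
    }

    fun get(taskId: String): StoredTask? = tasks[taskId]

    // All tasks that belong to one conversation context.
    fun byContext(contextId: String): List<StoredTask> =
        tasks.values.filter { it.contextId == contextId }
}
```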

Quick start

1. Create an AgentCard

Define the agent's capabilities and metadata.

val agentCard = AgentCard(
    name = "IO Assistant",
    description = "AI agent specialized in input modification",
    version = "2.1.0",
    protocolVersion = "0.3.0",

    // Communication Settings
    url = "https://api.example.com/a2a",
    preferredTransport = TransportProtocol.JSONRPC,

    // Capabilities Declaration
    capabilities =
        AgentCapabilities(
            streaming = true,              // Support real-time responses
            pushNotifications = true,      // Send async notifications
            stateTransitionHistory = true  // Maintain task history
        ),

    // Content Type Support
    defaultInputModes = listOf("text/plain", "text/markdown", "image/jpeg"),
    defaultOutputModes = listOf("text/plain", "text/markdown", "application/json"),

    // Skills/Capabilities
    skills = listOf(
        AgentSkill(
            id = "echo",
            name = "echo",
            description = "Echoes back user messages",
            tags = listOf("io"),
        )
    )
)

2. Create an AgentExecutor

The executor implements the agent logic: it processes incoming requests and sends responses.

class EchoAgentExecutor : AgentExecutor {
    override suspend fun execute(
        context: RequestContext<MessageSendParams>,
        eventProcessor: SessionEventProcessor
    ) {
        val userMessage = context.params.message
        val userText = userMessage.parts
            .filterIsInstance<TextPart>()
            .joinToString(" ") { it.text }

        // Echo the user's message back
        val response = Message(
            messageId = UUID.randomUUID().toString(),
            role = Role.Agent,
            parts = listOf(TextPart("You said: $userText")),
            contextId = context.contextId,
            taskId = context.taskId
        )

        eventProcessor.sendMessage(response)
    }
}
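The text-extraction step in execute can be tried in isolation. The sketch below replaces Koog's part classes with hypothetical stand-ins (DemoTextPart, DemoFilePart) and applies the same filter-and-join logic as EchoAgentExecutor.

```kotlin
// Hypothetical stand-ins for message parts; these are not Koog's Part classes.
sealed interface DemoPart
data class DemoTextPart(val text: String) : DemoPart
data class DemoFilePart(val uri: String) : DemoPart

// Same logic as the executor: keep only text parts, join them with spaces,
// and prefix the reply.
fun echoReply(parts: List<DemoPart>): String {
    val userText = parts.filterIsInstance<DemoTextPart>().joinToString(" ") { it.text }
    return "You said: $userText"
}
```

Non-text parts are skipped, so echoReply(listOf(DemoTextPart("Hello"), DemoFilePart("file:///a.png"), DemoTextPart("world"))) returns "You said: Hello world".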

3. Create the server

Pass the agent executor and the agent card to the server.

val server = A2AServer(
    agentExecutor = EchoAgentExecutor(),
    agentCard = agentCard
)

4. Add a transport

Create the transport and start the server.

// HTTP JSON-RPC transport
val transport = HttpJSONRPCServerTransport(server)
transport.start(
    engineFactory = CIO,
    port = 8080,
    path = "/agent",
    wait = true
)

Agent implementation patterns

Simple response agent

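If your agent only needs to respond to individual messages, implement it as a simple agent. This pattern also fits cases where the execution logic is straightforward and finishes quickly.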

class SimpleAgentExecutor : AgentExecutor {
    override suspend fun execute(
        context: RequestContext<MessageSendParams>,
        eventProcessor: SessionEventProcessor
    ) {
        val response = Message(
            messageId = UUID.randomUUID().toString(),
            role = Role.Agent,
            parts = listOf(TextPart("Hello from agent!")),
            contextId = context.contextId,
            taskId = context.taskId
        )

        eventProcessor.sendMessage(response)
    }
}

Task-based agent

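If your agent's execution logic is complex and requires multiple processing steps, implement it as a task-based agent. This pattern also fits long-running work that may need to be paused.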

class TaskAgentExecutor : AgentExecutor {
    override suspend fun execute(
        context: RequestContext<MessageSendParams>,
        eventProcessor: SessionEventProcessor
    ) {
        // Send working status
        eventProcessor.sendTaskEvent(
            TaskStatusUpdateEvent(
                contextId = context.contextId,
                taskId = context.taskId,
                status = TaskStatus(
                    state = TaskState.Working,
                    timestamp = Clock.System.now()
                ),
                final = false
            )
        )

        // Do work...

        // Send completion
        eventProcessor.sendTaskEvent(
            TaskStatusUpdateEvent(
                contextId = context.contextId,
                taskId = context.taskId,
                status = TaskStatus(
                    state = TaskState.Completed,
                    timestamp = Clock.System.now()
                ),
                final = true
            )
        )
    }
}
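A task-based agent moves its task through a sequence of states, as the Working and Completed updates above show. The sketch below models a subset of the A2A task states with a hypothetical DemoTaskState enum; the transition table is a simplifying assumption for illustration, not a rule enforced by Koog.

```kotlin
// Hypothetical subset of task states, loosely mirroring A2A task states.
enum class DemoTaskState { Submitted, Working, InputRequired, Completed, Canceled, Failed }

val terminalStates = setOf(DemoTaskState.Completed, DemoTaskState.Canceled, DemoTaskState.Failed)

// Assumed transition rules for this model only.
fun canTransition(from: DemoTaskState, to: DemoTaskState): Boolean = when (from) {
    in terminalStates -> false // A finished task never changes state again
    DemoTaskState.Submitted, DemoTaskState.Working -> to != DemoTaskState.Submitted
    DemoTaskState.InputRequired -> to == DemoTaskState.Working || to == DemoTaskState.Canceled
    else -> false
}
```

Working to Working is allowed in this model so that an agent can send several progress updates before it reports completion.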