Microsoft Agent Framework 工作流 - 使用工作流作为代理

本文档概述了如何在 Microsoft Agent Framework 中使用 工作流作为代理

概述

有时,你已使用多个代理、自定义执行器和复杂逻辑构建了复杂的工作流,但想要像使用任何其他代理一样使用它。 这正是工作流代理让你能够执行的内容。 通过将工作流包装为一个 Agent,可以通过用于简单聊天代理的相同熟悉 API 与工作流进行交互。

主要优势

  • 统一接口:使用与简单代理相同的 API 与复杂工作流交互
  • API 兼容性:将工作流与支持代理接口的现有系统集成
  • 可组合性:使用工作流代理作为大型代理系统或其他工作流中的构建基块
  • 会话管理:利用代理会话进行会话状态和恢复
  • 流式处理支持:在工作流执行时获取实时更新

工作原理

将工作流转换为代理时:

  1. 验证工作流以确保其启动执行程序可以接受所需的输入类型
  2. 创建一个会话以管理对话状态
  3. 输入消息将路由到工作流的启动执行程序
  4. 工作流事件转换为代理响应更新
  5. 外部输入请求(来自 RequestInfoExecutor)显示为函数调用

要求

若要将工作流用作代理,工作流的启动执行程序必须能够作为输入进行处理 IEnumerable<ChatMessage> 。 使用创建的 AsAIAgent基于代理的执行程序时,系统会自动满足此条件。

创建工作流代理

使用 AsAIAgent() 扩展方法将任何兼容的工作流转换为代理:

using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

// Create agents
AIAgent researchAgent = chatClient.AsAIAgent("You are a researcher. Research and gather information on the given topic.");
AIAgent writerAgent = chatClient.AsAIAgent("You are a writer. Write clear, engaging content based on research.");
AIAgent reviewerAgent = chatClient.AsAIAgent("You are a reviewer. Review the content and provide a final polished version.");

// Build a sequential workflow
var workflow = new WorkflowBuilder(researchAgent)
    .AddEdge(researchAgent, writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAIAgent(
    id: "content-pipeline",
    name: "Content Pipeline Agent",
    description: "A multi-agent workflow that researches, writes, and reviews content"
);

AsAIAgent 参数

参数 类型 Description
id string? 代理的可选唯一标识符。 如果未提供,则自动生成。
name string? 代理的可选显示名称。
description string? 可选的代理用途说明。
executionEnvironment IWorkflowExecutionEnvironment? 可选的执行环境。 默认为 InProcessExecution.OffThreadInProcessExecution.Concurrent 基于工作流配置。
includeExceptionDetails bool 如果 true,则包括错误内容中的异常消息。 默认为 false
includeWorkflowOutputsInResponse bool 如果 true,将传出工作流输出转换为代理响应中的内容。 默认为 false

使用工作流代理

创建会话

与工作流代理的每个对话都需要会话来管理状态:

// Create a new session for the conversation
AgentSession session = await workflowAgent.CreateSessionAsync();

非流式执行

对于需要完整响应的简单用例:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

AgentResponse response = await workflowAgent.RunAsync(messages, session);

foreach (ChatMessage message in response.Messages)
{
    Console.WriteLine($"{message.AuthorName}: {message.Text}");
}

流式执行

对于工作流执行的实时更新:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
    // Process streaming updates from each agent in the workflow
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

处理外部输入请求

当工作流包含请求外部输入(使用 RequestInfoExecutor)的执行程序时,这些请求在代理响应中显示为函数调用:

await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
    // Check for function call requests
    foreach (AIContent content in update.Contents)
    {
        if (content is FunctionCallContent functionCall)
        {
            // Handle the external input request
            Console.WriteLine($"Workflow requests input: {functionCall.Name}");
            Console.WriteLine($"Request data: {functionCall.Arguments}");

            // Provide the response in the next message
        }
    }
}

会话序列化和恢复

工作流代理会话可以序列化为持久性并稍后恢复:

// Serialize the session state
JsonElement serializedSession = await workflowAgent.SerializeSessionAsync(session);

// Store serializedSession to your persistence layer...

// Later, resume the session
AgentSession resumedSession = await workflowAgent.DeserializeSessionAsync(serializedSession);

// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedSession))
{
    Console.Write(update.Text);
}

要求

若要将工作流用作代理,工作流的启动执行程序必须能够处理消息输入。 使用 Agent 或基于代理的执行程序时,系统会自动满足此条件。

创建工作流代理

调用 as_agent() 任何兼容的工作流,将其转换为代理:

from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential

# Create your chat client and agents
client = FoundryChatClient(
    project_endpoint="<your-endpoint>",
    model="<your-deployment>",
    credential=AzureCliCredential(),
)

researcher = client.as_agent(
    name="Researcher",
    instructions="Research and gather information on the given topic.",
)

writer = client.as_agent(
    name="Writer",
    instructions="Write clear, engaging content based on research.",
)

# Build a sequential workflow
workflow = SequentialBuilder(participants=[researcher, writer]).build()

# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")

as_agent参数

参数 类型 Description
name str | None 代理的可选显示名称。 如果未提供,则自动生成。

使用工作流代理

创建会话

可以选择创建会话来管理跨多个轮次的对话状态:

# Create a new session for the conversation
session = await workflow_agent.create_session()

注释

会话是可选的。 如果没有传递 sessionrun(),代理程序会在内部处理状态。 如果 workflow.as_agent() 在没有 context_providers 的情况下创建,框架会默认添加一个 InMemoryHistoryProvider(),以便多回合记录能够直接使用。 如果您显式传递 context_providers,该列表将按原样使用。

非流式执行

对于需要完整响应的简单用例:

# You can pass a plain string as input
response = await workflow_agent.run("Write an article about AI trends")

for message in response.messages:
    print(f"{message.author_name}: {message.text}")

流式执行

对于工作流执行的实时更新:

async for update in workflow_agent.run(
    "Write an article about AI trends",
    stream=True,
):
    if update.text:
        print(update.text, end="", flush=True)

处理外部输入请求

当工作流包含请求外部输入(使用 request_info)的执行程序时,这些请求在代理响应中显示为函数调用。 函数调用使用名称 WorkflowAgent.REQUEST_INFO_FUNCTION_NAME

from agent_framework import Content, Message, WorkflowAgent

response = await workflow_agent.run("Process my request")

# Look for function calls in the response
human_review_function_call = None
for message in response.messages:
    for content in message.contents:
        if content.name == WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:
            human_review_function_call = content

对挂起请求提供响应

若要在外部输入请求后继续执行工作流,请创建函数结果并将其发送回:

if human_review_function_call:
    # Parse the request arguments
    request = WorkflowAgent.RequestInfoFunctionArgs.from_json(
        human_review_function_call.arguments
    )

    # Create a response (your custom response type)
    result_data = MyResponseType(approved=True, feedback="Looks good")

    # Create the function call result
    function_result = Content.from_function_result(
        call_id=human_review_function_call.call_id,
        result=result_data,
    )

    # Send the response back to continue the workflow
    response = await workflow_agent.run(Message("tool", [function_result]))

完整的示例

下面是演示如何使用流式处理输出的工作流代理的完整示例:

import asyncio
import os

from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential


async def main():
    # Set up the chat client
    client = FoundryChatClient(
        project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
        model=os.environ["FOUNDRY_MODEL"],
        credential=AzureCliCredential(),
    )

    # Create specialized agents
    researcher = client.as_agent(
        name="Researcher",
        instructions="Research the given topic and provide key facts.",
    )

    writer = client.as_agent(
        name="Writer",
        instructions="Write engaging content based on the research provided.",
    )

    reviewer = client.as_agent(
        name="Reviewer",
        instructions="Review the content and provide a final polished version.",
    )

    # Build a sequential workflow
    workflow = SequentialBuilder(participants=[researcher, writer, reviewer]).build()

    # Convert to a workflow agent
    workflow_agent = workflow.as_agent(name="Content Creation Pipeline")

    # Run the workflow
    print("Starting workflow...")
    print("=" * 60)

    current_author = None
    async for update in workflow_agent.run(
        "Write about quantum computing",
        stream=True,
    ):
        # Show when different agents are responding
        if update.author_name and update.author_name != current_author:
            if current_author:
                print("\n" + "-" * 40)
            print(f"\n[{update.author_name}]:")
            current_author = update.author_name

        if update.text:
            print(update.text, end="", flush=True)

    print("\n" + "=" * 60)
    print("Workflow completed!")


if __name__ == "__main__":
    asyncio.run(main())

了解事件转换

当工作流作为代理运行时,工作流事件将转换为代理响应。 响应的类型取决于调用 run()方式:

  • run():返回一个 AgentResponse,其中包含工作流完成后的完整结果。
  • run(..., stream=True):在工作流执行时返回对象的异步迭代 AgentResponseUpdate ,提供实时更新

在执行期间,内部工作流事件将映射到代理响应,如下所示:

工作流事件 代理响应
event.type == "output" 作为 AgentResponseUpdate (流式处理)传递或聚合到 AgentResponse (非流式处理)
event.type == "request_info" 使用 WorkflowAgent.REQUEST_INFO_FUNCTION_NAME 转换为函数调用内容
其他事件 忽略 (仅限工作流内部)

此转换允许使用标准代理接口,同时仍有权在需要时访问详细的工作流信息。

用例

1. 复杂的代理管道

将多代理工作流包装为单个代理,以便在应用程序中使用:

User Request --> [Workflow Agent] --> Final Response
                      |
                      +-- Researcher Agent
                      +-- Writer Agent  
                      +-- Reviewer Agent

2. 代理组合

将工作流代理用作较大系统中的组件:

  • 工作流代理可由另一个代理用作工具
  • 多个工作流代理可以协调在一起
  • 工作流代理可以嵌套在其他工作流中

3. API 集成

通过支持标准代理接口的 API 暴露复杂的工作流,以便:

  • 使用复杂后端工作流的聊天接口
  • 与现有基于代理的系统集成
  • 从简单代理逐步迁移到复杂的工作流

后续步骤