本文档概述了如何在 Microsoft Agent Framework 中使用 工作流作为代理 。
概述
有时,你已使用多个代理、自定义执行器和复杂逻辑构建了复杂的工作流,但想要像使用任何其他代理一样使用它。 这正是工作流代理让你能够执行的内容。 通过将工作流包装为一个 Agent,可以通过用于简单聊天代理的相同熟悉 API 与工作流进行交互。
主要优势
- 统一接口:使用与简单代理相同的 API 与复杂工作流交互
- API 兼容性:将工作流与支持代理接口的现有系统集成
- 可组合性:使用工作流代理作为大型代理系统或其他工作流中的构建基块
- 会话管理:利用代理会话进行会话状态和恢复
- 流式处理支持:在工作流执行时获取实时更新
工作原理
将工作流转换为代理时:
- 验证工作流以确保其启动执行程序可以接受所需的输入类型
- 创建一个会话以管理对话状态
- 输入消息将路由到工作流的启动执行程序
- 工作流事件转换为代理响应更新
- 外部输入请求(来自
RequestInfoExecutor)显示为函数调用
要求
若要将工作流用作代理,工作流的启动执行程序必须能够作为输入进行处理 IEnumerable<ChatMessage> 。 使用创建的 AsAIAgent基于代理的执行程序时,系统会自动满足此条件。
创建工作流代理
使用 AsAIAgent() 扩展方法将任何兼容的工作流转换为代理:
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
// Create agents
AIAgent researchAgent = chatClient.AsAIAgent("You are a researcher. Research and gather information on the given topic.");
AIAgent writerAgent = chatClient.AsAIAgent("You are a writer. Write clear, engaging content based on research.");
AIAgent reviewerAgent = chatClient.AsAIAgent("You are a reviewer. Review the content and provide a final polished version.");
// Build a sequential workflow
var workflow = new WorkflowBuilder(researchAgent)
.AddEdge(researchAgent, writerAgent)
.AddEdge(writerAgent, reviewerAgent)
.Build();
// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAIAgent(
id: "content-pipeline",
name: "Content Pipeline Agent",
description: "A multi-agent workflow that researches, writes, and reviews content"
);
AsAIAgent 参数
| 参数 | 类型 | Description |
|---|---|---|
id |
string? |
代理的可选唯一标识符。 如果未提供,则自动生成。 |
name |
string? |
代理的可选显示名称。 |
description |
string? |
可选的代理用途说明。 |
executionEnvironment |
IWorkflowExecutionEnvironment? |
可选的执行环境。 默认为 InProcessExecution.OffThread 或 InProcessExecution.Concurrent 基于工作流配置。 |
includeExceptionDetails |
bool |
如果 true,则包括错误内容中的异常消息。 默认为 false。 |
includeWorkflowOutputsInResponse |
bool |
如果 true,将传出工作流输出转换为代理响应中的内容。 默认为 false。 |
使用工作流代理
创建会话
与工作流代理的每个对话都需要会话来管理状态:
// Create a new session for the conversation
AgentSession session = await workflowAgent.CreateSessionAsync();
非流式执行
对于需要完整响应的简单用例:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
AgentResponse response = await workflowAgent.RunAsync(messages, session);
foreach (ChatMessage message in response.Messages)
{
Console.WriteLine($"{message.AuthorName}: {message.Text}");
}
流式执行
对于工作流执行的实时更新:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
// Process streaming updates from each agent in the workflow
if (!string.IsNullOrEmpty(update.Text))
{
Console.Write(update.Text);
}
}
处理外部输入请求
当工作流包含请求外部输入(使用 RequestInfoExecutor)的执行程序时,这些请求在代理响应中显示为函数调用:
await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
// Check for function call requests
foreach (AIContent content in update.Contents)
{
if (content is FunctionCallContent functionCall)
{
// Handle the external input request
Console.WriteLine($"Workflow requests input: {functionCall.Name}");
Console.WriteLine($"Request data: {functionCall.Arguments}");
// Provide the response in the next message
}
}
}
会话序列化和恢复
工作流代理会话可以序列化为持久性并稍后恢复:
// Serialize the session state
JsonElement serializedSession = await workflowAgent.SerializeSessionAsync(session);
// Store serializedSession to your persistence layer...
// Later, resume the session
AgentSession resumedSession = await workflowAgent.DeserializeSessionAsync(serializedSession);
// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedSession))
{
Console.Write(update.Text);
}
要求
若要将工作流用作代理,工作流的启动执行程序必须能够处理消息输入。 使用 Agent 或基于代理的执行程序时,系统会自动满足此条件。
创建工作流代理
调用 as_agent() 任何兼容的工作流,将其转换为代理:
from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential
# Create your chat client and agents
client = FoundryChatClient(
project_endpoint="<your-endpoint>",
model="<your-deployment>",
credential=AzureCliCredential(),
)
researcher = client.as_agent(
name="Researcher",
instructions="Research and gather information on the given topic.",
)
writer = client.as_agent(
name="Writer",
instructions="Write clear, engaging content based on research.",
)
# Build a sequential workflow
workflow = SequentialBuilder(participants=[researcher, writer]).build()
# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")
as_agent参数
| 参数 | 类型 | Description |
|---|---|---|
name |
str | None |
代理的可选显示名称。 如果未提供,则自动生成。 |
使用工作流代理
创建会话
可以选择创建会话来管理跨多个轮次的对话状态:
# Create a new session for the conversation
session = await workflow_agent.create_session()
注释
会话是可选的。 如果没有传递 session 给 run(),代理程序会在内部处理状态。
如果 workflow.as_agent() 在没有 context_providers 的情况下创建,框架会默认添加一个 InMemoryHistoryProvider(),以便多回合记录能够直接使用。
如果您显式传递 context_providers,该列表将按原样使用。
非流式执行
对于需要完整响应的简单用例:
# You can pass a plain string as input
response = await workflow_agent.run("Write an article about AI trends")
for message in response.messages:
print(f"{message.author_name}: {message.text}")
流式执行
对于工作流执行的实时更新:
async for update in workflow_agent.run(
"Write an article about AI trends",
stream=True,
):
if update.text:
print(update.text, end="", flush=True)
处理外部输入请求
当工作流包含请求外部输入(使用 request_info)的执行程序时,这些请求在代理响应中显示为函数调用。 函数调用使用名称 WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:
from agent_framework import Content, Message, WorkflowAgent
response = await workflow_agent.run("Process my request")
# Look for function calls in the response
human_review_function_call = None
for message in response.messages:
for content in message.contents:
if content.name == WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:
human_review_function_call = content
对挂起请求提供响应
若要在外部输入请求后继续执行工作流,请创建函数结果并将其发送回:
if human_review_function_call:
# Parse the request arguments
request = WorkflowAgent.RequestInfoFunctionArgs.from_json(
human_review_function_call.arguments
)
# Create a response (your custom response type)
result_data = MyResponseType(approved=True, feedback="Looks good")
# Create the function call result
function_result = Content.from_function_result(
call_id=human_review_function_call.call_id,
result=result_data,
)
# Send the response back to continue the workflow
response = await workflow_agent.run(Message("tool", [function_result]))
完整的示例
下面是演示如何使用流式处理输出的工作流代理的完整示例:
import asyncio
import os
from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential
async def main():
# Set up the chat client
client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
# Create specialized agents
researcher = client.as_agent(
name="Researcher",
instructions="Research the given topic and provide key facts.",
)
writer = client.as_agent(
name="Writer",
instructions="Write engaging content based on the research provided.",
)
reviewer = client.as_agent(
name="Reviewer",
instructions="Review the content and provide a final polished version.",
)
# Build a sequential workflow
workflow = SequentialBuilder(participants=[researcher, writer, reviewer]).build()
# Convert to a workflow agent
workflow_agent = workflow.as_agent(name="Content Creation Pipeline")
# Run the workflow
print("Starting workflow...")
print("=" * 60)
current_author = None
async for update in workflow_agent.run(
"Write about quantum computing",
stream=True,
):
# Show when different agents are responding
if update.author_name and update.author_name != current_author:
if current_author:
print("\n" + "-" * 40)
print(f"\n[{update.author_name}]:")
current_author = update.author_name
if update.text:
print(update.text, end="", flush=True)
print("\n" + "=" * 60)
print("Workflow completed!")
if __name__ == "__main__":
asyncio.run(main())
了解事件转换
当工作流作为代理运行时,工作流事件将转换为代理响应。 响应的类型取决于调用 run()方式:
-
run():返回一个AgentResponse,其中包含工作流完成后的完整结果。 -
run(..., stream=True):在工作流执行时返回对象的异步迭代AgentResponseUpdate,提供实时更新
在执行期间,内部工作流事件将映射到代理响应,如下所示:
| 工作流事件 | 代理响应 |
|---|---|
event.type == "output" |
作为 AgentResponseUpdate (流式处理)传递或聚合到 AgentResponse (非流式处理) |
event.type == "request_info" |
使用 WorkflowAgent.REQUEST_INFO_FUNCTION_NAME 转换为函数调用内容 |
| 其他事件 | 忽略 (仅限工作流内部) |
此转换允许使用标准代理接口,同时仍有权在需要时访问详细的工作流信息。
用例
1. 复杂的代理管道
将多代理工作流包装为单个代理,以便在应用程序中使用:
User Request --> [Workflow Agent] --> Final Response
|
+-- Researcher Agent
+-- Writer Agent
+-- Reviewer Agent
2. 代理组合
将工作流代理用作较大系统中的组件:
- 工作流代理可由另一个代理用作工具
- 多个工作流代理可以协调在一起
- 工作流代理可以嵌套在其他工作流中
3. API 集成
通过支持标准代理接口的 API 暴露复杂的工作流,以便:
- 使用复杂后端工作流的聊天接口
- 与现有基于代理的系统集成
- 从简单代理逐步迁移到复杂的工作流