Warning
功能工作流 API 是 实验 性的,将来版本可能会更改或删除,但不会通知。
通过函数工作流 API,可以将工作流编写为纯Python异步函数。 而不是定义执行器类、连接边并使用 WorkflowBuilder,可以对 async 函数进行 @workflow 装饰,并使用原生的Python控制流(if/else、for 循环、asyncio.gather)来表达您的逻辑。
有关与图形 API 的并行比较,请参阅工作流概述上的 工作流 API 。
@workflow 修饰器
将 @workflow 应用于函数 async,以便将其转换为 FunctionalWorkflow 对象:
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
@workflow修饰器支持具有可选参数的参数化形式:
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
@workflow 参数
| 参数 | 类型 | 说明 |
|---|---|---|
name |
str | None |
工作流的显示名称。 默认为函数的 __name__. |
description |
str | None |
可选的人工可读说明。 |
checkpoint_storage |
CheckpointStorage | None |
用于在运行之间保留步骤结果的默认存储。 可以在 run() 中重写每个调用。 |
工作流函数签名
工作流函数 的第一个参数 接收传递给 .run()的输入。 请仅在需要 HITL、键/值状态或自定义事件时才添加ctx: WorkflowRunContext参数,否则为可选。
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext 通过类型注解首先检测,然后通过参数名称 ctx 检测,所以 ctx: WorkflowRunContext 和裸 ctx 参数都起作用。
运行工作流
调用.run()于FunctionalWorkflow返回的@workflow对象。
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
run() 参数
| 参数 | 类型 | 说明 |
|---|---|---|
message |
Any | None |
输入作为第一个参数传递给工作流函数。 |
stream |
bool |
如果 True,则返回一个 ResponseStream,该对象生成 WorkflowEvent 对象。 默认为 False。 |
responses |
dict[str, Any] | None |
HITL 响应由 request_id 键控。 用于恢复挂起的工作流。 |
checkpoint_id |
str | None |
要从中还原的检查点。 需要设置 checkpoint_storage。 |
checkpoint_storage |
CheckpointStorage | None |
覆盖当前运行中修饰器上设置的默认存储。 |
include_status_events |
bool |
在非流式处理结果中包含状态更改事件。 |
每次调用必须提供 message、responses 或 checkpoint_id 中的其中一个。
WorkflowRunResult
run() (非流式处理)返回一个 WorkflowRunResult。 关键方法:
| 方法/属性 | Returns | 说明 |
|---|---|---|
.text |
str |
首先输出为字符串 如果没有字符串输出,则为空字符串。 |
.get_outputs() |
list[Any] |
工作流发出的所有输出。 |
.get_final_state() |
WorkflowRunState |
最终运行状态 (IDLE, , IDLE_WITH_PENDING_REQUESTSFAILED...)。 |
.get_request_info_events() |
list[WorkflowEvent] |
状态为 IDLE_WITH_PENDING_REQUESTS 时待处理的 HITL 请求。 |
流媒体
将 stream=True 传入,以便在事件生成时即时接收事件。
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
有关完整示例,请参阅 python/samples/03-workflows/functional/basic_streaming_pipeline.py 。
@step 修饰器
@step 是一种选择性加入的修饰器,可将结果缓存、事件触发和每步骤检查点添加到单个异步函数中。
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
在工作流中@step的作用是什么
-
缓存结果 - 结果由
(step_name, call_index). 在 HITL 还原或检查点恢复时,已完成的步骤会立即返回其保存的结果,而不是重新执行。 -
触发事件 -
executor_invoked/executor_completed/executor_failed被触发以增强可观测性。 在缓存命中时,将发出executor_bypassed。 -
保存检查点 - 如果工作流具有
checkpoint_storage,则在每个步骤完成后保存检查点。 -
注入
WorkflowRunContext— 如果步骤函数声明参数ctx: WorkflowRunContext,则会自动注入活动上下文。
在工作流运行之外,@step 是透明的——该函数的行为与其未经装饰的版本相同,因此可以在隔离条件下对其进行完全测试。
何时使用 @step
对执行 费用高昂 的函数 使用 @step: 代理调用、外部 API 请求或任何在恢复时重新执行会产生高昂代价或有副作用的操作。 纯函数(无 @step)仍在内部 @workflow工作;它们只是在工作流恢复时重新执行。
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step 还接受参数 name :
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
有关完整示例,请参阅 python/samples/03-workflows/functional/steps_and_checkpointing.py 。
WorkflowRunContext
WorkflowRunContext (短别名: RunContext是注入到工作流和步骤函数中的执行上下文。 仅当使用 HITL、键/值状态或自定义事件时,才需要它。
从 agent_framework中导入它:
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — 人机协同
ctx.request_info() 挂起工作流以等待外部输入:
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
参数:
| 参数 | 类型 | 说明 |
|---|---|---|
request_data |
Any |
描述所需输入的数据载荷(字典, Pydantic 模型, 字符串, ……)。 |
response_type |
type |
响应的预期Python类型。 |
request_id |
str | None |
此请求的稳定标识符。 如果省略,则会生成随机 UUID。 |
重播语义: 首次执行时,request_info() 引发一个暂停工作流的内部信号(在你的代码中是不可见的)。 呼叫者接收 WorkflowRunResult 和 get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. 通过调用 .run(responses={request_id: value}) 恢复 — 工作流从顶部重新执行,并 request_info() 立即返回提供的值。
@step-被装饰的函数在挂起前运行,在恢复时返回其缓存的结果,而非重新执行。
处理响应:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
有关完整示例,请参阅 python/samples/03-workflows/functional/hitl_review.py 。
ctx.request_info() 也在 @step 函数内部受支持。
ctx.add_event() — 自定义事件
使用ctx.add_event()来发出特定于应用程序的事件以及框架生命周期事件。 有关完整详细信息和示例,请参阅 发出自定义事件。
ctx.get_state()
/
ctx.set_state() — 键/值状态
使用 ctx.get_state() 和 ctx.set_state() 存储在 HITL 中断中保留的值,并将其包含在检查点中。 有关完整详细信息,请参阅 工作流状态。
配置检查点存储时,状态值必须是 JSON 可序列化的。
ctx.is_streaming()
返回 True 当前运行是由 stream=True 启动时。 在步骤函数内部,在需要根据流式处理模式调整其行为时非常有用。
get_run_context()
从正在运行的工作流内的任意位置检索活动 WorkflowRunContext:在不声明 ctx 参数的辅助函数中很有用。
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
在工作流未运行时调用 None 返回。
利用 asyncio.gather 实现并行
对扇出/扇入使用标准Python并发性 — 无需框架基元:
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather 在函数被 @step修饰时也能正常工作。
有关完整示例,请参阅 python/samples/03-workflows/functional/parallel_pipeline.py 。
在工作流中调用代理
代理调用在@workflow内部以纯函数调用的形式工作。
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
要在 HITL 恢复或检查点还原中缓存结果,请将 @step 添加到代理调用函数中:
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
有关完整示例,请参阅 python/samples/03-workflows/functional/agent_integration.py 。
.as_agent() — 使用工作流作为代理
将 FunctionalWorkflow 包装为与 .as_agent() 兼容的代理对象:
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent() 返回一个 FunctionalWorkflowAgent,该对象公开与其他代理对象相同的 run() 接口,使功能工作流能够与任何接受代理的系统进行组合。
| 参数 | 类型 | 说明 |
|---|---|---|
name |
str | None |
代理人的显示名称。 默认为工作流名称。 |
有关示例,请参阅 python/samples/03-workflows/functional/agent_integration.py 。
Samples
可运行的示例位于以下示例文件夹中:
-
python/samples/01-get-started/— 介绍性@workflow示例 -
python/samples/03-workflows/functional/— 功能齐全的工作流示例
后续步骤
相关主题:
目前,函数工作流 API 不适用于 C# 。