In sequential orchestration, agents are organized in a pipeline. Each agent processes the task in turn, passing its output to the next agent in the sequence. This is ideal for workflows where each step builds upon the previous one, such as document review, data processing pipelines, or multi-stage reasoning.
Important
By default, each agent in the sequence consumes the previous agent's full conversation — both the input messages provided to the previous agent and its response messages. You can configure agents to consume only the previous agent's response messages instead. See Controlling Context Between Agents for details.
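The default hand-off described above can be modeled in a few lines of plain Python. This is only a conceptual sketch, not the framework's implementation: `Msg`, `run_sequential`, and the two toy agents are hypothetical names invented for illustration, and real agents would call a model instead of running a local function.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Msg:
    role: str    # "user" or "assistant"
    author: str  # who produced the message
    text: str


# A toy "agent": receives the conversation so far and returns its reply text.
ToyAgent = Callable[[list[Msg]], str]


def run_sequential(agents: dict[str, ToyAgent], prompt: str) -> list[Msg]:
    """Run each agent in order, handing it the full conversation so far."""
    conversation = [Msg("user", "user", prompt)]
    for name, agent in agents.items():
        reply = agent(list(conversation))  # pass a copy so agents cannot mutate history
        conversation.append(Msg("assistant", name, reply))
    return conversation


# Hypothetical stand-ins for LLM-backed agents.
def shout(history: list[Msg]) -> str:
    return history[-1].text.upper()


def count_seen(history: list[Msg]) -> str:
    return f"saw {len(history)} messages; last was {history[-1].text!r}"


conversation = run_sequential({"shout": shout, "count": count_seen}, "hello")
for m in conversation:
    print(f"{m.author}: {m.text}")
```

The second toy agent reports two messages because it receives both the original user input and the first agent's response, which is the default behavior the note describes.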
What You'll Learn
- How to create a sequential pipeline of agents
- How to chain agents where each builds upon the previous output
- How to add human-in-the-loop approval for sensitive tool calls
- How to mix agents with custom executors for specialized tasks
- How to track the conversation flow through the pipeline
Define Your Agents
In sequential orchestration, agents are organized in a pipeline where each agent processes the task in turn, passing output to the next agent in the sequence.
Set Up the Azure OpenAI Client
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);
Warning
DefaultAzureCredential is convenient for development but requires careful consideration in production. In production, consider using a specific credential (e.g., ManagedIdentityCredential) to avoid latency issues, unintended credential probing, and potential security risks from fallback mechanisms.
Create specialized agents that will work in sequence:
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));
Set Up the Sequential Orchestration
Build the workflow using AgentWorkflowBuilder:
// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);
Run the Sequential Workflow
Execute the workflow and process the events:
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        // Print a new "executor: " prefix whenever the streaming source changes
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }
        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}
// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}
Sample Output
French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!
Sequential Orchestration with Human-in-the-Loop
Sequential orchestrations support human-in-the-loop interactions through tool approval. When agents use tools wrapped with ApprovalRequiredAIFunction, the workflow pauses and emits a RequestInfoEvent containing a ToolApprovalRequestContent. External systems (such as a human operator) can inspect the tool call, approve or reject it, and the workflow resumes accordingly.
Tip
For more details on the request and response model, see Human-in-the-Loop.
Define Agents with Approval-Required Tools
Create agents where sensitive tools are wrapped with ApprovalRequiredAIFunction:
ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);
ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");
Build and Run with Approval Handling
Build the sequential workflow normally. The approval flow is handled through the event stream:
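var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);
// Start a streaming run as in the earlier example; `messages` holds the user's request.
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        // Approve the pending tool call so the workflow can resume.
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}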
Note
AgentWorkflowBuilder.BuildSequential() supports tool approval out of the box — no additional configuration is needed. When an agent calls a tool wrapped with ApprovalRequiredAIFunction, the workflow automatically pauses and emits a RequestInfoEvent.
Tip
For a complete runnable example of this approval flow, see the GroupChatToolApproval sample. The same RequestInfoEvent handling pattern applies to other orchestrations.
Key Concepts
- Sequential Processing: Each agent processes the output of the previous agent in order
- AgentWorkflowBuilder.BuildSequential(): Creates a pipeline workflow from a collection of agents
- ChatClientAgent: Represents an agent backed by a chat client with specific instructions
- InProcessExecution.RunStreamingAsync(): Runs the workflow and returns a StreamingRun for real-time event streaming
- Event Handling: Monitor agent progress through AgentResponseUpdateEvent and completion through WorkflowOutputEvent
- Tool Approval: Wrap sensitive tools with ApprovalRequiredAIFunction to require human approval before execution
- RequestInfoEvent: Emitted when a tool requires approval; contains ToolApprovalRequestContent with the tool call details
In sequential orchestration, each agent processes the task in turn, with output flowing from one to the next. Start by defining agents for a two-stage process:
import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)
writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)
reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)
Set Up the Sequential Orchestration
The SequentialBuilder class creates a pipeline where agents process tasks in order. By default, each agent sees the full conversation history and adds its own response:
from agent_framework.orchestrations import SequentialBuilder
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()
Run the Sequential Workflow
Execute the workflow and collect the final output. The terminal output is an AgentResponse containing the last agent's response messages:
from agent_framework import AgentResponse
# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()
if outputs:
    print("===== Final Response =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"[{name}]\n{msg.text}")
Sample Output
===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!
Advanced: Mixing Agents with Custom Executors
Sequential orchestration supports mixing agents with custom executors for specialized processing. This is useful when a step needs custom logic that doesn't require an LLM.
Define a Custom Executor
Note
When a custom executor follows an agent in the sequence, its handler receives an AgentExecutorResponse (because agents are internally wrapped by AgentExecutor). Use agent_response.full_conversation to access the full conversation history. A custom executor used as the last participant (terminator) must call ctx.yield_output(AgentResponse(...)) so its output becomes the workflow's terminal output.
from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never
class Summarizer(Executor):
    """Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
            return
        # Count messages by role across the full conversation
        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.yield_output(AgentResponse(messages=[summary]))
Build a Mixed Sequential Workflow
# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)
# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()
Sample Output with Custom Executor
===== Final Summary =====
Summary -> users:1 assistants:1
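To see why the sample reports one user and one assistant message, the counting step can be reproduced on plain data. The sketch below mirrors the Summarizer's logic using dictionaries in place of framework message objects; `summarize_roles` and the sample conversation are hypothetical and exist only for illustration.

```python
def summarize_roles(conversation: list[dict[str, str]]) -> str:
    """Count messages per role, mirroring the Summarizer's counting logic."""
    if not conversation:
        return "No conversation to summarize."
    users = sum(1 for m in conversation if m["role"] == "user")
    assistants = sum(1 for m in conversation if m["role"] == "assistant")
    return f"Summary -> users:{users} assistants:{assistants}"


# Hypothetical conversation after the `content` agent has replied once:
# one user prompt plus one assistant response.
conversation = [
    {"role": "user", "text": "Explain eBikes in one paragraph."},
    {"role": "assistant", "text": "An eBike is a bicycle with an electric motor."},
]

print(summarize_roles(conversation))
print(summarize_roles([]))
```

With a single agent ahead of the summarizer, the full conversation holds exactly the user's prompt and that agent's reply, so both counts are 1.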
Controlling Context Between Agents
By default, each agent in a SequentialBuilder workflow consumes the previous agent's full conversation (input + response messages). Setting chain_only_agent_responses=True configures all agents in the sequence to consume only the previous agent's response messages instead:
workflow = SequentialBuilder(
    participants=[writer, translator, reviewer],
    chain_only_agent_responses=True,
).build()
This is useful for translation pipelines, progressive refinement, and other scenarios where each agent should focus solely on transforming the prior agent's output without being influenced by earlier conversation turns.
For a complete example, see sequential_chain_only_agent_responses.py in the Agent Framework repository.
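The difference between the two context modes is easiest to see by listing what each agent would receive. The following is a simplified plain-Python model under stated assumptions, not the SequentialBuilder implementation: `inputs_per_agent` is a hypothetical helper, each agent is assumed to produce exactly one response message, and the first agent is assumed to receive the user prompt in both modes.

```python
def inputs_per_agent(agent_names, prompt, chain_only_agent_responses=False):
    """Return the messages each agent would receive under a simplified model."""
    seen = {}
    incoming = [("user", prompt)]
    for name in agent_names:
        seen[name] = list(incoming)
        response = [(name, f"<{name} output>")]
        if chain_only_agent_responses:
            # Next agent gets only this agent's response messages.
            incoming = response
        else:
            # Next agent gets this agent's input plus its response.
            incoming = incoming + response
    return seen


names = ["writer", "translator", "reviewer"]
full = inputs_per_agent(names, "Write a tagline.")
chained = inputs_per_agent(names, "Write a tagline.", chain_only_agent_responses=True)

for name in names:
    print(f"{name}: full={len(full[name])} chained={len(chained[name])}")
```

In this model the reviewer receives three messages in the default mode but only the translator's output when responses are chained, which is why the option suits pure transformation pipelines.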
Tip
For more fine-grained control over context flow — including custom filter functions — see Context Modes in the Agent Executor reference.
Intermediate Outputs
By default, only the last participant's output surfaces as a workflow output event. Set intermediate_outputs=True to surface every participant's output, in addition to the final output:
workflow = SequentialBuilder(
    participants=[writer, reviewer, editor],
    intermediate_outputs=True,
).build()
In streaming mode, you can handle these events as they arrive:
from agent_framework import AgentResponseUpdate
# Track the last author to format streaming output.
last_author: str | None = None
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            print(f"{author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)
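If you would rather collect complete messages than print chunks as they arrive, the same author-tracking idea can be used to merge consecutive updates. This is a minimal sketch on stand-in data: `Update` is a hypothetical dataclass that only mimics the `author_name` and `text` fields used above, and `merge_updates` is an illustrative helper, not a framework function.

```python
from dataclasses import dataclass


@dataclass
class Update:
    """Hypothetical stand-in for a streaming update: an author plus a text chunk."""
    author_name: str
    text: str


def merge_updates(updates):
    """Join consecutive chunks from the same author into (author, full_text) pairs."""
    merged: list[tuple[str, str]] = []
    for u in updates:
        if merged and merged[-1][0] == u.author_name:
            # Same author as the previous chunk: extend the current message.
            merged[-1] = (u.author_name, merged[-1][1] + u.text)
        else:
            # Author changed: start a new message.
            merged.append((u.author_name, u.text))
    return merged


chunks = [
    Update("writer", "Ride far, "),
    Update("writer", "spend less."),
    Update("reviewer", "Punchy "),
    Update("reviewer", "and clear."),
]

messages = merge_updates(chunks)
for author, text in messages:
    print(f"{author}: {text}")
```

Merging on author change relies on a sequential pipeline emitting one agent's updates at a time; interleaved streams would need a per-author buffer instead.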
Sequential Orchestration with Human-in-the-Loop
Sequential orchestrations support human-in-the-loop interactions in two ways: tool approval for controlling sensitive tool calls, and request info for pausing after each agent response to gather feedback.
Tip
For more details on the request and response model, see Human-in-the-Loop.
Tool Approval in Sequential Workflows
Use @tool(approval_mode="always_require") to mark tools that need human approval before execution. The workflow pauses and emits a request_info event when the agent tries to call the tool.
@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"
database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)
workflow = SequentialBuilder(participants=[database_agent]).build()
Process the event stream and handle approval requests:
async def process_event_stream(stream):
    # Collect an approval response for every pending tool-approval request
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None
stream = workflow.run("Check the schema and update all pending orders", stream=True)
pending_responses = await process_event_stream(stream)
# Keep resuming the workflow until no further approvals are requested
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)
Tip
For a complete runnable example, see sequential_builder_tool_approval.py. Tool approval works with SequentialBuilder without any extra builder configuration.
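The run, collect, and resume loop above can be hard to follow without seeing the state change between rounds. The sketch below models it synchronously in plain Python; `ToyWorkflow`, `collect_responses`, and the tool names are hypothetical, and the model assumes one approval request per run, which the real workflow does not guarantee.

```python
from typing import Callable, Optional


class ToyWorkflow:
    """Hypothetical workflow that pauses before each sensitive tool call."""

    def __init__(self, tool_calls: list[str]) -> None:
        self._pending = list(tool_calls)  # tool calls not yet presented for approval
        self.executed: list[str] = []
        self.rejected: list[str] = []

    def run(self, responses: Optional[dict[str, bool]] = None) -> list[dict]:
        """Apply approval decisions, then emit the next request or the final output."""
        for request_id, approved in (responses or {}).items():
            (self.executed if approved else self.rejected).append(request_id)
        if self._pending:
            call = self._pending.pop(0)
            return [{"type": "request_info", "request_id": call}]
        return [{"type": "output", "data": list(self.executed)}]


def collect_responses(events: list[dict], approve: Callable[[str], bool]):
    """Build a response for every request event; return None when nothing is pending."""
    responses = {
        e["request_id"]: approve(e["request_id"])
        for e in events
        if e["type"] == "request_info"
    }
    return responses or None


def approve_unless_drop(call: str) -> bool:
    # Illustrative policy: reject anything that looks destructive.
    return not call.startswith("drop")


workflow = ToyWorkflow(["read_schema", "update_orders", "drop_table"])
events = workflow.run()
pending = collect_responses(events, approve_unless_drop)
while pending is not None:
    events = workflow.run(responses=pending)
    pending = collect_responses(events, approve_unless_drop)

print("executed:", workflow.executed)
print("rejected:", workflow.rejected)
```

The loop ends on the first run that yields no request events, which mirrors the `pending_responses is not None` check in the example above.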
Request Info for Agent Feedback
Use .with_request_info() to pause after specific agents respond, allowing external input (such as human review) before the next agent begins:
drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)
editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)
finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)
# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)
async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None
stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)
Tip
See the full samples: sequential tool approval and sequential request info.
Key Concepts
- Shared Context: By default, each agent consumes the previous agent's full conversation, including input and response messages
- Context Control: Use chain_only_agent_responses=True to configure agents to consume only the previous agent's response messages
- AgentResponse Output: The workflow's terminal output is an AgentResponse containing the last agent's response (not the full conversation)
- Order Matters: Agents execute strictly in the order specified in the participants list
- Flexible Participants: You can mix agents and custom executors in any order
- Custom Terminator Contract: A custom executor used as the last participant must call ctx.yield_output(AgentResponse(...)) to produce the terminal output
- Intermediate Outputs: Set intermediate_outputs=True to surface every participant's output as a workflow output event, not just the last participant's
- Tool Approval: Use @tool(approval_mode="always_require") for sensitive operations that need human review
- Request Info: Use .with_request_info(agents=[...]) to pause after specific agents for external feedback