协调器函数可以将其他协调器函数称为子协调程序。 子业务流程作为调用方(父)业务流程协调程序的子项运行,从调用方的角度来看,其行为类似于活动:它可以返回值、引发父协调器捕获的异常,并支持自动重试。
何时使用子业务流程
在以下情况下使用子业务流程:
-
撰写可重用的工作流构建块:将多步骤工作流提取到其自己的业务流程协调程序中,以便多个父业务流程可以调用它。
-
并行扇出业务流程:同时调度同一业务流程的多个实例,并等待所有实例完成。
-
组织复杂的工作流: 将大型业务流程分解为命名的可测试片段,而不是单个长函数。
注释
子业务编排必须在与父业务编排相同的应用中定义。 若要在不同的应用中调用编排流程,请改用 HTTP 202 轮询模式。 有关详细信息,请参阅 HTTP 功能。
本文内容:
定义子业务流程
以下示例演示了需要设置多个设备的 IoT 方案。 该函数表示为每个设备运行的安装工作流:
隔离工作者模型
public static async Task DeviceProvisioningOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context, string deviceId)
{
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", (deviceId, sasUrl));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
}
进程内模型
public static async Task DeviceProvisioningOrchestration(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string deviceId = context.GetInput<string>();
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", Tuple.Create(deviceId, sasUrl));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
}
const df = require("durable-functions");
df.app.orchestration("deviceProvisioningOrchestration", function* (context) {
const deviceId = context.df.getInput();
// Step 1: Create an installation package in blob storage and return a SAS URL.
const sasUrl = yield context.df.callActivity("createInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
yield context.df.callActivity("sendPackageUrlToDevice", { id: deviceId, url: sasUrl });
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield context.df.waitForExternalEvent("downloadCompletedAck");
// Step 4: ...
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
device_id = context.get_input()
# Step 1: Create an installation package in blob storage and return a SAS URL.
sas_url = yield context.call_activity("CreateInstallationPackage", device_id)
# Step 2: Notify the device that the installation package is ready.
yield context.call_activity("SendPackageUrlToDevice", { "id": device_id, "url": sas_url })
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield context.call_activity("DownloadCompletedAck")
# Step 4: ...
param($Context)
$deviceId = $Context.Input
# Step 1: Create an installation package in blob storage and return a SAS URL.
$sasUrl = Invoke-DurableActivity -FunctionName "CreateInstallationPackage" -Input $deviceId
# Step 2: Notify the device that the installation package is ready.
$deviceInfo = @{
id = $deviceId
url = $sasUrl
}
Invoke-DurableActivity -FunctionName "SendPackageUrlToDevice" -Input $deviceInfo
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
Start-DurableExternalEventListener -EventName "DownloadCompletedAck"
# Step 4: ...
@FunctionName("DeviceProvisioningOrchestration")
public void deviceProvisioningOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Step 1: Create an installation package in blob storage and return a SAS URL.
String deviceId = ctx.getInput(String.class);
String blobUri = ctx.callActivity("CreateInstallPackage", deviceId, String.class).await();
// Step 2: Notify the device that the installation package is ready.
String[] args = { deviceId, blobUri };
ctx.callActivity("SendPackageUrlToDevice", args).await();
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
ctx.waitForExternalEvent("DownloadCompletedAck").await();
// Step 4: ...
}
using Microsoft.DurableTask;
[DurableTask]
public class DeviceProvisioningOrchestration : TaskOrchestrator<string, object?>
{
public override async Task<object?> RunAsync(TaskOrchestrationContext context, string deviceId)
{
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", (deviceId, sasUrl.ToString()));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
return null;
}
}
from durabletask import task
def device_provisioning_orchestrator(ctx: task.OrchestrationContext, device_id: str):
# Step 1: Create an installation package in blob storage and return a SAS URL.
sas_url = yield ctx.call_activity("create_installation_package", input=device_id)
# Step 2: Notify the device that the installation package is ready.
yield ctx.call_activity("send_package_url_to_device", input={"id": device_id, "url": sas_url})
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield ctx.wait_for_external_event("DownloadCompletedAck")
# Step 4: ...
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
public class DeviceProvisioningOrchestration implements TaskOrchestration {
@Override
public void run(TaskOrchestrationContext ctx) {
String deviceId = ctx.getInput(String.class);
// Step 1: Create an installation package in blob storage and return a SAS URL.
String blobUri = ctx.callActivity("CreateInstallPackage", deviceId, String.class).await();
// Step 2: Notify the device that the installation package is ready.
String[] args = { deviceId, blobUri };
ctx.callActivity("SendPackageUrlToDevice", args).await();
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
ctx.waitForExternalEvent("DownloadCompletedAck").await();
// Step 4: ...
}
}
此业务流程协调程序函数可以单独运行以进行一次性设备设置,或者父业务流程协调程序可以使用调用子业务流程协调程序 API 将其安排为子业务流程。
并行运行子业务流程
以下示例显示了一个父业务流程协调程序,其并行扇出多个子业务流程协调程序。 某些语言使用确定性子实例 ID(派生自父实例 ID 和索引值),以防止在重播时出现重复的子业务流程。
隔离工作者模型
[Function("ProvisionNewDevices")]
public static async Task ProvisionNewDevices(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
// ...
}
进程内模型
[FunctionName("ProvisionNewDevices")]
public static async Task ProvisionNewDevices(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
// ...
}
const df = require("durable-functions");
df.app.orchestration("provisionNewDevices", function* (context) {
const deviceIds = yield context.df.callActivity("getNewDeviceIds");
// Run multiple device provisioning flows in parallel
const provisioningTasks = [];
var id = 0;
for (const deviceId of deviceIds) {
const child_id = context.df.instanceId + `:${id}`;
const provisionTask = context.df.callSubOrchestrator(
"deviceProvisioningOrchestration",
deviceId,
child_id
);
provisioningTasks.push(provisionTask);
id++;
}
yield context.df.Task.all(provisioningTasks);
// ...
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
device_IDs = yield context.call_activity("GetNewDeviceIds")
# Run multiple device provisioning flows in parallel
provisioning_tasks = []
id_ = 0
for device_id in device_IDs:
child_id = f"{context.instance_id}:{id_}"
provision_task = context.call_sub_orchestrator("DeviceProvisioningOrchestration", device_id, child_id)
provisioning_tasks.append(provision_task)
id_ += 1
yield context.task_all(provisioning_tasks)
# ...
param($Context)
$deviceIds = Invoke-DurableActivity -FunctionName "GetNewDeviceIds"
# Run multiple device setting up flows in parallel
$provisioningTasks = @()
for ($i = 0; $i -lt $deviceIds.Count; $i++) {
$deviceId = $deviceIds[$i]
$childId = "$($Context.InstanceId):$i"
$provisionTask = Invoke-DurableSubOrchestrator `
-FunctionName "DeviceProvisioningOrchestration" `
-Input $deviceId `
-InstanceId $childId `
-NoWait
$provisioningTasks += $provisionTask
}
Wait-DurableTask -Task $provisioningTasks
# ...
@FunctionName("ProvisionNewDevices")
public void provisionNewDevices(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
List<?> deviceIDs = ctx.getInput(List.class);
// Schedule each device provisioning sub-orchestration to run in parallel
List<Task<Void>> parallelTasks = deviceIDs.stream()
.map(device -> ctx.callSubOrchestrator("DeviceProvisioningOrchestration", device))
.collect(Collectors.toList());
// ...
}
后续步骤
using Microsoft.DurableTask;
[DurableTask]
public class ProvisionNewDevices : TaskOrchestrator<object?, object?>
{
public override async Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
return null;
}
}
from durabletask import task
def provision_new_devices(ctx: task.OrchestrationContext, _):
device_ids = yield ctx.call_activity("get_new_device_ids")
# Run multiple device provisioning flows in parallel
provisioning_tasks = []
for device_id in device_ids:
provision_task = ctx.call_sub_orchestrator(device_provisioning_orchestrator, input=device_id)
provisioning_tasks.append(provision_task)
yield task.when_all(provisioning_tasks)
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
public class ProvisionNewDevices implements TaskOrchestration {
@Override
public void run(TaskOrchestrationContext ctx) {
List<?> deviceIDs = ctx.getInput(List.class);
// Schedule each device provisioning sub-orchestration to run in parallel
List<Task<Void>> parallelTasks = deviceIDs.stream()
.map(device -> ctx.callSubOrchestrator("DeviceProvisioningOrchestration", device))
.collect(Collectors.toList());
ctx.allOf(parallelTasks).await();
}
}
后续步骤