你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
目前,Azure 流分析 (ASA) 仅支持将行添加(追加)到 SQL 输出(Azure SQL 数据库 和 Azure Synapse Analytics)。 本文讨论通过使用Azure Functions作为中间层,在 SQL 数据库上启用 UPDATE、UPSERT 或 MERGE的解决方法。
Azure Functions的替代选项将在末尾介绍。
要求
可以使用以下模式之一将数据写入表:
| 模式 | 等效的 T-SQL 语句 | 要求 |
|---|---|---|
| 附加 | INSERT | 无 |
| 替换 | MERGE (UPSERT) | 唯一键 |
| 累积 | MERGE (UPSERT),带有复合赋值运算符 (+=, -=...) |
唯一密钥和累加器 |
为了展示差异,请考虑在摄入以下两条记录时会发生什么情况:
| 到达_时间 | Device_Id | Measure_Value |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
在 追加 模式下,插入两条记录。 等效的 T-SQL 语句为:
INSERT INTO [target] VALUES (...);
结果:
| Modified_Time | Device_Id | Measure_Value |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
在 替换 模式下,您只能通过键获取最后一个值。 此处使用 Device_Id作为密钥。 等效的 T-SQL 语句为:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
结果:
| Modified_Time | 设备密钥 | Measure_Value |
|---|---|---|
| 10:05 | A | 20 |
最后,在累积模式下,使用复合赋值运算符(Value) 求和+=。 此处还将Device_Id用作密钥:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
结果:
| Modified_Time | 设备密钥 | Measure_Value |
|---|---|---|
| 10:05 | A | 21 |
出于性能考虑,ASA SQL 数据库输出适配器目前仅本身支持追加模式。 这些适配器使用批量插入来最大化吞吐量并限制反压力。
本文演示如何使用 Azure Functions 来实现 ASA 的替换和累加模式。 将函数用作中间层时,潜在的写入性能不会影响流式处理作业。 因此,使用 Azure Functions 与 Azure SQL 搭配使用效果最佳。 使用 Synapse SQL,从批量语句切换到逐行语句可能会产生更大的性能问题。
Azure Functions输出
在此作业中,你将 ASA SQL 输出替换为 ASA Azure Functions 输出。 该函数实现 UPDATE、UPSERT 或 MERGE 功能。
目前,可以使用两个选项访问函数中的 SQL 数据库。 第一个选项是 Azure SQL 输出绑定。 它目前仅限于 C#,它仅提供替换模式。 第二个选项是编写 SQL 查询,通过相应的 SQL 驱动程序(适用于 .NET 的 Microsoft.Data.SqlClient)提交。
以下示例中的两个都假定下表架构。 绑定选项要求在目标表上设置一个主键。 在使用 SQL 驱动程序时,不强制要求执行此操作,但建议执行。
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
若要将函数用作 ASA 的输出,该函数必须满足以下预期:
- Azure 流分析 期望从 Functions 应用中获取 HTTP 状态 200,以便成功处理其批处理任务。
- 当Azure 流分析从Azure函数收到 413(“http 请求实体太大”)异常时,它会减小发送到 Azure 函数的批处理的大小。
- 在测试连接期间,流分析会发送一个包含空批处理的 POST 请求到 Azure Functions,并期望接收到 HTTP 状态 20x 以验证测试。
选项 1:使用 Azure Functions SQL 绑定按键更新
此选项使用 Azure Functions SQL 输出绑定。 此扩展可以替换表中的对象,而无需编写 SQL 语句。 目前不支持复合赋值运算符(累加)。
此示例基于:
- Azure Functions 运行时版本 4
- .NET 6.0
- Microsoft。Azure。WebJobs.Extensions.Sql 0.1.131-preview
若要更好地了解绑定方法,请遵循 本教程。
首先按照本教程的说明,创建一个默认 HttpTrigger 函数应用。 使用以下信息:
- 语言:
C# - 运行时:
.NET 6(function/runtime v4 中) - 模板:
HTTP trigger
通过在位于项目文件夹的终端中运行以下命令来安装绑定扩展:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
在您的local.settings.json的Values部分中添加SqlConnectionString项,并填写目标服务器的连接字符串。
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
将整个函数(项目中的 .cs 文件)替换为以下代码片段。 使用您自己的命名空间、类名称和函数名称进行更新:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
更新绑定部分中的目标表名称:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
更新 Device 类和映射部分,以匹配自己的架构:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
现在,你可以通过调试(在 Visual Studio Code 中为 F5)来测试本地函数和数据库之间的连接。 需要能够从你的计算机上访问 SQL 数据库。 可以使用 SSMS 检查连接。 然后,将 POST 请求发送到本地终结点。 具有空正文的请求应返回 HTTP 204。 具有实际有效负载的请求应该在目标表中持久化(在替换/更新模式下)。 下面是一个与此示例中使用的模式相对应的示例有效负载:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
现在可以将函数发布到 Azure。 为 . 设置SqlConnectionString。 Azure SQL Server 防火墙应该允许 Azure 服务通过,这样实时函数才能到达。
然后,可以将函数定义为 ASA 作业中的输出,并使用它替换记录,而不是插入记录。
选项 2:通过自定义 SQL 查询与复合赋值合并(累加)
注意
重启和恢复后,ASA 可能会重新发送已发出的输出事件。 此行为可能导致累积逻辑失败(将单个值翻倍)。 若要防止此问题,请使用原生 ASA SQL 输出将相同的数据输出到表中。 可以使用此控制表来检测问题,并在必要时重新同步累积。
此选项使用 Microsoft.Data.SqlClient。 此库允许将任何 SQL 查询发送到 SQL 数据库。
此示例基于:
首先按照本教程的说明,创建一个默认 HttpTrigger 函数应用。 使用了以下信息:
- 语言:
C# - 运行时:
.NET 6(function/runtime v4 中) - 模板:
HTTP trigger
通过在位于项目文件夹的终端中运行以下命令来安装 SqlClient 库:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
在您的local.settings.json的Values部分中添加SqlConnectionString项,并填写目标服务器的连接字符串。
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
将整个函数(项目中的 .cs 文件)替换为以下代码片段。 自行更新命名空间、类名和函数名称:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
更新 sqltext 命令生成部分以匹配你自己的架构(请注意如何在更新时通过 += 运算符实现累加):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
现在,你可以通过调试(在 VS 代码中为 F5)来测试本地函数和数据库之间的连接。 需要能够从你的计算机上访问 SQL 数据库。 可以使用 SSMS 检查连接。 然后,将 POST 请求发送到本地终结点。 具有空正文的请求应返回 HTTP 204。 具有实际有效负载的请求应该在目标表中持久化(在累加/合并模式下)。 以下是与本样本中使用的模式相对应的样本有效负载:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
现在可以将函数发布到 Azure。 应为应用设置设置SqlConnectionString。 Azure SQL Server 防火墙应该允许 Azure 服务通过,这样实时函数才能到达。
然后可以将该函数定义为 ASA 作业中的输出,并用于替换记录而不是插入记录。
备选方法
在Azure Functions之外,多个方法可以实现预期的结果。 本部分介绍其中一些方法。
目标 SQL 数据库中的后处理
一旦通过标准的 ASA SQL 输出将数据插入到数据库中,后台任务就会开始工作。
对于 Azure SQL,请使用 INSTEAD OFDML 触发器来拦截 ASA 发出的 INSERT 命令。
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
对于 Synapse SQL,ASA 可以在临时表中插入。 然后,定期任务可根据需要将数据转换为中间表。 最后,数据被移动到生产表。
在 Azure Cosmos DB 中预处理
Azure Cosmos DB 本机支持 UPSERT。 此处,只能追加或替换。 您必须在客户方管理 Azure Cosmos DB 的累积。
如果要求匹配,则可以将目标 SQL 数据库替换为Azure Cosmos DB实例。 此更改需要在整体解决方案体系结构中发生重要更改。
对于 Synapse SQL,可以通过Azure Synapse Link for Azure Cosmos DB将 Azure Cosmos DB 用作中间层。 使用 Azure Synapse Link创建 analytical store。 然后,可以直接在 Synapse SQL 中查询此数据存储。
备选项的比较
每个方法提供不同的价值主张和功能:
| 类型 | 选项 | 模式 | Azure SQL 数据库 | Azure Synapse Analytics |
|---|---|---|---|---|
| 后处理 | ||||
| 触发器 | 替换、累加 | + | 不适用,触发器在 Synapse SQL 中不可用 | |
| 测试阶段 | 替换、累加 | + | + | |
| 预处理 | ||||
| Azure Functions | 替换、累加 | + | -(逐行模式下的性能) | |
| Azure Cosmos DB 替换 | 替换 | 空值 | 空值 | |
| Azure Cosmos DB Azure Synapse Link | 替换 | 空值 | + |
获取支持
如需进一步的帮助,请尝试 Azure 流分析的Microsoft问答页面。