阶段 2:Spark 工作负荷迁移

本文是 Azure Synapse Spark 到 Microsoft Fabric 迁移最佳实践系列的第二阶段,共四个阶段。

使用本文将 Spark 工作负载从Azure Synapse迁移到Microsoft Fabric。 本文介绍如何运行迁移助手、重构无法自动转换的代码模式以及迁移 Spark 池配置、环境和库。

在这篇文章中,你将学会如何:

  • 了解标准(非 Git)和已启用 Git 的 Synapse 工作区的迁移工作流。
  • 使用 Spark 迁移助手 来迁移笔记本、Spark 作业定义以及资源池。
  • 重构特定于 Synapse 的代码模式,以适配 Fabric 兼容性。
  • 迁移 Spark 池设置、环境和库。
  • 确定并解决 Synapse 与 Fabric之间的库兼容性差距。

使用迁移助手进行迁移

Spark 迁移助手自动将笔记本、Spark 作业定义、池和 Lake 数据库元数据从 Synapse 迁移到 Fabric。 助手复制并转换项目,但未完成迁移-你仍需要重构代码、协调配置差距并验证结果。

有关运行助手的分步说明,请参阅 Spark Synapse 到 Fabric Spark 迁移助手(预览版)

助手迁移以下项:

  • Spark 池将迁移到 Fabric Pools 和对应的环境工件。
  • 笔记本及其关联的环境被迁移。
  • Spark 作业定义与关联的环境一起迁移。
  • Lake 数据库映射到 Fabric 架构,托管的 Delta 表通过 OneLake 目录快捷方式进行迁移。

重要

助手不会迁移 Spark 配置、自定义库和自定义执行程序设置。 必须在 Fabric 环境中手动配置这些配置。 无法使用助手迁移 VNet 下的 Synapse 工作区。

标准(非 Git)工作区迁移

对于笔记本和 SDK 直接存储在 Synapse 中的工作区(而不是 Git 存储库中):

  1. 从 Fabric 工作区运行 Spark 迁移助手(Migrate>数据工程项目)。 选择源 Synapse 工作区并迁移所有 Spark 项。

  2. 验证依赖项:确保使用相同的 Spark 版本。 如果笔记本通过 mssparkutils.notebook.run() 引用其他笔记本,请确认这些笔记本也已迁移。 迁移助手保留文件夹结构(Fabric最多支持 10 个嵌套级别)。

  3. 重构代码:将 mssparkutils 替换为 notebookutils,将链接服务引用替换为Fabric连接,以及更新文件路径。 有关详细信息,请参阅 “重构 Spark 代码 ”部分。

已启用 Git 的工作区迁移

对于笔记本和 SJD 存储在 Azure DevOps 或 GitHub 存储库中的工作区,请注意 Synapse 和 Fabric 使用不同的 Git 序列化格式。 Synapse 将笔记本存储为 JSON;Fabric使用源格式 .py/.scala.ipynb。 不能将Fabric工作区直接指向同一 Synapse Git 分支。

  1. 迁移项。 使用 Spark 迁移助手将笔记本和 SSD 从 Synapse 工作区迁移到Fabric工作区。 这会将项转换为与Fabric兼容的格式。

  2. 重构代码。 应用与标准方案相同的代码重构 - 替换 mssparkutils、更新文件路径、替换链接服务。 有关详细信息,请参阅 “重构 Spark 代码 ”部分。

  3. 将Fabric工作区连接到 Git。 将Fabric工作区连接到存储库中的新分支或文件夹(Workspace Settings>Source Control>Git Integration)。 使用 Synapse 内容中的单独分支或文件夹来避免冲突。 提交Fabric工作区内容以填充新分支。

  4. 设置部署管道(可选)。 为正在进行的 CI/CD 配置 Fabric 部署管道(开发→测试→生产)。 Fabric支持在跨阶段部署时自动绑定默认的数据湖仓和附加环境。

小窍门

将 Synapse Git 分支保留为历史参考。 为Fabric内容创建新分支或文件夹。 Fabric将笔记本存储为源文件(.py for PySpark),而不是 JSON,这为代码评审提供了更清晰的 Git 差异。

重构 Spark 代码

迁移笔记本和 Spark 作业定义后,需要修复迁移助手无法自动转换的代码模式。 本部分将指导你替换特定于 Synapse 的 API、更新文件路径以及更改凭据模式以使用Fabric。

重构前审核

在解决各个重构模式之前,请在所有笔记本中运行整个代码库的搜索,以识别需要更改的 Synapse 特有的代码。

搜索模式 Category 需要采取行动
spark.synapse.linkedService 链接服务 删除;替换为直接端点身份验证或密钥保管库机密信息
getSecretWithLS Credentials 替换为 getSecret(vaultUrl, secretName)
TokenLibrary 令牌/身份验证 删除;使用直接 OAuth 配置或 notebookutils
synapsesql SQL 连接器 spark.read.synapsesql() 替换为 Delta 格式读取
mssparkutils Spark Utils 替换为 notebookutils (大多数 API 相同)
spark.catalog.listDatabases 目录 API 替换为 spark.sql("SHOW DATABASES")
spark.catalog.currentDatabase 目录 API 替换为 spark.sql("SELECT CURRENT_DATABASE()")
spark.catalog.getDatabase 目录 API 替换为 spark.sql("DESCRIBE DATABASE ...")
spark.catalog.listFunctions 目录 API Fabric不支持 - 删除
spark.catalog.registerFunction 目录 API 不支持 - 请改用spark.udf.register()
spark.catalog.functionExists 目录 API 在Fabric中不支持 — 移除
LinkedServiceBasedTokenProvider 身份验证提供程序 替换为 ClientCredsTokenProvider
getPropertiesAsMap 链接服务 删除;直接配置存储帐户
spark.storage.synapse 链接服务 删除 - Fabric不支持
/user/trusted-service-user/ 文件路径 替换为 OneLake 路径或快捷路径
cosmos.oltp Cosmos DB 更新以将密钥保管库用于机密而不是链接服务
kusto.spark.synapse Kusto/ADX 将链接的服务身份验证替换为accessToken通过getToken()

小窍门

迁移之前,请在整个笔记本存储库中运行这些搜索。 具有零匹配项的笔记本可以原样安全地迁移。 应使用以下部分中的详细指南,优先使用匹配项的笔记本进行代码重构。

文件路径使用情况

更新使用相对路径或 Synapse 托管存储路径的 Synapse 笔记本,以在 Fabric 中使用直接abfss://路径或 OneLake 路径。

之前 (Synapse) After (Fabric)
"abfss://...@<synapse_storage>.dfs.core.windows.net/user/trusted-service-user/deltalake" "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>/Tables/deltalake"
spark.read.synapsesql("<pool>.<schema>.<table>") spark.read.format("delta").load("abfss://.../<lakehouse>/Tables/<table>")

小窍门

将所有 Synapse 托管的存储路径替换为 OneLake 路径 (abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<item_id>/...)。 对于 ADLS Gen2 数据,请创建 OneLake 快捷方式,并使用快捷方式路径来替代。

Spark 目录 API

Fabric不支持多个 spark.catalog 方法。 将它们替换为 Spark SQL 等效项。

之前 (Synapse) After (Fabric)
spark.catalog.listDatabases() spark.sql("SHOW DATABASES").show()
spark.catalog.currentDatabase() spark.sql("SELECT CURRENT_DATABASE()").first()["current_database()"]
spark.catalog.getDatabase(db_name) spark.sql(f"DESCRIBE DATABASE {db_name}").show()
spark.catalog.listFunctions() Fabric 中不支持,删除或跳过
spark.catalog.registerFunction(name, fn) Fabric不支持,请使用spark.udf.register()代替
spark.catalog.functionExists(name) Fabric不支持,请删除或跳过

注释

spark.catalog表方法(如 createTable()tableExists()listTables())在Fabric中正常工作。 只有数据库级和函数级目录方法需要重构。

MSSparkUtils 和 NotebookUtils

mssparkutils 调用替换为 Fabric notebookutils 等效项。 最常见的凭据相关更改包括:

之前(Synapse) After (Fabric)
mssparkutils.credentials.getSecretWithLS("sampleLS", secretKey) notebookutils.credentials.getSecret("https://<vault>.vault.azure.net/", secretKey)
TokenLibrary.getSecret("foo", "bar") notebookutils.credentials.getSecret("https://foo.vault.azure.net/", "bar")

在Fabric中,不支持基于服务的链接机密检索(getSecretWithLS)。 而是使用 notebookutils.credentials.getSecret(vaultUrl, secretName) 直接引用密钥保管库 URL。 相同的模式适用于 TokenLibrary.getSecret() 调用。

注释

大多数 mssparkutils.fs 方法(例如,lscpmvrmmkdirshead)的工作方式与 Fabric 中的 notebookutils.fs 相同。 主要更改是凭据和机密方法,以及 notebook.run() 路径引用。

Azure 数据资源管理器 (Kusto) 连接器

必须重构通过链接服务连接到 Azure 数据资源管理器(Kusto)的 Synapse 笔记本,以便使用直接终结点身份验证。

之前(Synapse) 在(Fabric)之后
.option("spark.synapse.linkedService", "AzureDataExplorer1") 删除链接服务引用
使用链接服务选项集进行读取 .option("accessToken", notebookutils.credentials.getToken("https://<cluster>.kusto.windows.net"))

将链接服务选项替换为一个选项 accessToken 。 使用 notebookutils.credentials.getToken() 以获取 Kusto 集群端点的令牌。 其余查询选项(kustoDatabase,) kustoQuery保持不变。

Cosmos DB 连接器

在 Synapse 中更新使用的链接服务或 getSecretWithLS 的 Cosmos DB 连接。

之前(Synapse) After (Fabric)
.option("spark.synapse.linkedService", "CosmosDbLS") 删除链接服务引用
mssparkutils.credentials.getSecretWithLS("cosmosKeyLS", "cosmosKey") notebookutils.credentials.getSecret("https://<vault>.vault.azure.net/", "cosmosKey")

将链接服务引用替换为直接 Cosmos DB 终结点配置。 将 Cosmos DB 帐户密钥存储在Azure 密钥保管库中,并使用 notebookutils.credentials.getSecret(vaultUrl, secretName) 而不是 getSecretWithLS()进行检索。

关联服务引用

替换 Fabric 中的所有 Synapse 关联服务引用。

之前(Synapse) After (Fabric)
spark.conf.set("spark.storage.synapse.linkedServiceName", ls_name) 删除 - Fabric不支持
spark.conf.set("fs.azure.account.oauth.provider.type", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider") spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
TokenLibrary.getPropertiesAsMap(linked_service_cfg) 删除:使用(直接连接字符串或服务主体配置)

在Fabric中,没有链接服务。 将 Synapse 令牌管理器替换为标准 OAuth 客户端凭据(服务主体)。 使用spark.conf.set()直接配置fs.azure.account.auth.typeoauth.provider.typeclient.idclient.secretclient.endpoint

令牌库

Synapse 用于获取令牌和读取链接服务属性的 TokenLibrary在Fabric中不可用。 将其替换为等效模式。

之前 (Synapse) After (Fabric)
TokenLibrary.getPropertiesAsMap(serviceConnection) 删除 - 直接配置存储帐户
val my_account = conexion("Endpoint").toString.substring(8) val my_account = "<storage_account_name>" // Hardcode or retrieve via notebookutils
mssparkutils.fs.head(internalPath, Int.MaxValue) notebookutils.fs.head(internalPath, Int.MaxValue)

对于基于 OAuth 的 ADLS Gen2 访问,请通过使用服务主体凭据直接配置,与存储帐户相关的特定密钥(例如,spark.conf.set()fs.azure.account.auth.type.<account>.dfs.core.windows.net),而不是依赖于关联服务的令牌提供者。

重要

在切换之前检查所有笔记本中的链接服务引用。 在Fabric中,任何剩余的spark.synapse.linkedServiceTokenLibrarygetSecretWithLS调用在运行时都会失败。

Spark 作业定义迁移

Spark 作业定义(SJD)是批作业配置,用来引用主可执行文件(.py.jar、或.R),可选的引用库、命令行参数和 lakehouse 环境。 虽然 Spark 迁移助手自动处理 SJD 迁移,但 Synapse 和 Fabric SJD 之间的重要差异需要注意。

Synapse 和 Fabric SJD 之间的主要区别

  • 需要 Lakehouse 上下文信息。 在Fabric中,每个 SJD 必须至少有一个与之关联的湖屋。 此 lakehouse 充当 Spark 运行时的默认文件系统。 使用相对路径的任何代码将从默认的 Lakehouse 进行读取和写入。 在 Synapse 中,SJD 使用工作区默认存储(ADLS Gen2)作为默认文件系统。

  • 支持的语言。 Fabric支持 PySpark(Python)、Spark(Scala/Java)和 SparkR。 Fabric不支持 .NET for Spark(C#/F#)。 在迁移之前,必须在 Python 或 Scala 中重写这些工作负荷。

  • 重试策略。 Fabric SJD 支持内置重试策略,例如最大重试次数和重试间隔。 此功能适用于需要无限期运行的 Spark 结构化流式处理作业。

  • 环境绑定。 在 Synapse 中,SJD 绑定到 Spark 池。 在Fabric中,SJD 绑定到包含池配置、库和 Spark 属性的环境。 迁移助手会自动将 Synapse 池引用映射到 Fabric 环境。

  • 计划。 Fabric SJD 具有内置调度功能(Settings>Schedule),无需单独的管道。 在 Synapse 中,SJD 调度需要具有 Spark 作业活动的管道。 如果 Synapse 管道只用于触发 SJD,请考虑使用 Fabric 的内置 SJD 调度,而不是进行迁移管道。

  • 导入/导出。 Synapse 支持 SJD 的基于 UI 的 JSON 导入和导出。 Fabric不支持 UI 导入或导出。 使用 Spark 迁移助手 或 Fabric REST API 编程创建或更新 SJD。

重构 SJD 代码

本文中的相同代码重构模式适用于 SJD 主文件。 更改分为两个类别。

源代码更改(在.py.jar.R主文件内部):

  • 使用notebookutils替换mssparkutils以进行凭据和文件系统操作。
  • 根据需要,将代码中的硬编码文件路径更新为 OneLake abfss:// 路径或快捷路径。 仅对默认数据湖库使用相对路径的 SJD 可能不需要更改。
  • 将代码中的链接服务引用替换为 密钥保管库 密钥或 Fabric 连接。

注释

Fabric Spark 作业定义中尚不支持 DMTS 连接(仅在笔记本中受支持)。 如果 SJD 代码使用 DMTS,则重构为使用直接终结点身份验证。

SJD 配置更改(在 Fabric SJD 项目设置中):

  • 验证主定义文件引用的 ADLS Gen2 路径是否仍可从Fabric工作区访问。 如果文件存储在 Synapse 工作区内部存储中,请将其重新上传到 Fabric SJD,或将它们移动到可访问的 ADLS Gen2 位置。
  • 验证迁移后可访问所有引用文件(.py.R.jar)。 重新上传存储在 Synapse 工作区内部存储中的任何文件。
  • 如果命令行参数包含特定于 Synapse 的路径或连接字符串,请将其更新为Fabric等效项。

迁移池、环境和库

在完成笔记本和 Spark 作业定义的迁移之后,您需要确定池和环境策略。 本部分介绍何时可以使用Fabric初学者池(而不是迁移),何时创建自定义环境,以及如何识别和解决库兼容性差距。

Spark 池迁移

Fabric初学者池

Fabric Starter 池提供秒级 Spark 会话启动时间,这是对需要数分钟冷启动才能启动群集的 Synapse Spark 池的显著改进。 初学者池可从平台使用,无需配置。

小窍门

如果 Synapse Spark 池没有自定义配置,则没有自定义库,并且没有超出“中等”的特定节点大小要求,则不要迁移池。 相反,让笔记本和 Spark 作业定义使用 Fabric 工作区默认 Starter 池设置。 此方法提供最快的启动时间和零池管理开销。 仅当有特定需求时,才创建自定义池或环境。

何时创建自定义池或环境

仅当工作负荷需要时,才创建Fabric自定义池和/或环境:

  • 特定节点大小(Small、Large、XLarge、XXLarge)不同于默认的 Medium。
  • 不在 Fabric 内置运行时中的自定义程序库(pip 包、conda 包、JAR、wheel)。
  • 超出默认值的自定义 Spark 属性(例如 spark.sql.shuffle.partitionsspark.executor.memory)。
  • 用于访问私有数据源的托管专用终端(需要自定义池)。
  • 与工作区默认值不同的特定 Spark 运行时版本。

配置和库迁移

将 Spark 配置和库迁移到Fabric环境。

有关将库迁移到 Fabric 环境的详细步骤,请参阅 将 Spark 库从 Azure Synapse 迁移到 Fabric

  1. 导出 Spark 配置。 在 Synapse Studio 中,转到 Manage>Spark Pools> 选择池>Configurations + Libraries> 下载为 .yml/.conf/.json

  2. 导入到环境。 在 Fabric 中,创建环境构件。 转到 Spark 计算>Spark 属性>,上传已导出的文件。

  3. 迁移库文件。 对于资源池级库,将包(Wheel、JAR、Tar)上传到环境的库部分。 对于 PyPI/Conda 包,请将它们添加到环境的公共库配置中。

重要

Fabric中的工作区级别的库设置已经被弃用。 将所有库迁移到环境工件。 迁移会永久删除现有的工作区级配置 - 在启用环境之前下载所有设置。

库兼容性:Synapse 与 Fabric

Fabric Runtime 1.3(Spark 3.5)已预装223个Python库、183个Java/Scala库和135个R库。 大多数 Synapse 库在 Fabric 中可用,但如果迁移前未解决,可能会导致运行时故障。

若要确定笔记本实际使用的库,请在查看差异表之前运行以下检查:

  • Python notebooks: 搜索所有 文件中的 语句。
  • Java/Scala 笔记本和 SJDs: 搜索 import 语句和 Maven 坐标,查找包,如 com.azure.cosmos.sparkcom.microsoft.kusto.spark
  • 导出完整依赖项列表:在 Synapse 笔记本中运行 pip freeze,并与 Fabric Runtime 1.3 的清单进行比较。 只有同时出现在你的 pip freeze 输出和下面的间隙表中的库才需要进行操作。
  • 池级和工作区级自定义库: 在 Synapse Studio 中,转到 Manage>Apache Spark Pools> 选择池>Packages以查看需要重新加载到Fabric环境的自定义库。

Fabric中的Python库缺失

Category 图书馆 操作
CUDA / GPU (9 个库) libcublas、libcufft、libcufile、libcurand、libcusolver、libcusparse、libnpp、libnvfatbin、libnvjitlink、libnvjpeg 不可用 - Fabric不支持 GPU 池。 重构 GPU 工作负载以使用基于 CPU 的替代项或保留在 Synapse 上。
HTTP/API 客户端 httpx, httpcore, h11, google-auth, jmespath 通过环境安装: pip install httpx google-auth jmespath
ML/可解释性 无解释必要,解释核心 通过环境安装: pip install interpret
数据序列化 marshmallow, jsonpickle, frozendict, fixedint 如果需要,请通过环境进行安装: pip install marshmallow jsonpickle
日志记录/遥测 fluent-logger, humanfriendly, library-metadata-cooker,impulse-python-handler fluent-logger:如果使用,请安装。 其他是 Synapse 内部的,可能不需要。
Jupyter 内部 jupyter-client、jupyter-core、jupyter-ui-poll、jupyterlab-widgets、ipython-pygments-lexers Fabric内部管理 Jupyter 基础结构。 用户代码中通常不需要这些库。
系统/C 库 libgcc、libstdcxx、libgrpc、libabseil、libexpat、libnsl、libzlib 低级别系统库。 通常不直接导入。 仅当有依赖的 C 扩展时安装。
文件/并发 filelock、fsspec、knack 如果使用,请通过安装环境进行安装:pip install filelock fsspec

Fabric 缺少 Java/Scala 库

Library Synapse 版本 操作
azure-cosmos-analytics-spark 2.2.5 如果您的 Spark 作业使用 Cosmos DB 分析连接器,请在 Fabric 环境中将其安装为自定义 JAR。
junit-jupiter-params 5.5.2 仅限测试的库。 生产笔记本中不需要。
junit-platform-commons 1.5.2 仅限测试的库。 生产笔记本中不需要。

R 软件库

只有一个区别:Synapse 包括 lightgbm R 包(v4.6.0),该包不在Fabric中。 如果需要,请通过环境进行安装。 Fabric引入FabricTelemetry(v1.0.2),这是Fabric的内部组件。

值得注意的版本差异

两个平台上都存在 68 个Python库,但版本不同。 大多数是次要版本差异,但 17 个主要版本跳转可能会影响行为。

Library Fabric 版本 Synapse 版本 影响
libxgboost 2.0.3 3.0.1 v2 和 v3 之间的 XGBoost API 更改。 测试模型训练/预测代码。
Flask 2.2.5 3.0.3 Flask 3.x 具有重大更改。 如果从笔记本提供 Flask API,请全面测试。
lxml 4.9.3 5.3.0 次要 API 更改。 测试 XML 分析工作流。
libprotobuf 3.20.3 4.25.3 Protobuf 4.x 对自定义 proto 定义进行了重大更改。
markupsafe 2.1.3 3.0.2 MarkupSafe 3.x 删除Python 3.7 支持,但 API 兼容。
libpq 12.17 17.4 PostgreSQL 客户端库。 主要版本跳转 - 测试数据库连接。
libgcc-ng / libstdcxx-ng 11.2.0 15.2.0 GCC 运行时。 可能会影响 C 扩展兼容性。

注释

Synapse 通常提供较新版本的系统级库(GCC、protobuf、libpq),而Fabric提供较新版本的数据/ML 库(整体Python包)。 如果需要特定版本,请在Fabric环境配置中固定它。

小窍门

运行快速兼容性检查:导出 Synapse 池的库列表(pip freeze),与 Fabric Runtime 1.3 清单进行比较,并在运行迁移的笔记本之前预安装Fabric环境中缺少的库。 有关 Fabric 和 Synapse Spark 运行时之间每个内置库和版本的逐行比较,请参阅 microsoft/synapse-spark-runtime GitHub 存储库