Python UDF、Scala UDF 以及原生執行引擎中的複雜資料型態

Microsoft Fabric 的原生執行引擎現已支援 Python 使用者定義函式(UDF)、Scala UDF 以及複雜的資料型別(陣列、映射與結構體)。 這些功能讓你能撰寫具表現力的 Spark 應用程式,同時不犧牲效能。

Python UDF 支援

Python 是資料工程和資料科學中最受歡迎的語言之一。 歷史上,Python UDF 因 JVM 與 Python 工作程序之間的序列化成本,對 Spark 造成了相當大的開銷。 原生執行引擎將這些昂貴的轉換降到最低,使得在不需修改程式碼的情況下也能更快執行。

Python UDFs 如何在原生執行引擎中運作

在傳統的 Spark 執行模型中,Python UDF 的執行包含:

  1. 從 Spark 的內部格式進行資料轉換。
  2. 序列化並轉移到 Python 工作程序。
  3. Python UDF 的執行。
  4. 將結果序列化回寫至 JVM。
  5. Spark 繼續執行。

這種跨執行時的移動會造成序列化/反序列化成本、CPU 效率低下以及欄位執行管線失效。 原生執行引擎透過優化資料傳輸路徑並在可能的情況下維持向量化處理,來降低這些開銷。

支援的 Python UDF 類型

原生執行引擎支援:

  • 純量 UDF:以 udf() 註冊的逐列處理 Python 函數。
  • 向量化(Pandas)UDF:以 @pandas_udf 裝飾的函式,使用 Apache Arrow 對一批批資料進行操作,以便高效傳輸。

向量化 UDF 的效能提升最大,因為它們自然地與原生執行引擎的列式處理模型對齊。

範例:向量化 Python UDF

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def calculate_discount(price: pd.Series, rate: pd.Series) -> pd.Series:
    return price * (1 - rate)

df = spark.table("sales.transactions")
result = df.withColumn("discounted_price", calculate_discount(df.price, df.discount_rate))
result.show()

除了啟用原生執行引擎外,不需要額外設定。 現有的 Python UDF 會自動受益。

Scala UDF 支援

原生執行引擎也能加速 Scala UDF。 由於 Scala UDF 以原生方式在 JVM 上執行,因此引擎可將支援的作業卸載至向量化的 C++ 執行路徑,同時讓 Scala UDF 在相同執行階段中的評估維持高效率。

範例:Scala UDF

import org.apache.spark.sql.functions.udf

val toUpperCase = udf((s: String) => s.toUpperCase)
val df = spark.table("catalog.customers")
val result = df.withColumn("name_upper", toUpperCase(df("name")))
result.show()

當啟用原生執行引擎時,對支援資料型態運作的 Scala UDF 可無需修改程式碼即可加速。

複雜資料型態支援

現代湖屋架構依賴半結構化與巢狀資料。 原生執行引擎現在提供以下最佳化支援:

資料類型 Description 使用案例範例
陣列 元素的有序集合 事件標籤、產品類別
地圖 鍵值對 組態屬性與元資料
結構體 不同類型的命名欄位 巢狀客戶紀錄、位址物件

複雜類型支援的操作

原生執行引擎加速複雜資料型態的常見操作:

  • 陣列函數:explodearray_containssizeflattentransform
  • 映射函式:map_keysmap_valueselement_at
  • 結構體存取:點記號欄位存取,getField
  • 巢狀組合:結構體陣列、帶有陣列值的映射

範例:使用陣列與結構

from pyspark.sql.functions import explode, col, size

# Read data with nested schema
df = spark.table("events.telemetry")

# Operations on arrays - accelerated by native engine
result = (df
    .filter(size(col("tags")) > 0)
    .select(
        col("event_id"),
        col("metadata.source"),  # Struct field access
        explode(col("tags")).alias("tag")
    )
)
result.show()

範例:使用地圖

from pyspark.sql.functions import map_keys, map_values, col

df = spark.table("config.settings")

# Map operations - accelerated by native engine
result = (df
    .select(
        col("setting_id"),
        map_keys(col("properties")).alias("keys"),
        map_values(col("properties")).alias("values")
    )
)
result.show()

表現成績

內部基準測試結果顯示,在使用 Python UDF 和複雜資料類型的各類工作負載中,皆有顯著改善:

工作負載類型 效能提升
向量化 Python UDF 速度可達 5.76 倍
標量 Python UDF 速度可達 1.08 倍
TPC-DS 端對端(含複雜型別) 速度可達 2.35 倍

這些效能提升來自序列化額外負荷的降低、向量化的改善,以及端對端的欄式執行。

進階湖倉模式的優點

複雜資料型態加速對於以下情況尤其重要:

  • Z 順序優化:巢狀欄位參與優化資料佈局。
  • 液態分群:複雜類型欄位可受益於分群,無須扁平化。
  • 半結構化分析:JSON payload 與事件串流保持巢狀,方便自然查詢。
  • 事件驅動架構:遙測與物聯網資料保留其階層結構。

與其為了效能而平整資料或重組管線,不如自然地處理複雜的結構,同時保持高執行效率。

啟用該功能

啟用原生執行引擎時,支援 Python UDF、Scala UDF 及複雜資料型態。 不需要任何其他設定。

要啟用原生執行引擎,請參見 Fabric Data Engineering 的原生執行引擎

先決條件

Limitations

  • 並非所有 Python 函式庫都支援向量化路徑。 需要任意 Python 物件序列化的函式庫仍可能觸發備援。
  • 深度巢狀的複雜型態(例如結構的映射陣列)可能會在某些操作時回退到 JVM 引擎。
  • ANSI 模式不支援原生執行引擎。