使用在 pandas DataFrame 上作为输入和输出执行的Python本机函数映射当前数据帧中的批处理迭代器,并将结果作为数据帧返回。
Syntax
mapInPandas(func: "PandasMapIterFunction", schema: Union[StructType, str], barrier: bool = False, profile: Optional[ResourceProfile] = None)
参数
| 参数 | 类型 | 说明 |
|---|---|---|
func |
函数 | 一个Python本机函数,它采用 pandas.DataFrame 的迭代器,并输出 pandas.DataFrame 的迭代器。 |
schema |
DataType 或 str | PySpark 中的返回类型 func 。 该值可以是对象 pyspark.sql.types.DataType 或 DDL 格式的类型字符串。 |
barrier |
bool、optional、default False | 使用屏障模式执行,确保阶段中的所有Python辅助角色将同时启动。 |
profile |
ResourceProfile,可选 | 要用于 mapInPandas 的可选 ResourceProfile。 |
退货
DataFrame
示例
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
def mean_age(iterator):
for pdf in iterator:
yield pdf.groupby("id").mean().reset_index()
df.mapInPandas(mean_age, "id: bigint, age: double").show()
# +---+----+
# | id| age|
# +---+----+
# | 1|21.0|
# | 2|30.0|
# +---+----+
df.mapInPandas(filter_func, df.schema, barrier=True).collect()
# [Row(id=1, age=21)]