pyspark.sql.functions.map_zip_with

pyspark.sql.functions.map_zip_with(col1, col2, f)

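Collection: Merges two given maps into a single map by applying a function to each key and its pair of values, one from each map. The result contains every key that appears in either map; when a key is present in only one map, the function receives null for the missing value.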

New in version 3.1.0.

Changed in version 3.4.0: Supports Spark Connect.

Parameters
col1 : Column or str

The name of the first column or a column expression representing the first map.

col2 : Column or str

The name of the second column or a column expression representing the second map.

f : function

A ternary function (k: Column, v1: Column, v2: Column) -> Column... that defines how to merge the values from the two maps. It receives the key, the value from the first map, and the value from the second map, and should return a Column that becomes the value for that key in the resulting map. The function can use methods of Column, functions defined in pyspark.sql.functions, and Scala UserDefinedFunctions. Python UserDefinedFunctions are not supported (SPARK-27052).

Returns
Column

A new map column whose keys are the keys of the input maps and whose values are the results of applying the function to each key and its corresponding values in the two maps.
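The merge semantics can be pictured with a plain-Python model. This is only a sketch of the behavior described here, not how Spark implements it: the helper name map_zip_with_model is hypothetical, dicts stand in for map columns, and None stands in for null.

```python
from typing import Any, Callable, Dict, Hashable, Optional


def map_zip_with_model(
    m1: Dict[Hashable, Any],
    m2: Dict[Hashable, Any],
    f: Callable[[Hashable, Optional[Any], Optional[Any]], Any],
) -> Dict[Hashable, Any]:
    """Hypothetical plain-Python model of the merge (not Spark code)."""
    result = {}
    # Visit every key found in either map: keys of m1 first, then keys only in m2.
    for key in list(m1) + [k for k in m2 if k not in m1]:
        # dict.get returns None for a missing key, mirroring the null Spark passes.
        result[key] = f(key, m1.get(key), m2.get(key))
    return result


def add_if_both(_key, v1, v2):
    # Mirrors when(v2.isNull(), v1).otherwise(v1 + v2); in SQL, null + 4 is null.
    if v2 is None:
        return v1
    return None if v1 is None else v1 + v2


merged = map_zip_with_model({"A": 1, "B": 2}, {"B": 3, "C": 4}, add_if_both)
print(merged)  # {'A': 1, 'B': 5, 'C': None}
```

The model makes explicit that the function is called once per key in the union of the two key sets, so it has to handle a null on either side.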

Examples

Example 1: Merging two maps with a simple function

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([
...   (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})],
...   ("id", "map1", "map2"))
>>> row = df.select(
...   sf.map_zip_with("map1", "map2", lambda _, v1, v2: v1 + v2).alias("updated_data")
... ).head()
>>> sorted(row["updated_data"].items())
[('A', 4), ('B', 6)]

Example 2: Merging two maps with a function that depends on the key

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([
...   (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})],
...   ("id", "map1", "map2"))
>>> row = df.select(
...   sf.map_zip_with("map1", "map2",
...     lambda k, v1, v2: sf.when(k == "A", v1 + v2).otherwise(v1 - v2)
...   ).alias("updated_data")
... ).head()
>>> sorted(row["updated_data"].items())
[('A', 4), ('B', -2)]

Example 3: Merging two maps with mismatched keys

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([
...   (1, {"A": 1, "B": 2}, {"B": 3, "C": 4})],
...   ("id", "map1", "map2"))
>>> row = df.select(
...   sf.map_zip_with("map1", "map2",
...     lambda _, v1, v2: sf.when(v2.isNull(), v1).otherwise(v1 + v2)
...   ).alias("updated_data")
... ).head()
>>> sorted(row["updated_data"].items())
[('A', 1), ('B', 5), ('C', None)]