pyspark.sql.functions.map_zip_with
- pyspark.sql.functions.map_zip_with(col1, col2, f)
Collection function: merges two given maps into a single map by applying a function to the corresponding key-value pairs.
New in version 3.1.0.
Changed in version 3.4.0: Supports Spark Connect.
- Parameters
- col1 : Column or str
The name of the first column or a column expression representing the first map.
- col2 : Column or str
The name of the second column or a column expression representing the second map.
- f : function
A ternary function (k: Column, v1: Column, v2: Column) -> Column that defines how to merge the values from the two maps. This function should return a column that will be used as the value in the resulting map. Can use methods of Column, functions defined in pyspark.sql.functions and Scala UserDefinedFunctions. Python UserDefinedFunctions are not supported (SPARK-27052).
- Returns
Column
A new map column where each key-value pair is the result of applying the function to the corresponding key-value pairs in the input maps.
Examples
Example 1: Merging two maps with a simple function
>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([
...     (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})],
...     ("id", "map1", "map2"))
>>> row = df.select(
...     sf.map_zip_with("map1", "map2", lambda _, v1, v2: v1 + v2).alias("updated_data")
... ).head()
>>> sorted(row["updated_data"].items())
[('A', 4), ('B', 6)]
Example 2: Merging two maps with a complex function
>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([
...     (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})],
...     ("id", "map1", "map2"))
>>> row = df.select(
...     sf.map_zip_with("map1", "map2",
...         lambda k, v1, v2: sf.when(k == "A", v1 + v2).otherwise(v1 - v2)
...     ).alias("updated_data")
... ).head()
>>> sorted(row["updated_data"].items())
[('A', 4), ('B', -2)]
Example 3: Merging two maps with mismatched keys
>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([
...     (1, {"A": 1, "B": 2}, {"B": 3, "C": 4})],
...     ("id", "map1", "map2"))
>>> row = df.select(
...     sf.map_zip_with("map1", "map2",
...         lambda _, v1, v2: sf.when(v2.isNull(), v1).otherwise(v1 + v2)
...     ).alias("updated_data")
... ).head()
>>> sorted(row["updated_data"].items())
[('A', 1), ('B', 5), ('C', None)]
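For intuition, the key-wise merge semantics can be modeled in plain Python over ordinary dicts (a sketch of the behavior only, not how Spark executes it; the `map_zip_with` helper below is a hypothetical stand-in):

```python
def map_zip_with(m1, m2, f):
    # Iterate over the union of keys from both maps; a key missing from
    # one side contributes None for that side's value, mirroring the NULL
    # that Spark passes to the lambda for mismatched keys.
    return {k: f(k, m1.get(k), m2.get(k)) for k in {**m1, **m2}}

# Example 3 above, modeled in Python: keep v1 when v2 is missing,
# otherwise add (and propagate None when v1 is missing, as NULL + x
# yields NULL in Spark SQL).
result = map_zip_with(
    {"A": 1, "B": 2},
    {"B": 3, "C": 4},
    lambda k, v1, v2: v1 if v2 is None else (None if v1 is None else v1 + v2),
)
# result: {'A': 1, 'B': 5, 'C': None}
```

This mirrors why Example 3 produces `('C', None)`: key `"C"` exists only in `map2`, so `v1` is NULL and the `otherwise(v1 + v2)` branch evaluates to NULL.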