Spark SQL Map functions – complete list

In this article, I will explain the usage of the Spark SQL map functions map(), map_keys(), map_values(), map_concat(), and map_from_entries() on DataFrame columns using Scala examples.

Though I’ve explained them here with Scala, a similar approach can be used to work with the Spark SQL map functions in PySpark, and if time permits I will cover that in the future. If you are looking for PySpark, I would still recommend reading through this article, as it gives you an idea of the Spark map functions and their usage.

Spark SQL provides built-in standard map functions in the DataFrame API; these come in handy when we need to perform operations on map (MapType) columns. All these functions accept a map column as input, plus several other arguments depending on the function.

When possible, leverage these standard library functions: they offer a little more compile-time safety, handle nulls, and perform better than UDFs, because Spark's Catalyst optimizer can optimize built-in functions but treats a UDF as a black box. If your application is performance-critical, avoid custom UDFs wherever you can, as their performance is not guaranteed.

Spark SQL map Functions

Spark SQL map functions are grouped as “collection_funcs” in Spark SQL, together with several array functions. These map functions are useful when we want to concatenate two or more map columns, convert an array of StructType entries to a map column, etc.

map(cols: Column*) Creates a new map column from alternating key and value columns.
map_keys(e: Column) Returns an array containing the keys of the map.
map_values(e: Column) Returns an array containing the values of the map.
map_concat(cols: Column*) Merges the maps specified in the arguments into a single map.
map_from_entries(e: Column) Returns a map created from the given array of StructType entries.
map_entries(e: Column) Returns an unordered array of all entries in the given map, each entry being a struct with key and value fields.
explode(e: Column) Creates a new row for every key-value pair in the map, skipping null and empty maps. It creates two new columns, one for the key and one for the value.
explode_outer(e: Column) Creates a new row for every key-value pair in the map, including null and empty maps (which produce a single row with a null key and value). It creates two new columns, one for the key and one for the value.
posexplode(e: Column) Creates a new row for each key-value pair in the map, skipping null and empty maps. It creates three columns: “pos” to hold the position of the map entry, plus “key” and “value”.
posexplode_outer(e: Column) Creates a new row for each key-value pair in the map, including null and empty maps. It creates three columns: “pos” to hold the position of the map entry, plus “key” and “value”.
transform_keys(expr: Column, f: (Column, Column) => Column) Applies a function to every key-value pair and returns a map with the transformed keys.
transform_values(expr: Column, f: (Column, Column) => Column) Applies a function to every key-value pair and returns a map with the transformed values.
map_zip_with(left: Column, right: Column, f: (Column, Column, Column) => Column) Merges two maps into a single map by applying a function to each key and its values from both maps.
element_at(column: Column, value: Any) Returns the value for the given key in a map.
size(e: Column) Returns the number of key-value pairs in a map column.
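
The table lists several functions that the rest of this article does not demonstrate. Below is a minimal sketch of element_at(), size(), transform_keys(), transform_values(), and map_zip_with() on a tiny, hypothetical DataFrame. It assumes Spark 3.0 or later (where the three higher-order functions were added to the Scala API) and a local SparkSession, and it is meant to be pasted into spark-shell or run as a Scala script.

```scala
// Sketch only: assumes Spark 3.0+; the sample data and column names are hypothetical.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, element_at, map_zip_with, size, transform_keys, transform_values, upper, when}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()
import spark.implicits._

// One row with two string -> int maps
val df = Seq(
  ("r1", Map("a" -> 1, "b" -> 2), Map("b" -> 20, "c" -> 30))
).toDF("id", "left", "right")

val result = df.select(
  element_at(col("left"), "a").as("valueOfA"),                    // look up the value of key "a"
  size(col("left")).as("leftSize"),                               // number of entries in the map
  transform_keys(col("left"), (k, _) => upper(k)).as("upperKeys"),  // new keys, same values
  transform_values(col("left"), (_, v) => v * 10).as("timesTen"),   // same keys, new values
  // f(key, leftValue, rightValue): a map that lacks the key contributes null
  map_zip_with(col("left"), col("right"),
    (_, l, r) => when(l.isNull, r).when(r.isNull, l).otherwise(l + r)).as("merged")
)
result.show(false)
```

Because map_zip_with() passes null for a value that is missing from one of the two maps, the lambda checks isNull before adding; without that check, keys present in only one map would end up with a null value.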
Before we start, let’s create a DataFrame with some sample data to work with.

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

  // `spark` is the SparkSession (predefined in spark-shell)
  val structureData = Seq(
    Row("36636","Finance",Row(3000,"USA")),
    Row("40288","Finance",Row(5000,"IND")),
    Row("42114","Sales",Row(3900,"USA")),
    Row("39192","Marketing",Row(2500,"CAN")),
    Row("34534","Sales",Row(6500,"USA"))
  )

  val structureSchema = new StructType()
    .add("id",StringType)
    .add("dept",StringType)
    .add("properties",new StructType()
      .add("salary",IntegerType)
      .add("location",StringType)
    )

  // df is a var because the following sections reassign it
  var df = spark.createDataFrame(
    spark.sparkContext.parallelize(structureData),structureSchema)
  df.printSchema()
  df.show(false)
This outputs the schema and data below.

root
 |-- id: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- salary: integer (nullable = true)
 |    |-- location: string (nullable = true)

+-----+---------+-----------+
|id   |dept     |properties |
+-----+---------+-----------+
|36636|Finance  |[3000, USA]|
|40288|Finance  |[5000, IND]|
|42114|Sales    |[3900, USA]|
|39192|Marketing|[2500, CAN]|
|34534|Sales    |[6500, USA]|
+-----+---------+-----------+

map() – Spark SQL map functions


Syntax - map(cols: Column*): Column
The org.apache.spark.sql.functions.map() function creates a map column of MapType on a DataFrame. The input columns must be grouped as key-value pairs, e.g. (key1, value1, key2, value2, …). Note: all key columns must share a common data type and can’t be null, and all value columns must likewise share a common data type; where the value types differ, Spark may implicitly cast them to a common type (in the output below, the integer salary appears as a string). The snippet below converts all fields of the “properties” struct into key-value pairs of a new “propertiesMap” column.

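  import org.apache.spark.sql.Column
  import org.apache.spark.sql.functions.{col, lit, map}
  import org.apache.spark.sql.types.StructType
  import scala.collection.mutable

  // Locate the "properties" struct and read its field definitions
  val index = df.schema.fieldIndex("properties")
  val propSchema = df.schema(index).dataType.asInstanceOf[StructType]
  // Collect key, value, key, value, ... columns in insertion order
  val columns = mutable.LinkedHashSet[Column]()
  propSchema.fields.foreach(field => {
    columns.add(lit(field.name))                  // key: the field name
    columns.add(col("properties." + field.name))  // value: the field's value
  })

  df = df.withColumn("propertiesMap", map(columns.toSeq: _*))
  df = df.drop("properties")
  df.printSchema()
  df.show(false)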
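First, we locate the “properties” column in the DataFrame schema using df.schema.fieldIndex(“properties”), then, for each field of the struct, add a literal column holding the field name (the key) followed by the column holding its value to a LinkedHashSet. We use a LinkedHashSet to preserve insertion order, so each key stays immediately before its value. Finally, we pass the collected columns to the map() function as alternating key and value arguments.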

root
 |-- id: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- propertiesMap: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+-----+---------+---------------------------------+
|id   |dept     |propertiesMap                    |
+-----+---------+---------------------------------+
|36636|Finance  |[salary -> 3000, location -> USA]|
|40288|Finance  |[salary -> 5000, location -> IND]|
|42114|Sales    |[salary -> 3900, location -> USA]|
|39192|Marketing|[salary -> 2500, location -> CAN]|
|34534|Sales    |[salary -> 6500, location -> USA]|
+-----+---------+---------------------------------+
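
Collecting the columns into a LinkedHashSet works here because every key literal and value column is distinct, but a Set is a slightly unusual choice, since it would silently drop any column that compares equal to one already added. Below is a minimal alternative sketch, assuming the same “properties” struct shape (the sample rows are hypothetical). It builds the alternating key/value list with flatMap over the struct's field names, which keeps insertion order without mutable state, and it casts each value to string explicitly instead of relying on implicit type coercion, so the map's value type is predictable.

```scala
// Sketch only: hypothetical rows with the same struct shape as the article's "properties".
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, lit, map, struct}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()
import spark.implicits._

// Build a DataFrame with a "properties" struct of (salary: int, location: string)
val df = Seq(("36636", "Finance", 3000, "USA"), ("40288", "Finance", 5000, "IND"))
  .toDF("id", "dept", "salary", "location")
  .select(col("id"), col("dept"), struct(col("salary"), col("location")).as("properties"))

// Read the struct's field names and emit key, value, key, value, ... in order
val propSchema = df.schema("properties").dataType.asInstanceOf[StructType]
val kvColumns: Seq[Column] = propSchema.fieldNames.toSeq.flatMap { name =>
  Seq(lit(name), col(s"properties.$name").cast("string"))
}

val mapped = df
  .withColumn("propertiesMap", map(kvColumns: _*))
  .drop("properties")
mapped.printSchema()
mapped.show(false)
```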

map_keys() – Returns map keys from a Spark SQL DataFrame


Syntax - map_keys(e: Column): Column
Use the map_keys() function to retrieve all keys from a MapType column of a Spark DataFrame. Note that map_keys() accepts only a MapType argument; passing any other type fails when the query is analyzed, so the error appears at run time (as an AnalysisException), not at compile time.

df.select(col("id"),map_keys(col("propertiesMap"))).show(false)
This outputs all map keys from the DataFrame.

+-----+-----------------------+
|id   |map_keys(propertiesMap)|
+-----+-----------------------+
|36636|[salary, location]     |
|40288|[salary, location]     |
|42114|[salary, location]     |
|39192|[salary, location]     |
|34534|[salary, location]     |
+-----+-----------------------+
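
Because map_keys() returns an ordinary array, it combines with the array functions from the same collection_funcs group. As a minimal sketch (hypothetical rows, local SparkSession assumed), array_contains() over map_keys() keeps only the rows whose map has a given key, and the same map_keys() function can be called from a SQL expression through selectExpr().

```scala
// Sketch only: the rows below are hypothetical.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_contains, col, map_keys}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()
import spark.implicits._

// One map has a "salary" key, the other does not
val df = Seq(
  ("36636", Map("salary" -> "3000", "location" -> "USA")),
  ("99999", Map("location" -> "CAN"))
).toDF("id", "propertiesMap")

// DataFrame API: keep rows whose map contains the key "salary"
val withSalary = df.filter(array_contains(map_keys(col("propertiesMap")), "salary"))

// SQL expression form of the same function
val keysViaSql = df.selectExpr("id", "map_keys(propertiesMap) AS keys")

withSalary.show(false)
keysViaSql.show(false)
```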

map_values() – Returns map values from a Spark DataFrame


Syntax - map_values(e: Column): Column
Use the map_values() function to retrieve all values from a MapType column of a Spark DataFrame. Note that map_values() accepts only a MapType argument; passing any other type fails when the query is analyzed, so the error appears at run time (as an AnalysisException), not at compile time.

  df.select(col("id"),map_values(col("propertiesMap")))
    .show(false)
This outputs the following.

+-----+-------------------------+
|id   |map_values(propertiesMap)|
+-----+-------------------------+
|36636|[3000, USA]              |
|40288|[5000, IND]              |
|42114|[3900, USA]              |
|39192|[2500, CAN]              |
|34534|[6500, USA]              |
+-----+-------------------------+
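
map_keys() and map_values() return one array per input row. When you instead need one row per map entry, the explode family listed in the table at the top of this article applies. Below is a minimal sketch with hypothetical data, where the second row holds an empty map, showing how explode(), explode_outer(), and posexplode() differ.

```scala
// Sketch only: the rows below are hypothetical; the second map is empty.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, explode_outer, posexplode}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()
import spark.implicits._

val df = Seq(
  ("36636", Map("salary" -> "3000", "location" -> "USA")),
  ("40288", Map.empty[String, String])
).toDF("id", "propertiesMap")

// explode: one row per entry with new "key" and "value" columns; the empty map yields no rows
val exploded = df.select(col("id"), explode(col("propertiesMap")))

// explode_outer: same, but the empty map yields one row with a null key and value
val explodedOuter = df.select(col("id"), explode_outer(col("propertiesMap")))

// posexplode: also adds a "pos" column with the entry's position in the map
val posExploded = df.select(col("id"), posexplode(col("propertiesMap")))

exploded.show(false)
explodedOuter.show(false)
posExploded.show(false)
```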

map_concat() – Concatenating two or more maps on DataFrame


Syntax - map_concat(cols: Column*): Column
Use the Spark SQL map_concat() function to merge the keys and values of two or more maps into a single map. All arguments to this function must be MapType; passing any other type results in a run-time error.

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.functions.{col, map_concat}
  import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructType}

  val arrayStructureData = Seq(
    Row("James",List(Row("Newark","NY"),Row("Brooklyn","NY")),Map("hair"->"black","eye"->"brown"), Map("height"->"5.9")),
    Row("Michael",List(Row("SanJose","CA"),Row("Sandiago","CA")),Map("hair"->"brown","eye"->"black"),Map("height"->"6")),
    Row("Robert",List(Row("LasVegas","NV")),Map("hair"->"red","eye"->"gray"),Map("height"->"6.3")),
    Row("Maria",null,Map("hair"->"blond","eye"->"red"),Map("height"->"5.6")),
    Row("Jen",List(Row("LAX","CA"),Row("Orange","CA")),Map("white"->"black","eye"->"black"),Map("height"->"5.2"))
  )

  val arrayStructureSchema = new StructType()
    .add("name",StringType)
    .add("addresses", ArrayType(new StructType()
      .add("city",StringType)
      .add("state",StringType)))
    .add("properties", MapType(StringType,StringType))
    .add("secondProp", MapType(StringType,StringType))

  val concatDF = spark.createDataFrame(
    spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)

  // Merge the two map columns into one
  concatDF.withColumn("mapConcat",map_concat(col("properties"),col("secondProp")))
    .select("name","mapConcat")
    .show(false)
Output:

+-------+---------------------------------------------+
|name   |mapConcat                                    |
+-------+---------------------------------------------+
|James  |[hair -> black, eye -> brown, height -> 5.9] |
|Michael|[hair -> brown, eye -> black, height -> 6]   |
|Robert |[hair -> red, eye -> gray, height -> 6.3]    |
|Maria  |[hair -> blond, eye -> red, height -> 5.6]   |
|Jen    |[white -> black, eye -> black, height -> 5.2]|
+-------+---------------------------------------------+
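
If the same key appears in more than one input map, map_concat() has to decide which value to keep. In Spark 3.x this is governed by the spark.sql.mapKeyDedupPolicy setting: the default, EXCEPTION, fails the query on a duplicate key, while LAST_WIN keeps the value from the last map that contains the key. The sketch below (hypothetical rows, Spark 3.x assumed) switches to LAST_WIN and also shows that a null input map makes the whole result null.

```scala
// Sketch only: assumes Spark 3.x; the rows below are hypothetical.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, map_concat}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()
// Keep the value from the last map instead of failing on duplicate keys
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
import spark.implicits._

val df = Seq[(String, Map[String, String], Map[String, String])](
  ("James", Map("hair" -> "black", "eye" -> "brown"), Map("eye" -> "green")), // "eye" is duplicated
  ("Maria", Map("hair" -> "blond", "eye" -> "red"), null)                     // null second map
).toDF("name", "properties", "secondProp")

val merged = df.withColumn("mapConcat", map_concat(col("properties"), col("secondProp")))
merged.select("name", "mapConcat").show(false)
```

With the default EXCEPTION policy, the James row above would make the query fail instead, so deduplicating keys (or choosing a policy) is worth deciding up front when the input maps can overlap.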

map_from_entries() – convert array of StructType entries to map

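Use the map_from_entries() SQL function to convert an array of StructType entries to a map (MapType) column on a Spark DataFrame. The function takes an ArrayType[StructType] column as its argument, where each struct has two fields: the first becomes the key and the second the value. Passing any other type results in an error. As the output below shows, a null array (Maria) produces a null map.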

Syntax -  map_from_entries(e: Column): Column

concatDF.withColumn("mapFromEntries",map_from_entries(col("addresses")))
    .select("name","mapFromEntries")
    .show(false)
Output:

+-------+-------------------------------+
|name   |mapFromEntries                 |
+-------+-------------------------------+
|James  |[Newark -> NY, Brooklyn -> NY] |
|Michael|[SanJose -> CA, Sandiago -> CA]|
|Robert |[LasVegas -> NV]               |
|Maria  |null                           |
|Jen    |[LAX -> CA, Orange -> CA]      |
+-------+-------------------------------+
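
Because map_from_entries() uses the first struct field as the key and the second as the value, reordering the struct fields changes which side becomes the key. The sketch below builds a hypothetical addresses array with the SQL named_struct() function, then uses the SQL transform() lambda (available since Spark 2.4) to swap city and state before calling map_from_entries() through expr(). The sample cities are chosen so that no state repeats, since a duplicate key would be subject to the same deduplication rules as map_concat().

```scala
// Sketch only: the addresses below are hypothetical, with unique states.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, map_from_entries}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()

// Same shape as the article's "addresses": array<struct<city, state>>
val df = spark.sql("""
  SELECT 'Jen' AS name,
         array(named_struct('city', 'LAX', 'state', 'CA'),
               named_struct('city', 'Portland', 'state', 'OR')) AS addresses
""")

// First struct field (city) becomes the key: city -> state
val byCity = df.select(map_from_entries(col("addresses")).as("cityToState"))

// Swapping the struct fields flips the map: state -> city
val byState = df.select(
  expr("map_from_entries(transform(addresses, a -> struct(a.state, a.city)))").as("stateToCity"))

byCity.show(false)
byState.show(false)
```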

map_entries() – convert map to array of StructType entries


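Syntax - map_entries(e: Column): Column
Use the Spark SQL map_entries() function to convert a MapType column into an array of StructType entries, one struct per key-value pair, with fields named key and value. It is the inverse of map_from_entries(). Note that map_entries() was added to the Scala functions API in Spark 3.0.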
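
Since this section has no code of its own, here is a minimal sketch of map_entries(), assuming Spark 3.0 or later and a hypothetical one-row DataFrame. It converts a map into an array of key/value structs, reads the field names of the resulting struct from the schema, and converts the array back with map_from_entries().

```scala
// Sketch only: assumes Spark 3.0+; the row below is hypothetical.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, map_entries, map_from_entries}
import org.apache.spark.sql.types.{ArrayType, StructType}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()
import spark.implicits._

val df = Seq(
  ("James", Map("hair" -> "black", "eye" -> "brown"))
).toDF("name", "properties")

// map<string,string> -> array<struct<key: string, value: string>>
val entries = df.select(col("name"), map_entries(col("properties")).as("entries"))
entries.printSchema()
entries.show(false)

// Field names of the struct that describes each entry
val entryFields = entries.schema("entries").dataType
  .asInstanceOf[ArrayType].elementType
  .asInstanceOf[StructType].fieldNames
println(entryFields.mkString(","))

// map_from_entries() turns the array back into the original map
val roundTrip = entries.select(map_from_entries(col("entries")).as("properties"))
roundTrip.show(false)
```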

Complete Spark SQL map functions example


package com.sparkbyexamples.spark.dataframe.functions.collection

import org.apache.spark.sql.functions.{col, explode, lit, map, map_concat, map_from_entries, map_keys, map_values}
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}
import org.apache.spark.sql.{Column, Row, SparkSession}

import scala.collection.mutable

object MapFunctions extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
  import spark.implicits._

  val structureData = Seq(
    Row("36636","Finance",Row(3000,"USA")),
    Row("40288","Finance",Row(5000,"IND")),
    Row("42114","Sales",Row(3900,"USA")),
    Row("39192","Marketing",Row(2500,"CAN")),
    Row("34534","Sales",Row(6500,"USA"))
  )

  val structureSchema = new StructType()
    .add("id",StringType)
    .add("dept",StringType)
    .add("properties",new StructType()
      .add("salary",IntegerType)
      .add("location",StringType)
    )

  var df = spark.createDataFrame(
    spark.sparkContext.parallelize(structureData),structureSchema)
  df.printSchema()
  df.show(false)

  // Convert to Map
  val index = df.schema.fieldIndex("properties")
  val propSchema = df.schema(index).dataType.asInstanceOf[StructType]
  val columns = mutable.LinkedHashSet[Column]()
  propSchema.fields.foreach(field =>{
    columns.add(lit(field.name))
    columns.add(col("properties." + field.name))
  })

  df = df.withColumn("propertiesMap",map(columns.toSeq:_*))
  df = df.drop("properties")
  df.printSchema()
  df.show(false)

  //Retrieve all keys from a Map
  val keys = df.select(explode(map_keys($"propertiesMap"))).as[String].distinct.collect
  println(keys.mkString(","))

  // map_keys
  df.select(col("id"),map_keys(col("propertiesMap")))
    .show(false)

  //map_values
  df.select(col("id"),map_values(col("propertiesMap")))
    .show(false)

  //Creating DF with MapType
  val arrayStructureData = Seq(
    Row("James",List(Row("Newark","NY"),Row("Brooklyn","NY")),Map("hair"->"black","eye"->"brown"), Map("height"->"5.9")),
  Row("Michael",List(Row("SanJose","CA"),Row("Sandiago","CA")),Map("hair"->"brown","eye"->"black"),Map("height"->"6")),
  Row("Robert",List(Row("LasVegas","NV")),Map("hair"->"red","eye"->"gray"),Map("height"->"6.3")),
  Row("Maria",null,Map("hair"->"blond","eye"->"red"),Map("height"->"5.6")),
  Row("Jen",List(Row("LAX","CA"),Row("Orange","CA")),Map("white"->"black","eye"->"black"),Map("height"->"5.2"))
  )

  val arrayStructureSchema = new StructType()
    .add("name",StringType)
    .add("addresses", ArrayType(new StructType()
      .add("city",StringType)
      .add("state",StringType)))
    .add("properties", MapType(StringType,StringType))
    .add("secondProp", MapType(StringType,StringType))

  val concatDF = spark.createDataFrame(
    spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
  concatDF.printSchema()
  concatDF.show()

  concatDF.withColumn("mapConcat",map_concat(col("properties"),col("secondProp")))
    .select("name","mapConcat")
    .show(false)

  concatDF.withColumn("mapFromEntries",map_from_entries(col("addresses")))
    .select("name","mapFromEntries")
    .show(false)
}

Conclusion

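In this article, you have learned how to use the Spark SQL map functions on DataFrame columns: creating a map column with map(), retrieving keys and values with map_keys() and map_values(), concatenating several maps with map_concat(), converting an array of StructType entries to a map with map_from_entries(), and converting a map back to an array of entries with map_entries().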

NNK

SparkByExamples.com is a Big Data and Spark examples community page; all examples are simple, easy to understand, and tested in our development environment using Scala and Maven.
