In this article, I will explain the usage of the Spark SQL map functions map(), map_keys(), map_values(), map_concat(), and map_from_entries() on DataFrame columns using Scala examples.
Though the examples here use Scala, a similar approach works for the Spark SQL map functions in PySpark, and if time permits I will cover it in the future. If you are looking for PySpark, I would still recommend reading through this article, as it will give you an idea of the Spark map functions and their usage.
Spark SQL provides built-in standard map functions defined in the DataFrame API; these come in handy when we need to perform operations on map (MapType) columns. All of these functions accept a map column as input, plus additional arguments depending on the function.
When possible, try to leverage the standard library functions, as they offer more compile-time safety, handle nulls, and perform better than UDFs. If your application is performance-critical, avoid custom UDFs, as their performance is not guaranteed.
Spark SQL map Functions
Spark SQL map functions are grouped under “collection_funcs” in Spark SQL along with several array functions. These map functions are useful when we want to concatenate two or more map columns, convert arrays of StructType entries to a map column, etc.
| Function | Description |
| --- | --- |
| map(cols: Column*) | Creates a new map column. |
| map_keys(e: Column) | Returns an array containing the keys of the map. |
| map_values(e: Column) | Returns an array containing the values of the map. |
| map_concat(cols: Column*) | Merges the maps specified in the arguments. |
| map_from_entries(e: Column) | Returns a map created from the given array of StructType entries. |
| map_entries(e: Column) | Returns an array of all StructType entries in the given map. |
| explode(e: Column) | Creates a new row for every key-value pair in the map, ignoring null & empty maps. It creates two new columns, one for the key and one for the value. |
| explode_outer(e: Column) | Creates a new row for every key-value pair in the map, including null & empty maps. It creates two new columns, one for the key and one for the value. |
| posexplode(e: Column) | Creates a new row for each key-value pair in the map, ignoring null & empty maps. It creates 3 columns per row: “pos” to hold the position of the map element, plus “key” and “value”. |
| posexplode_outer(e: Column) | Creates a new row for each key-value pair in the map, including null & empty maps. It creates 3 columns per row: “pos” to hold the position of the map element, plus “key” and “value”. |
| transform_keys(expr: Column, f: (Column, Column) => Column) | Transforms the map by applying the function to every key-value pair and returns a map with the transformed keys. |
| transform_values(expr: Column, f: (Column, Column) => Column) | Transforms the map by applying the function to every key-value pair and returns a map with the transformed values. |
| map_zip_with(left: Column, right: Column, f: (Column, Column, Column) => Column) | Merges two maps into a single map by applying the function to the values sharing the same key. |
| element_at(column: Column, value: Any) | Returns the value for a given key in a map. |
| size(e: Column) | Returns the length of a map column. |
Before we start, let’s create a DataFrame with some sample data to work with.
val structureData = Seq(
Row("36636","Finance",Row(3000,"USA")),
Row("40288","Finance",Row(5000,"IND")),
Row("42114","Sales",Row(3900,"USA")),
Row("39192","Marketing",Row(2500,"CAN")),
Row("34534","Sales",Row(6500,"USA"))
)
val structureSchema = new StructType()
.add("id",StringType)
.add("dept",StringType)
.add("properties",new StructType()
.add("salary",IntegerType)
.add("location",StringType)
)
var df = spark.createDataFrame(
spark.sparkContext.parallelize(structureData),structureSchema)
df.printSchema()
df.show(false)
This outputs the following schema and data:
root
|-- id: string (nullable = true)
|-- dept: string (nullable = true)
|-- properties: struct (nullable = true)
| |-- salary: integer (nullable = true)
| |-- location: string (nullable = true)
+-----+---------+-----------+
|id |dept |properties |
+-----+---------+-----------+
|36636|Finance |[3000, USA]|
|40288|Finance |[5000, IND]|
|42114|Sales |[3900, USA]|
|39192|Marketing|[2500, CAN]|
|34534|Sales |[6500, USA]|
+-----+---------+-----------+
map() – Spark SQL map functions
Syntax - map(cols: Column*): Column
The org.apache.spark.sql.functions.map() SQL function is used to create a map column of MapType on a DataFrame. The input columns to the map function must be grouped as key-value pairs, e.g. (key1, value1, key2, value2, …).
Note: All key columns must have the same data type and cannot be null, and all value columns must have the same data type. The snippet below converts all columns of the “properties” struct into key-value pairs in a “propertiesMap” column.
val index = df.schema.fieldIndex("properties")
val propSchema = df.schema(index).dataType.asInstanceOf[StructType]
var columns = mutable.LinkedHashSet[Column]()
propSchema.fields.foreach(field =>{
columns.add(lit(field.name))
columns.add(col("properties." + field.name))
})
df = df.withColumn("propertiesMap",map(columns.toSeq:_*))
df = df.drop("properties")
df.printSchema()
df.show(false)
First, we locate the “properties” column on the Spark DataFrame using df.schema.fieldIndex("properties") and collect each field name and its value column into a LinkedHashSet. We need a LinkedHashSet in order to maintain the insertion order of the key-value pairs. Finally, we call the map() function with this set of key and value columns.
root
|-- id: string (nullable = true)
|-- dept: string (nullable = true)
|-- propertiesMap: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
+-----+---------+---------------------------------+
|id |dept |propertiesMap |
+-----+---------+---------------------------------+
|36636|Finance |[salary -> 3000, location -> USA]|
|40288|Finance |[salary -> 5000, location -> IND]|
|42114|Sales |[salary -> 3900, location -> USA]|
|39192|Marketing|[salary -> 2500, location -> CAN]|
|34534|Sales |[salary -> 6500, location -> USA]|
+-----+---------+---------------------------------+
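As an aside, the explode() function from the table above can flatten this map column into one row per key-value pair. Here is a minimal sketch, assuming the propertiesMap column created in this section:

```scala
import org.apache.spark.sql.functions.{col, explode}

// explode() on a MapType column produces two new columns, "key" and "value",
// and one output row per map entry (null and empty maps are dropped).
df.select(col("id"), explode(col("propertiesMap")))
  .show(false)
// Since propertiesMap has two entries, each input row yields two output rows:
// one for "salary" and one for "location".
```

Use posexplode() instead if you also need the position of each entry in a “pos” column.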
map_keys() – Returns map keys from a Spark SQL DataFrame
Syntax - map_keys(e: Column): Column
Use the map_keys() Spark function to retrieve all keys from a Spark DataFrame MapType column. Note that map_keys() takes an argument of MapType; passing any other type results in a runtime error.
df.select(col("id"),map_keys(col("propertiesMap"))).show(false)
This outputs all map keys from the Spark DataFrame:
+-----+-----------------------+
|id |map_keys(propertiesMap)|
+-----+-----------------------+
|36636|[salary, location] |
|40288|[salary, location] |
|42114|[salary, location] |
|39192|[salary, location] |
|34534|[salary, location] |
+-----+-----------------------+
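When you only need the value for a single key rather than all of them, element_at() and size() from the table above come in handy. A small sketch, again assuming the propertiesMap column created earlier:

```scala
import org.apache.spark.sql.functions.{col, element_at, size}

// element_at() returns the value for the given key, or null when the key
// is absent; size() returns the number of entries in the map.
df.select(
    col("id"),
    element_at(col("propertiesMap"), "salary").as("salary"),
    size(col("propertiesMap")).as("mapSize")
  ).show(false)
```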
map_values() – Returns map values from a Spark DataFrame
Syntax - map_values(e: Column): Column
Use the map_values() Spark function to retrieve all values from a Spark DataFrame MapType column. Note that map_values() takes an argument of MapType; passing any other type results in a runtime error.
df.select(col("id"),map_values(col("propertiesMap")))
.show(false)
This outputs the following:
+-----+-------------------------+
|id |map_values(propertiesMap)|
+-----+-------------------------+
|36636|[3000, USA] |
|40288|[5000, IND] |
|42114|[3900, USA] |
|39192|[2500, CAN] |
|34534|[6500, USA] |
+-----+-------------------------+
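If you are on Spark 3.0 or later, transform_values() from the table above can rewrite every value in place while keeping the keys untouched. A sketch (the suffix logic here is purely illustrative):

```scala
import org.apache.spark.sql.functions.{col, concat, lit, transform_values}

// transform_values() applies the lambda to each (key, value) pair and
// returns a map with the same keys and the transformed values.
df.select(
    col("id"),
    transform_values(col("propertiesMap"), (k, v) => concat(v, lit("_v")))
  ).show(false)
```

transform_keys() works the same way but returns a map with transformed keys instead.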
map_concat() – Concatenating two or more maps on DataFrame
Syntax - map_concat(cols: Column*): Column
Use the Spark SQL map_concat() function to concatenate keys and values from two or more maps into a single map. All arguments to this function must be of MapType; passing any other type results in a runtime error.
val arrayStructureData = Seq(
Row("James",List(Row("Newark","NY"),Row("Brooklyn","NY")),Map("hair"->"black","eye"->"brown"), Map("height"->"5.9")),
Row("Michael",List(Row("SanJose","CA"),Row("Sandiago","CA")),Map("hair"->"brown","eye"->"black"),Map("height"->"6")),
Row("Robert",List(Row("LasVegas","NV")),Map("hair"->"red","eye"->"gray"),Map("height"->"6.3")),
Row("Maria",null,Map("hair"->"blond","eye"->"red"),Map("height"->"5.6")),
Row("Jen",List(Row("LAX","CA"),Row("Orange","CA")),Map("white"->"black","eye"->"black"),Map("height"->"5.2"))
)
val arrayStructureSchema = new StructType()
.add("name",StringType)
.add("addresses", ArrayType(new StructType()
.add("city",StringType)
.add("state",StringType)))
.add("properties", MapType(StringType,StringType))
.add("secondProp", MapType(StringType,StringType))
val concatDF = spark.createDataFrame(
spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
concatDF.withColumn("mapConcat",map_concat(col("properties"),col("secondProp")))
.select("name","mapConcat")
.show(false)
Output:
+-------+---------------------------------------------+
|name |mapConcat |
+-------+---------------------------------------------+
|James |[hair -> black, eye -> brown, height -> 5.9] |
|Michael|[hair -> brown, eye -> black, height -> 6] |
|Robert |[hair -> red, eye -> gray, height -> 6.3] |
|Maria |[hair -> blond, eye -> red, height -> 5.6] |
|Jen |[white -> black, eye -> black, height -> 5.2]|
+-------+---------------------------------------------+
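One caveat worth knowing: starting with Spark 3.0, map_concat() throws a runtime error when the input maps share a key, unless you relax the deduplication policy. A sketch (verify the config against your Spark version; the literal map is illustrative):

```scala
import org.apache.spark.sql.functions.{col, lit, map, map_concat}

// With LAST_WIN, the value from the right-most map wins for duplicate keys;
// the default policy (EXCEPTION) fails the query instead.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
concatDF.select(
    map_concat(col("properties"), map(lit("eye"), lit("blue"))).as("merged")
  ).show(false)
```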
map_from_entries() – convert array of StructType entries to map
Use the map_from_entries() SQL function to convert an array of StructType entries to a map (MapType) on a Spark DataFrame. This function takes a DataFrame column of ArrayType[StructType] as an argument; passing any other type results in an error.
Syntax - map_from_entries(e: Column): Column
concatDF.withColumn("mapFromEntries",map_from_entries(col("addresses")))
.select("name","mapFromEntries")
.show(false)
Output:
+-------+-------------------------------+
|name |mapFromEntries |
+-------+-------------------------------+
|James |[Newark -> NY, Brooklyn -> NY] |
|Michael|[SanJose -> CA, Sandiago -> CA]|
|Robert |[LasVegas -> NV] |
|Maria |null |
|Jen |[LAX -> CA, Orange -> CA] |
+-------+-------------------------------+
map_entries() – convert map of StructType to array of StructType
Syntax - map_entries(e: Column): Column
Use the Spark SQL map_entries() function to convert a map column into an array of StructType entries on a DataFrame.
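The article does not include an example for this one, so here is a minimal sketch using the propertiesMap column built earlier (map_entries() is available in the Scala API since Spark 3.0):

```scala
import org.apache.spark.sql.functions.{col, map_entries}

// map_entries() is the inverse of map_from_entries(): it returns an
// array of key/value structs, one struct per map entry.
df.withColumn("propertiesEntries", map_entries(col("propertiesMap")))
  .select("id", "propertiesEntries")
  .show(false)
```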
Complete Spark SQL map functions example
package com.sparkbyexamples.spark.dataframe.functions.collection
import org.apache.spark.sql.functions.{col, explode, lit, map, map_concat, map_from_entries, map_keys, map_values}
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}
import org.apache.spark.sql.{Column, Row, SparkSession}
import scala.collection.mutable
object MapFunctions extends App {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
import spark.implicits._
val structureData = Seq(
Row("36636","Finance",Row(3000,"USA")),
Row("40288","Finance",Row(5000,"IND")),
Row("42114","Sales",Row(3900,"USA")),
Row("39192","Marketing",Row(2500,"CAN")),
Row("34534","Sales",Row(6500,"USA"))
)
val structureSchema = new StructType()
.add("id",StringType)
.add("dept",StringType)
.add("properties",new StructType()
.add("salary",IntegerType)
.add("location",StringType)
)
var df = spark.createDataFrame(
spark.sparkContext.parallelize(structureData),structureSchema)
df.printSchema()
df.show(false)
// Convert to Map
val index = df.schema.fieldIndex("properties")
val propSchema = df.schema(index).dataType.asInstanceOf[StructType]
var columns = mutable.LinkedHashSet[Column]()
propSchema.fields.foreach(field =>{
columns.add(lit(field.name))
columns.add(col("properties." + field.name))
})
df = df.withColumn("propertiesMap",map(columns.toSeq:_*))
df = df.drop("properties")
df.printSchema()
df.show(false)
//Retrieve all keys from a Map
val keys = df.select(explode(map_keys(col("propertiesMap")))).as[String].distinct.collect
print(keys.mkString(","))
// map_keys
df.select(col("id"),map_keys(col("propertiesMap")))
.show(false)
//map_values
df.select(col("id"),map_values(col("propertiesMap")))
.show(false)
//Creating DF with MapType
val arrayStructureData = Seq(
Row("James",List(Row("Newark","NY"),Row("Brooklyn","NY")),Map("hair"->"black","eye"->"brown"), Map("height"->"5.9")),
Row("Michael",List(Row("SanJose","CA"),Row("Sandiago","CA")),Map("hair"->"brown","eye"->"black"),Map("height"->"6")),
Row("Robert",List(Row("LasVegas","NV")),Map("hair"->"red","eye"->"gray"),Map("height"->"6.3")),
Row("Maria",null,Map("hair"->"blond","eye"->"red"),Map("height"->"5.6")),
Row("Jen",List(Row("LAX","CA"),Row("Orange","CA")),Map("white"->"black","eye"->"black"),Map("height"->"5.2"))
)
val arrayStructureSchema = new StructType()
.add("name",StringType)
.add("addresses", ArrayType(new StructType()
.add("city",StringType)
.add("state",StringType)))
.add("properties", MapType(StringType,StringType))
.add("secondProp", MapType(StringType,StringType))
val concatDF = spark.createDataFrame(
spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
concatDF.printSchema()
concatDF.show()
concatDF.withColumn("mapConcat",map_concat(col("properties"),col("secondProp")))
.select("name","mapConcat")
.show(false)
concatDF.withColumn("mapFromEntries",map_from_entries(col("addresses")))
.select("name","mapFromEntries")
.show(false)
}
Conclusion
In this article, you have learned how to convert an array of StructType to a map, convert a map of StructType to an array, and concatenate several maps using the SQL map functions on Spark DataFrame columns.
Related Articles
- Spark SQL like() Using Wildcard Example
- Spark SQL Explained with Examples
- Spark isin() & IS NOT IN Operator Example
- Spark rlike() Working with Regex Matching Examples
- Spark Check String Column Has Numeric Values
- Spark Check Column Present in DataFrame
- Spark Filter Using contains() Examples
- Spark SQL Inner Join with Example
- Spark SQL Self Join With Example