Working with Spark DataFrame Map (MapType) column


In this article, I will explain how to create a Spark DataFrame map column using the org.apache.spark.sql.types.MapType class and how to apply some DataFrame SQL functions to the map column, using Scala examples.

While working with Spark structured (Avro, Parquet, etc.) or semi-structured (JSON) files, we often get data with complex structures like MapType, ArrayType, Array[StructType], etc., and there are not many good articles that explain these. I will try my best to cover some of the most commonly used functions on MapType columns.

1. What is MapType

The MapType class extends DataType, which is the superclass of all types in Spark. It takes two mandatory arguments, “keyType” and “valueType”, both of type DataType, and one optional boolean argument, valueContainsNull. keyType and valueType can be any type that extends the DataType class, e.g. StringType, IntegerType, ArrayType, MapType, StructType (struct), etc.

Note: Keys in a map are not allowed to have `null` values.
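
As a minimal sketch of the arguments described above, the snippet below builds two MapType instances and inspects their fields. It is meant for spark-shell or a Scala script with spark-sql on the classpath; the value names nullableValues and strictValues are illustrative only.

```scala
import org.apache.spark.sql.types.{IntegerType, MapType, StringType}

// Two-argument form: valueContainsNull is not given, so it defaults to true.
val nullableValues = MapType(StringType, IntegerType)

// Three-argument form: values are declared as non-nullable.
val strictValues = MapType(StringType, IntegerType, valueContainsNull = false)

println(nullableValues.keyType)            // the DataType used for keys
println(nullableValues.valueType)          // the DataType used for values
println(nullableValues.valueContainsNull)  // whether values may be null
println(strictValues.simpleString)         // compact description of the type
```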

2. Creating MapType map column on Spark DataFrame

You can create an instance of MapType for a Spark DataFrame using DataTypes.createMapType() or using the MapType Scala case class.

2.1 Using Spark DataTypes.createMapType()

We can create a map column type using the createMapType() function on the DataTypes class. This method takes two arguments, keyType and valueType, as described above; both must be of a type that extends DataType.


import org.apache.spark.sql.types._
val mapCol = DataTypes.createMapType(StringType, StringType)

This snippet creates a “mapCol” object of type MapType with both keys and values of String type.


val mapCol = DataTypes.createMapType(StringType,
  StructType(Array(StructField("col1",StringType),StructField("col2",StringType))))

This snippet creates “mapCol” object of type MapType with key as StringType and value as struct (StructType) with columns “col1” and “col2”.
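
DataTypes.createMapType() also has a three-argument overload that accepts valueContainsNull. The sketch below, intended for spark-shell or a Scala script, compares both overloads with the equivalent MapType case class instances; the value names are illustrative.

```scala
import org.apache.spark.sql.types.{DataTypes, MapType, StringType}

// Two-argument overload: values are allowed to be null.
val viaFactory = DataTypes.createMapType(StringType, StringType)

// Three-argument overload: the last argument is valueContainsNull.
val viaFactoryStrict = DataTypes.createMapType(StringType, StringType, false)

// MapType is a case class, so structurally equal instances compare equal.
println(viaFactory == MapType(StringType, StringType))
println(viaFactoryStrict == MapType(StringType, StringType, valueContainsNull = false))
```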

2.2 Using MapType case class

We can also create an instance of MapType using the MapType() case class. This takes two mandatory arguments, the key type and the value type, and one optional argument, “valueContainsNull”, to specify whether a value can be null.


val caseMapCol = MapType(StringType,StringType,false)

val caseMapCol = MapType(StringType,StructType(Array(
  StructField("col1",StringType),
  StructField("col2",StringType))))

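The first snippet creates a “caseMapCol” object of type MapType with String keys and String values that cannot be null. The second creates a “caseMapCol” object of type MapType with key as StringType and value as struct with columns “col1” and “col2”.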

Example


  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  val arrayStructureData = Seq(
    Row("James",List(Row("Newark","NY"),Row("Brooklyn","NY")),
      Map("hair"->"black","eye"->"brown"), Map("height"->"5.9")),
    Row("Michael",List(Row("SanJose","CA"),Row("Sandiago","CA")),
      Map("hair"->"brown","eye"->"black"),Map("height"->"6")),
    Row("Robert",List(Row("LasVegas","NV")),
      Map("hair"->"red","eye"->"gray"),Map("height"->"6.3")),
    Row("Maria",null,Map("hair"->"blond","eye"->"red"),
      Map("height"->"5.6")),
    Row("Jen",List(Row("LAX","CA"),Row("Orange","CA")),
      Map("white"->"black","eye"->"black"),Map("height"->"5.2"))
  )

  val mapType  = DataTypes.createMapType(StringType,StringType)

  val arrayStructureSchema = new StructType()
    .add("name",StringType)
    .add("addresses", ArrayType(new StructType()
      .add("city",StringType)
      .add("state",StringType)))
    .add("properties", mapType)
    .add("secondProp", MapType(StringType,StringType))

  val mapTypeDF = spark.createDataFrame(
    spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
  mapTypeDF.printSchema()
  mapTypeDF.show()

Outputs:


root
 |-- name: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- secondProp: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+-------+-------------------------------+------------------------------+---------------+
|name   |addresses                      |properties                    |secondProp     |
+-------+-------------------------------+------------------------------+---------------+
|James  |[[Newark, NY], [Brooklyn, NY]] |[hair -> black, eye -> brown] |[height -> 5.9]|
|Michael|[[SanJose, CA], [Sandiago, CA]]|[hair -> brown, eye -> black] |[height -> 6]  |
|Robert |[[LasVegas, NV]]               |[hair -> red, eye -> gray]    |[height -> 6.3]|
|Maria  |null                           |[hair -> blond, eye -> red]   |[height -> 5.6]|
|Jen    |[[LAX, CA], [Orange, CA]]      |[white -> black, eye -> black]|[height -> 5.2]|
+-------+-------------------------------+------------------------------+---------------+
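
Once a DataFrame has a map column, a single value can be read by key. The sketch below is an illustration for spark-shell or a Scala script: it uses a small sample modeled on the data above and Column.getItem() to look up the “hair” key. The local SparkSession and the names df and hairDF are assumptions for the example. What a missing key returns (typically null) can depend on the Spark version and ANSI settings.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for the example; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[1]").appName("map-lookup").getOrCreate()
import spark.implicits._

// Small sample with the same shape as the "properties" column above.
val df = Seq(
  ("James", Map("hair" -> "black", "eye" -> "brown")),
  ("Jen", Map("white" -> "black", "eye" -> "black"))
).toDF("name", "properties")

// getItem looks up one key in the map and returns its value as a column.
val hairDF = df.select(col("name"), col("properties").getItem("hair").as("hair"))
hairDF.show(false)
```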

3. Spark SQL functions to work with map column (MapType)

Spark SQL provides several map functions to work with MapType. In this section, we will see some of the most commonly used SQL functions.

3.1 Getting all map Keys from DataFrame MapType column

Use the map_keys() Spark function to retrieve all keys from a Spark DataFrame MapType column. Note that map_keys() takes an argument of MapType; passing any other type results in an error at run time.


import org.apache.spark.sql.functions._
mapTypeDF.select(col("name"),map_keys(col("properties"))).show(false)

Outputs:


+-------+--------------------+
|name   |map_keys(properties)|
+-------+--------------------+
|James  |[hair, eye]         |
|Michael|[hair, eye]         |
|Robert |[hair, eye]         |
|Maria  |[hair, eye]         |
|Jen    |[white, eye]        |
+-------+--------------------+
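
A common follow-up is finding the distinct keys used across all rows. The sketch below, for spark-shell or a Scala script, combines map_keys() with explode() on a small sample modeled on the data above; the local SparkSession and the names df and distinctKeys are assumptions for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, map_keys}

// Local session for the example; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[1]").appName("distinct-map-keys").getOrCreate()
import spark.implicits._

val df = Seq(
  ("James", Map("hair" -> "black", "eye" -> "brown")),
  ("Jen", Map("white" -> "black", "eye" -> "black"))
).toDF("name", "properties")

// map_keys yields an array of keys per row; explode turns each key into its own row.
val distinctKeys = df
  .select(explode(map_keys(col("properties"))).as("key"))
  .distinct()
  .as[String]
  .collect()
  .sorted

println(distinctKeys.mkString(", "))
```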

3.2 Getting all map values from the DataFrame MapType column

Use the map_values() Spark function to retrieve all values from a Spark DataFrame MapType column. Note that map_values() takes an argument of MapType; passing any other type results in an error at run time.


mapTypeDF.select(col("name"),map_values(col("properties"))).show(false)

Outputs:


+-------+----------------------+
|name   |map_values(properties)|
+-------+----------------------+
|James  |[black, brown]        |
|Michael|[brown, black]        |
|Robert |[red, gray]           |
|Maria  |[blond, red]          |
|Jen    |[black, black]        |
+-------+----------------------+
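
Because map_values() returns an array, it can be combined with array functions. The sketch below, for spark-shell or a Scala script, keeps only the rows whose map contains the value “black” under any key, using array_contains(). The sample data, the local SparkSession, and the names df and withBlack are assumptions for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_contains, col, map_values}

// Local session for the example; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[1]").appName("filter-map-values").getOrCreate()
import spark.implicits._

val df = Seq(
  ("James", Map("hair" -> "black", "eye" -> "brown")),
  ("Robert", Map("hair" -> "red", "eye" -> "gray")),
  ("Jen", Map("white" -> "black", "eye" -> "black"))
).toDF("name", "properties")

// Keep rows where any map value equals "black".
val withBlack = df.filter(array_contains(map_values(col("properties")), "black"))
withBlack.show(false)
```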

3.3 Merging maps using map_concat()

Using the Spark SQL map_concat() function, we can merge the keys and values from more than one map into a single map. All arguments to this function must be MapType; passing any other type results in a runtime error.


 mapTypeDF.select(col("name"),map_concat(col("properties"),col("secondProp"))).show(false)

Here, we have merged map columns “properties” and “secondProp” into a single column. This yields the below output.


+-------+---------------------------------------------+
|name   |map_concat(properties, secondProp)           |
+-------+---------------------------------------------+
|James  |[hair -> black, eye -> brown, height -> 5.9] |
|Michael|[hair -> brown, eye -> black, height -> 6]   |
|Robert |[hair -> red, eye -> gray, height -> 6.3]    |
|Maria  |[hair -> blond, eye -> red, height -> 5.6]   |
|Jen    |[white -> black, eye -> black, height -> 5.2]|
+-------+---------------------------------------------+
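
The maps merged above have no keys in common. When they do, the behavior depends on the Spark version: to my understanding, Spark 3.0 and later reject duplicate map keys by default and expose the spark.sql.mapKeyDedupPolicy setting to change that. The sketch below, for spark-shell or a Scala script, assumes such a version and sets the policy to LAST_WIN so the value from the later map is kept. The sample data and the names df and merged are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, map_concat}

// Local session for the example; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[1]").appName("map-concat-duplicates").getOrCreate()
import spark.implicits._

// Assumption: Spark 3.0+, where this setting controls duplicate-key handling.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

// Both maps contain the key "eye".
val df = Seq(
  (Map("eye" -> "brown"), Map("eye" -> "blue", "height" -> "5.9"))
).toDF("first", "second")

val merged = df.select(map_concat(col("first"), col("second")).as("merged"))
merged.show(false)
```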

3.4 Convert an array of StructType entries to map

Use the map_from_entries() SQL function to convert an array of StructType entries to a map (MapType) on a Spark DataFrame. This function takes a DataFrame column of type ArrayType(StructType) as its argument, where each struct holds a key and a value; passing any other type results in an error.
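
A minimal sketch of map_from_entries(), for spark-shell or a Scala script: a sequence of two-element tuples is encoded as an array of structs, which the function then turns into a map. The sample data, the local SparkSession, and the names df and asMap are assumptions for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, map_from_entries}

// Local session for the example; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[1]").appName("map-from-entries").getOrCreate()
import spark.implicits._

// Each (key, value) tuple becomes a struct, so "entries" is an array of structs.
val df = Seq(
  ("James", Seq(("hair", "black"), ("eye", "brown")))
).toDF("name", "entries")

// The first struct field is used as the key, the second as the value.
val asMap = df.select(col("name"), map_from_entries(col("entries")).as("properties"))
asMap.printSchema()
asMap.show(false)
```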

3.5 Convert a map to an array of StructType entries

Use the Spark SQL map_entries() function to convert a map column to an array of StructType (struct) entries on a DataFrame. Each struct in the array holds one key and its value.
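
A minimal sketch of map_entries(), for spark-shell or a Scala script. It assumes a Spark version whose Scala functions API includes map_entries (Spark 3.0 or later, as far as I know). The resulting structs have the fields “key” and “value”, which the second select uses after exploding the array. The sample data and the names df, asEntries and flattened are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, map_entries}

// Local session for the example; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[1]").appName("map-entries").getOrCreate()
import spark.implicits._

val df = Seq(
  ("James", Map("hair" -> "black", "eye" -> "brown"))
).toDF("name", "properties")

// Turn the map into an array of structs with fields "key" and "value".
val asEntries = df.select(col("name"), map_entries(col("properties")).as("entries"))
asEntries.printSchema()

// Explode the array to get one row per map entry.
val flattened = asEntries
  .select(col("name"), explode(col("entries")).as("entry"))
  .select(col("name"), col("entry.key"), col("entry.value"))
flattened.show(false)
```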

4. Dynamically create MapType on Spark DataFrame

The map() SQL function is used to create a map column of MapType on a DataFrame dynamically at runtime. The input columns to the map function must be passed as alternating key and value columns, e.g. (key1, value1, key2, value2, …).

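Note: All key columns must have the same data type and can’t be null, and all value columns must have the same data type. The snippet below converts all fields of the “properties” struct into key-value pairs in a “propertiesMap” map column. In this example the integer salary ends up as a string in the resulting map, as the printed schema further down shows.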


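import scala.collection.mutable
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.functions.{col, lit, map}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val structureData = Seq(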
  Row("36636","Finance",Row(3000,"USA")),
  Row("40288","Finance",Row(5000,"IND")),
  Row("42114","Sales",Row(3900,"USA")),
  Row("39192","Marketing",Row(2500,"CAN")),
  Row("34534","Sales",Row(6500,"USA"))
)

val structureSchema = new StructType()
  .add("id",StringType)
  .add("dept",StringType)
  .add("properties",new StructType()
    .add("salary",IntegerType)
    .add("location",StringType)
  )

var df = spark.createDataFrame(
  spark.sparkContext.parallelize(structureData),structureSchema)

val index = df.schema.fieldIndex("properties")
val propSchema = df.schema(index).dataType.asInstanceOf[StructType]
val columns = mutable.LinkedHashSet[Column]()
propSchema.fields.foreach(field =>{
  columns.add(lit(field.name))
  columns.add(col("properties." + field.name))
})

df = df.withColumn("propertiesMap",map(columns.toSeq:_*))
df = df.drop("properties")
df.printSchema()
df.show(false)

First, we find the “properties” column on the Spark DataFrame using df.schema.fieldIndex("properties") and collect each field name and its value column into a LinkedHashSet. We need a LinkedHashSet in order to maintain the insertion order of the key and value pairs. Finally, we pass the set to the map() function as alternating key and value columns.


root
 |-- id: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- propertiesMap: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+-----+---------+---------------------------------+
|id   |dept     |propertiesMap                    |
+-----+---------+---------------------------------+
|36636|Finance  |[salary -> 3000, location -> USA]|
|40288|Finance  |[salary -> 5000, location -> IND]|
|42114|Sales    |[salary -> 3900, location -> USA]|
|39192|Marketing|[salary -> 2500, location -> CAN]|
|34534|Sales    |[salary -> 6500, location -> USA]|
+-----+---------+---------------------------------+
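
As an alternative sketch of the same idea, the alternating key and value columns can be built with flatMap over the struct's field names instead of a mutable set. This is one possible variation, not the approach used above. It is written for spark-shell or a Scala script; the sample data (with salary kept as a string), the local SparkSession, and the names propFields, keyValueCols and result are assumptions for the example.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, lit, map, struct}
import org.apache.spark.sql.types.StructType

// Local session for the example; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[1]").appName("struct-to-map").getOrCreate()
import spark.implicits._

// Build a "properties" struct column from two flat string columns.
val df = Seq(("36636", "Finance", "3000", "USA"))
  .toDF("id", "dept", "salary", "location")
  .select(col("id"), col("dept"), struct(col("salary"), col("location")).as("properties"))

// Field names of the struct, in schema order.
val propFields = df.schema("properties").dataType.asInstanceOf[StructType].fieldNames

// For each field emit its name as a literal key, followed by its value column.
val keyValueCols: Seq[Column] =
  propFields.toSeq.flatMap(name => Seq(lit(name), col(s"properties.$name")))

val result = df.withColumn("propertiesMap", map(keyValueCols: _*)).drop("properties")
result.printSchema()
result.show(false)
```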

Conclusion

In this article, you have learned how to create a MapType (map) column on a Spark DataFrame using the MapType case class and DataTypes.createMapType(). You have also explored some of the SQL functions that work with MapType.

Happy Learning !!

NNK
