Spark explode array and map columns to rows

In this article, I will explain how to explode array or list and map DataFrame columns to rows using different Spark explode functions (explode, explore_outer, posexplode, posexplode_outer) with Scala example.

While working with structured files like JSON, Parquet, Avro, and XML we often get data in collections like arrays, lists, and maps, In such cases, these explode functions are useful to convert collection columns to rows in order to process in Spark effectively.

Though I’ve explained here with Scala, a similar method could be used to explode array and map columns to rows with PySpark and if time permits I will cover it in the future. If you are looking for PySpark, I would still recommend reading through this article as it would give you an Idea on Spark explode functions and usage.

Before we start, let’s create a DataFrame with array and map fields, below snippet, creates a DF with columns “name” as StringType, “knownLanguage” as ArrayType and “properties” as MapType.

And, from below code, “spark” is an instance of SparkSession, please refer to complete code at the end to see how to create SparkSession object.


    import spark.implicits._

    val arrayData = Seq(
      Row("James",List("Java","Scala"),Map("hair"->"black","eye"->"brown")),
    Row("Michael",List("Spark","Java",null),Map("hair"->"brown","eye"->null)),
    Row("Robert",List("CSharp",""),Map("hair"->"red","eye"->"")),
    Row("Washington",null,null),
    Row("Jefferson",List(),Map())
    )

    val arraySchema = new StructType()
      .add("name",StringType)
      .add("knownLanguages", ArrayType(StringType))
      .add("properties", MapType(StringType,StringType))

    val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData),arraySchema)
    df.printSchema()
    df.show(false)

explode – spark explode array or map column to rows

Spark function explode(e: Column) is used to explode or create array or map columns to rows. When an array is passed to this function, it creates a new default column “col1” and it contains all array elements. When a map is passed, it creates two new columns one for key and one for value and each element in map split into the row.

This will ignore elements that have null or empty. from the above example, Washington and Jefferson have null or empty values in array and map, hence the following snippet out does not contain these rows.

explode – array column example


    df.select($"name",explode($"knownLanguages"))
      .show(false)

Outputs:


+-------+------+
|name   |col   |
+-------+------+
|James  |Java  |
|James  |Scala |
|Michael|Spark |
|Michael|Java  |
|Michael|null  |
|Robert |CSharp|
|Robert |      |
+-------+------+

explode – map column example


    df.select($"name",explode($"properties"))
      .show(false)

Outputs:


+-------+----+-----+
|name   |key |value|
+-------+----+-----+
|James  |hair|black|
|James  |eye |brown|
|Michael|hair|brown|
|Michael|eye |null |
|Robert |hair|red  |
|Robert |eye |     |
+-------+----+-----+

explode_outer – Create rows for each element in an array or map.

Spark SQL explode_outer(e: Column) function is used to create a row for each element in the array or map column. Unlike explode, if the array or map is null or empty, explode_outer returns null.

explode_outer – array example


    df.select($"name",explode_outer($"knownLanguages"))
      .show(false)

Outputs:


+----------+------+
|name      |col   |
+----------+------+
|James     |Java  |
|James     |Scala |
|Michael   |Spark |
|Michael   |Java  |
|Michael   |null  |
|Robert    |CSharp|
|Robert    |      |
|Washington|null  |
|Jeferson  |null  |
+----------+------+

explode_outer – map example


    df.select($"name",explode_outer($"properties"))
      .show(false)

Outputs:


+----------+----+-----+
|name      |key |value|
+----------+----+-----+
|James     |hair|black|
|James     |eye |brown|
|Michael   |hair|brown|
|Michael   |eye |null |
|Robert    |hair|red  |
|Robert    |eye |     |
|Washington|null|null |
|Jeferson  |null|null |
+----------+----+-----+

posexplode – explode array or map elements to rows

posexplode(e: Column) creates a row for each element in the array and creates two columns “pos’ to hold the position of the array element and the ‘col’ to hold the actual array value. And when the input column is a map, posexplode function creates 3 columns “pos” to hold the position of the map element, “key” and “value” columns.

This will ignore elements that have null or empty. Since the Washington and Jefferson have null or empty values in array and map, the following snippet out does not contain these.

posexplode – array example


    df.select($"name",posexplode($"knownLanguages"))
      .show(false)

Outputs:


+-------+---+------+
|name   |pos|col   |
+-------+---+------+
|James  |0  |Java  |
|James  |1  |Scala |
|Michael|0  |Spark |
|Michael|1  |Java  |
|Michael|2  |null  |
|Robert |0  |CSharp|
|Robert |1  |      |
+-------+---+------+

posexplode – map example


    df.select($"name",posexplode($"properties"))
      .show(false)

Outputs:


+-------+---+----+-----+
|name   |pos|key |value|
+-------+---+----+-----+
|James  |0  |hair|black|
|James  |1  |eye |brown|
|Michael|0  |hair|brown|
|Michael|1  |eye |null |
|Robert |0  |hair|red  |
|Robert |1  |eye |     |
+-------+---+----+-----+

posexplode_outer – explode array or map columns to rows.

Spark posexplode_outer(e: Column) creates a row for each element in the array and creates two columns “pos’ to hold the position of the array element and the ‘col’ to hold the actual array value. Unlike posexplode, if the array or map is null or empty, posexplode_outer function returns null, null for pos and col columns. Similarly for the map, it returns rows with nulls.

posexplode_outer – array example


    df.select($"name",posexplode_outer($"knownLanguages"))
      .show(false)

Outputs:


+----------+----+------+
|name      |pos |col   |
+----------+----+------+
|James     |0   |Java  |
|James     |1   |Scala |
|Michael   |0   |Spark |
|Michael   |1   |Java  |
|Michael   |2   |null  |
|Robert    |0   |CSharp|
|Robert    |1   |      |
|Washington|null|null  |
|Jeferson  |null|null  |
+----------+----+------+

posexplode_outer – map example


    df.select($"name",posexplode_outer($"properties"))
      .show(false)

Outputs:


+----------+----+----+-----+
|name      |pos |key |value|
+----------+----+----+-----+
|James     |0   |hair|black|
|James     |1   |eye |brown|
|Michael   |0   |hair|brown|
|Michael   |1   |eye |null |
|Robert    |0   |hair|red  |
|Robert    |1   |eye |     |
|Washington|null|null|null |
|Jeferson  |null|null|null |
+----------+----+----+-----+

The complete example of exploding array or maps to rows


package com.sparkbyexamples.spark.dataframe.functions

import com.sparkbyexamples.spark.dataframe.functions.ArraTypeExample.spark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}

object ExplodeArrayAndMap{

  def main(args:Array[String]) : Unit = {

    val spark = SparkSession.builder().appName("SparkByExamples.com")
      .master("local[1]")
      .getOrCreate()

     // create DataFrame

    val arrayData = Seq(
      Row("James",List("Java","Scala","C++"),Map("hair"->"black","eye"->"brown")),
      Row("Michael",List("Spark","Java","C++",null),Map("hair"->"brown","eye"->null)),
      Row("Robert",List("CSharp","Python",""),Map("hair"->"red","eye"->"")),
      Row("Washington",null,null),
      Row("Jeferson",List(),Map())
    )

    val arraySchema = new StructType()
      .add("name",StringType)
      .add("knownLanguages", ArrayType(StringType))
      .add("properties", MapType(StringType,StringType))

    val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData),arraySchema)
    df.printSchema()
    df.show()

    import spark.implicits._
    // Below are Array examples
    //explode
    df.select($"name",explode($"knownLanguages"))
      .show()

    //explode_outer
    df.select($"name",explode_outer($"knownLanguages"))
      .show()

    //posexplode
    df.select($"name",posexplode($"knownLanguages"))
      .show()

    //posexplode_outer
    df.select($"name",posexplode_outer($"knownLanguages"))
      .show()

    // Below are Map examples

    //explode
    df.select($"name",explode($"properties"))
      .show()
    //explode_outer
    df.select($"name",explode_outer($"properties"))
      .show()
    //posexplode
    df.select($"name",posexplode($"properties"))
      .show()

    //posexplode_outer
    df.select($"name",posexplode_outer($"properties"))
      .show()
  }
}

Some common faq’s of explode functions

What is explode function

Spark SQL explode function is used to create or split an array or map DataFrame columns to rows. Spark defines several flavors of this function; explode_outer – to handle nulls and empty, posexplode – which explodes with a position of element and posexplode_outer – to handle nulls.

Difference between explode vs explode_outer

explode – creates a row for each element in the array or map column by ignoring null or empty values in array. whereas explode_outer returns all values in array or map including null or empty.

Difference between explode vs posexplode

explode – creates a row for each element in the array or map column. whereas posexplode creates a row for each element in the array and creates two columns ‘pos’ to hold the position of the array element and the ‘col’ to hold the actual array value. And, for the map, it creates 3 columns ‘pos’, ‘key’ and ‘value’

Conclusion

In this article, you have learned how to how to explode or convert array or map DataFrame columns to rows using explode and posexplode SQL functions and their’s respective outer functions and also learned differences between these functions.

NNK

SparkByExamples.com is a BigData and Spark examples community page, all examples are simple and easy to understand and well tested in our development environment using Scala and Maven.

Leave a Reply

Close Menu