Spark SQL DataFrame Array (ArrayType) Column

In this article, I will explain how to create a DataFrame array column using the Spark SQL org.apache.spark.sql.types.ArrayType class and how to apply some SQL functions to the array column, with Scala examples.

While working with Spark structured (Avro, Parquet, etc.) or semi-structured (JSON) files, we often get data with complex structures like MapType, ArrayType, Array[StructType], etc., and there are not many good articles that explain these. I will try my best to cover the most commonly used functions on ArrayType columns.

 

  • What is Spark SQL ArrayType
  • Creating array (ArrayType) Column on Spark DataFrame
    • Using DataTypes.createArrayType()
    • Using ArrayType case class
  • Example of Array Column on DataFrame
  • Spark SQL Array functions

What is Spark SQL ArrayType

ArrayType is a collection data type in Spark SQL. It extends the DataType class, the superclass of all types in Spark, and all elements of an ArrayType column must be of the same type.
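
For reference, the ArrayType case class in the Spark source has roughly the following shape (a simplified sketch, not the full definition; the companion object also provides an apply(elementType) that defaults containsNull to true):


// Simplified sketch of Spark's ArrayType definition
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType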

Creating array (ArrayType) Column on Spark DataFrame

You can create an array column of type ArrayType on a Spark DataFrame using DataTypes.createArrayType() or using the ArrayType Scala case class.

Using Spark DataTypes.createArrayType()

The createArrayType() method on the DataTypes object returns an ArrayType instance. This method takes one argument of type DataType, meaning any type that extends the DataType class.


import org.apache.spark.sql.types.{DataTypes, StringType}

val arrayCol = DataTypes.createArrayType(StringType)
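
The returned ArrayType instance can be used anywhere a DataType is expected, for example as the type of a StructField when building a schema (a minimal sketch; the field name "languages" is just illustrative):


import org.apache.spark.sql.types.{StructField, StructType}

// Use the ArrayType instance as a field type in a schema
val schema = StructType(Array(StructField("languages", arrayCol, true)))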

It also has an overloaded method DataTypes.createArrayType(DataType, Boolean), which takes an additional boolean argument to specify whether the elements of the array can be null.


val arrayColWithNull = DataTypes.createArrayType(StringType, true)

Using ArrayType case class

We can also create an instance of ArrayType using the ArrayType() case class. It takes an elementType argument and one optional argument, containsNull, to specify whether the elements can be null (it defaults to true).


import org.apache.spark.sql.types.{ArrayType, StringType}

val caseArrayCol = ArrayType(StringType, false)

Example of Array Column on DataFrame


import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

val arrayStructureData = Seq(
  Row("James,,Smith", List("Java", "Scala", "C++"), List("Spark", "Java"), "OH", "CA"),
  Row("Michael,Rose,", List("Spark", "Java", "C++"), List("Spark", "Java"), "NY", "NJ"),
  Row("Robert,,Williams", List("CSharp", "VB"), List("Spark", "Python"), "UT", "NV")
)

val arrayStructureSchema = new StructType()
  .add("name", StringType)
  .add("languagesAtSchool", ArrayType(StringType))
  .add("languagesAtWork", ArrayType(StringType))
  .add("currentState", StringType)
  .add("previousState", StringType)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructureData), arrayStructureSchema)
df.printSchema()
df.show()

This snippet creates two array columns, "languagesAtSchool" and "languagesAtWork", which hold the languages learned at school and the languages used at work, respectively. The rest of the article demonstrates several Spark SQL array functions using this DataFrame. printSchema() and show() from the above snippet produce the output below.


root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- languagesAtWork: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)
 |-- previousState: string (nullable = true)

+----------------+------------------+---------------+------------+-------------+
|            name| languagesAtSchool|languagesAtWork|currentState|previousState|
+----------------+------------------+---------------+------------+-------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|
+----------------+------------------+---------------+------------+-------------+

Spark SQL Array functions

Spark SQL provides several array functions to work with ArrayType columns. In this section, we will see some of the most commonly used ones.

explode

Use the explode() function to create a new row for each element in the given array column. Spark SQL also provides variants such as explode_outer(), posexplode(), and posexplode_outer() for working with array columns; one of them is sketched after the output below.


df.select($"name",explode($"languagesAtSchool")).show(false)

+----------------+------+
|name            |col   |
+----------------+------+
|James,,Smith    |Java  |
|James,,Smith    |Scala |
|James,,Smith    |C++   |
|Michael,Rose,   |Spark |
|Michael,Rose,   |Java  |
|Michael,Rose,   |C++   |
|Robert,,Williams|CSharp|
|Robert,,Williams|VB    |
+----------------+------+
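
As a sketch of one of those variants, posexplode() also returns the position of each element in the array; Spark names the resulting columns pos and col:


// assumes: import org.apache.spark.sql.functions.posexplode
df.select($"name", posexplode($"languagesAtSchool")).show(false)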

split

split() splits the given string column around matches of a regular expression and returns an array column.


df.select(split($"name",",").as("nameAsArray")).show(false)

+--------------------+
|nameAsArray         |
+--------------------+
|[James, , Smith]    |
|[Michael, Rose, ]   |
|[Robert, , Williams]|
+--------------------+
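
Because split() returns an array column, you can pick out individual elements with getItem() (a minimal sketch; the alias "firstName" is just illustrative):


// Take the first element of the array produced by split()
df.select(split($"name",",").getItem(0).as("firstName")).show(false)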

array

Creates a new array column. All input columns must have the same data type.


df.select($"name",array($"currentState",$"previousState").as("States") ).show(false)

+----------------+--------+
|name            |States  |
+----------------+--------+
|James,,Smith    |[OH, CA]|
|Michael,Rose,   |[NY, NJ]|
|Robert,,Williams|[UT, NV]|
+----------------+--------+    
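
These functions compose with the others shown here; for example, you can build an array from the two state columns and then explode it to get one row per state (a minimal sketch; the alias "state" is illustrative):


// One row per state, per person
df.select($"name", explode(array($"currentState", $"previousState")).as("state")).show(false)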

array_contains

Returns null if the array is null, true if the array contains `value`, and false otherwise.


df.select($"name",array_contains($"languagesAtSchool","Java")
    .as("array_contains")).show(false)

    +----------------+--------------+
|name            |array_contains|
+----------------+--------------+
|James,,Smith    |true          |
|Michael,Rose,   |true          |
|Robert,,Williams|false         |
+----------------+--------------+
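
array_contains() is also handy as a filter predicate, to keep only the rows whose array contains a given value (a minimal sketch using the same DataFrame):


// Keep only rows where languagesAtSchool contains "Java"
df.where(array_contains($"languagesAtSchool","Java")).show(false)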
