Problem: How do you create a Spark DataFrame with an array of struct column using Scala?
Using the StructType and ArrayType classes, we can create a DataFrame with an array of struct column ( ArrayType(StructType) ). In the example below, the column “booksIntersted” is an array of StructType, where each struct holds a book “name”, “author”, and the number of “pages”.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}

val arrayStructData = Seq(
Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),
Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250))),
Row("Washington",null)
)
val arrayStructSchema = new StructType().add("name",StringType)
.add("booksIntersted",ArrayType(new StructType()
.add("name",StringType)
.add("author",StringType)
.add("pages",IntegerType)))
val df = spark.createDataFrame(spark.sparkContext
.parallelize(arrayStructData),arrayStructSchema)
df.printSchema()
df.show(false)
df.printSchema() and df.show() return the following schema and table.
root
|-- name: string (nullable = true)
|-- booksIntersted: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- pages: integer (nullable = true)
+----------+-----------------------------------+
|name |booksIntersted |
+----------+-----------------------------------+
|James |[[Java, XX, 120], [Scala, XA, 300]]|
|Michael |[[Java, XY, 200], [Scala, XB, 500]]|
|Robert |[[Java, XZ, 400], [Scala, XC, 250]]|
|Washington|null |
+----------+-----------------------------------+
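As an alternative to building Row objects against an explicit schema, the same shape can be produced from case classes with toDF(), letting Spark infer the ArrayType(StructType) schema. A minimal sketch, assuming the same spark session as above; the case class names Book and Person are illustrative, not part of the original example:

```scala
// Illustrative case classes mirroring the struct fields above
case class Book(name: String, author: String, pages: Int)
case class Person(name: String, booksIntersted: Seq[Book])

import spark.implicits._

// Spark derives the array-of-struct schema from the case classes
val dfFromCC = Seq(
  Person("James", Seq(Book("Java", "XX", 120), Book("Scala", "XA", 300))),
  Person("Washington", Seq.empty)
).toDF()

dfFromCC.printSchema()
dfFromCC.show(false)
```

One difference from the explicit-schema approach: with inference, pages comes out as a non-nullable integer, since Scala's Int cannot hold null.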
1. Explode Array of Struct type
Now, let’s explode the “booksIntersted” array column into struct rows. After exploding, each row represents a book of StructType.
// Explode Array of Struct type
import org.apache.spark.sql.functions.explode
import spark.implicits._
val df2 = df.select($"name", explode($"booksIntersted"))
df2.printSchema()
df2.show(false)
Outputs:
root
|-- name: string (nullable = true)
|-- col: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- author: string (nullable = true)
| |-- pages: integer (nullable = true)
+-------+----------------+
|name |col |
+-------+----------------+
|James |[Java, XX, 120] |
|James |[Scala, XA, 300]|
|Michael|[Java, XY, 200] |
|Michael|[Scala, XB, 500]|
|Robert |[Java, XZ, 400] |
|Robert |[Scala, XC, 250]|
+-------+----------------+
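Note that explode drops rows whose array is null, which is why Washington is missing from the output above. If you want to keep such rows (with a null struct value), explode_outer can be used instead. A sketch, reusing the df from above:

```scala
import org.apache.spark.sql.functions.explode_outer
import spark.implicits._

// explode_outer keeps rows with a null array: Washington survives with a null col
val df3 = df.select($"name", explode_outer($"booksIntersted"))
df3.show(false)
```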
2. Use collect_list to Create the Array Back
Let’s also see how to revert the exploded rows back to an array of struct type.
df2.groupBy($"name").agg(collect_list($"col").as("booksIntersted"))
.show(false)
The collect_list function returns duplicates; use the collect_set function instead if you want only unique values.
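The struct fields can also be flattened to top-level columns first and then re-assembled per person with struct() inside collect_list, which is handy when you want to rename or transform fields along the way. A sketch continuing from df2 above; the column aliases are illustrative:

```scala
import org.apache.spark.sql.functions.{collect_list, struct}
import spark.implicits._

// Flatten the struct fields into top-level columns
val flat = df2.select($"name", $"col.name".as("bookName"),
  $"col.author".as("author"), $"col.pages".as("pages"))

// Rebuild the array of structs, one element per book, grouped by person
val rebuilt = flat.groupBy($"name")
  .agg(collect_list(struct($"bookName", $"author", $"pages")).as("booksIntersted"))
rebuilt.show(false)
```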
3. Complete Example
// Complete Example
package com.sparkbyexamples.spark.dataframe.functions.collection
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}
import org.apache.spark.sql.{Row, SparkSession}
object ArrayOfStructType extends App {
val spark = SparkSession.builder().appName("SparkByExamples.com")
.master("local[1]")
.getOrCreate()
val arrayStructData = Seq(
Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),
Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250))),
Row("Washington",null)
)
val arrayStructSchema = new StructType().add("name",StringType)
.add("booksIntersted",ArrayType(new StructType()
.add("name",StringType)
.add("author",StringType)
.add("pages",IntegerType)))
val df = spark.createDataFrame(
spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)
df.printSchema()
df.show(false)
import spark.implicits._
val df2 = df.select($"name",explode($"booksIntersted"))
df2.printSchema()
df2.show(false)
df2.groupBy($"name").agg(collect_list($"col").as("booksIntersted"))
.show(false)
}
Happy Learning !!