Spark SQL StructType & StructField with examples

Spark SQL StructType & StructField classes are used to programmatically specify the schema of a DataFrame and to create complex columns such as nested struct, array, and map columns. A StructType is a collection of StructFields, each of which defines a column name, a column data type, a boolean that specifies whether the field is nullable, and metadata.

In this article, we will learn different ways to define the structure of a DataFrame using Spark SQL StructType with Scala examples. Although Spark can infer a schema from data, sometimes we need to define our own column names and data types, and this article explains how to define simple, nested, and complex schemas.

StructType – Defines the structure of the DataFrame

Spark provides the org.apache.spark.sql.types.StructType class to define the structure of a DataFrame. It is a collection (list) of StructField objects. When you call the printSchema() method on a DataFrame, StructType columns are shown as “struct”.
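
Because a StructType is a collection of StructField objects, it can be inspected like any other Scala sequence. The following is a minimal sketch of that idea; the three-field schema is made up for illustration and is not one of the schemas used later in the article.

```scala
import org.apache.spark.sql.types._

// A small schema used only for this illustration.
val schema = StructType(Seq(
  StructField("firstname", StringType),
  StructField("id", StringType),
  StructField("salary", IntegerType, nullable = false)
))

// StructType behaves like a Seq[StructField].
println(schema.length)                     // number of fields
println(schema.fieldNames.mkString(", "))  // column names in declaration order
println(schema("salary").dataType)         // look up a single field by name
println(schema.fieldIndex("id"))           // zero-based position of a field

// Iterate over the fields like any other collection.
schema.foreach { f =>
  println(s"${f.name}: ${f.dataType.simpleString}, nullable=${f.nullable}")
}

// Prints the same tree that printSchema() shows for a DataFrame.
schema.printTreeString()
```

No SparkSession is needed here, since a schema is just a plain object until it is attached to a DataFrame.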

StructField – Defines the metadata of the DataFrame column

Spark provides the org.apache.spark.sql.types.StructField class to define the column name (String), column type (DataType), whether the column is nullable (Boolean), and metadata (Metadata).
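
As a rough illustration of those four properties, the sketch below builds one StructField with explicit metadata and one that relies on the defaults. The "comment" key and its text are hypothetical values chosen for the example; MetadataBuilder is the builder class Spark provides for Metadata objects.

```scala
import org.apache.spark.sql.types._

// Hypothetical metadata: the key "comment" and its text are illustrative only.
val salaryMeta = new MetadataBuilder()
  .putString("comment", "Monthly salary")
  .build()

// A field with all four properties set explicitly.
val salaryField = StructField("salary", IntegerType, nullable = false, metadata = salaryMeta)

println(salaryField.name)                           // column name
println(salaryField.dataType)                       // column data type
println(salaryField.nullable)                       // nullable flag
println(salaryField.metadata.getString("comment"))  // value stored under "comment"

// A field that relies on the defaults: nullable = true and empty metadata.
val idField = StructField("id", StringType)
println(idField.nullable)
println(idField.metadata == Metadata.empty)
```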

Using Spark StructType & StructField with DataFrame

While creating a Spark DataFrame we can specify its structure using the StructType and StructField classes. As mentioned in the introduction, StructType is a collection of StructFields, which define the column name, the data type, and a flag indicating whether the column is nullable. Using StructField we can also add a nested struct schema, ArrayType for arrays, and MapType for key-value pairs, which we will discuss in detail in later sections.

The StructType & StructField case classes are defined as follows.


case class StructType(fields: Array[StructField])

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty)

The example below shows how to create a struct using StructType & StructField on a DataFrame, together with sample data to demonstrate its usage.


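import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

// Reuses the active session (for example in spark-shell) or starts a local one
val spark = SparkSession.builder().master("local[1]").appName("StructTypeExample").getOrCreate()

// Sample rows; the values in each Row must line up with the schema fields below
val simpleData = Seq(Row("James ","","Smith","36636","M",3000),
    Row("Michael ","Rose","","40288","M",4000),
    Row("Robert ","","Williams","42114","M",4000),
    Row("Maria ","Anne","Jones","39192","F",4000),
    Row("Jen","Mary","Brown","","F",-1)
  )

// One StructField per column: name, data type, nullable flag
val simpleSchema = StructType(Array(
    StructField("firstname",StringType,true),
    StructField("middlename",StringType,true),
    StructField("lastname",StringType,true),
    StructField("id", StringType, true),
    StructField("gender", StringType, true),
    StructField("salary", IntegerType, true)
  ))

val df = spark.createDataFrame(
    spark.sparkContext.parallelize(simpleData),simpleSchema)
df.printSchema()
df.show()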

Running the above snippet displays the following output.


root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+
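
The same kind of schema can also be applied when reading data, which is where the difference between an inferred and a user-defined schema becomes visible. The sketch below is an illustration under assumptions: the two CSV lines are made-up sample data kept in memory (so no file path is needed), and the three-column schema is a shortened version of the one above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").appName("SchemaOnRead").getOrCreate()
import spark.implicits._

// Hypothetical CSV content held in memory instead of a file.
val csvLines = Seq("James,Smith,3000", "Maria,Jones,4000").toDS()

val schema = new StructType()
  .add("firstname", StringType)
  .add("lastname", StringType)
  .add("salary", IntegerType)

// Without a schema, Spark generates column names and reads every column as a string.
spark.read.csv(csvLines).printSchema()

// With an explicit schema, the column names and data types come from the StructType.
val typedDf = spark.read.schema(schema).csv(csvLines)
typedDf.printSchema()
typedDf.show()
```

Supplying the schema up front also avoids the extra pass over the data that schema inference options would otherwise require.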

Defining a nested StructType

While working with DataFrames we often need to work with nested struct columns, and these can be defined using SQL StructType.

In the example below I have instantiated StructType and used its add method (instead of StructField) to add column names and data types. Notice that the data type of the column “name” is itself a StructType, which makes it nested.


  val structureData = Seq(
    Row(Row("James ","","Smith"),"36636","M",3100),
    Row(Row("Michael ","Rose",""),"40288","M",4300),
    Row(Row("Robert ","","Williams"),"42114","M",1400),
    Row(Row("Maria ","Anne","Jones"),"39192","F",5500),
    Row(Row("Jen","Mary","Brown"),"","F",-1)
  )

  val structureSchema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("id",StringType)
    .add("gender",StringType)
    .add("salary",IntegerType)

  val df2 = spark.createDataFrame(
     spark.sparkContext.parallelize(structureData),structureSchema)
  df2.printSchema()
  df2.show()

This outputs the below schema and DataFrame.


root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+-----+------+------+
|                name|   id|gender|salary|
+--------------------+-----+------+------+
|   [James , , Smith]|36636|     M|  3100|
|  [Michael , Rose, ]|40288|     M|  4300|
|[Robert , , Willi...|42114|     M|  1400|
|[Maria , Anne, Jo...|39192|     F|  5500|
|  [Jen, Mary, Brown]|     |     F|    -1|
+--------------------+-----+------+------+
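
Once a column is a struct, its inner fields can be addressed with dot notation. The following sketch assumes a reduced version of the nested schema above (only two name fields and a salary) with made-up rows, so that it can run on its own.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").appName("NestedAccess").getOrCreate()

// Reduced nested schema, used only for this illustration.
val schema = new StructType()
  .add("name", new StructType()
    .add("firstname", StringType)
    .add("lastname", StringType))
  .add("salary", IntegerType)

val data = Seq(
  Row(Row("James", "Smith"), 3100),
  Row(Row("Maria", "Jones"), 5500)
)
val people = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// Select a single nested field with dot notation.
people.select(col("name.firstname"), col("salary")).show()

// Expand every field of the struct into top-level columns.
people.select("name.*").show()

// Nested fields can also be used in filters.
people.filter(col("name.lastname") === "Jones").show()
```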

Creating a StructType from a JSON file

If you have many columns and the structure of the DataFrame changes now and then, it is good practice to load the SQL StructType schema from a JSON file. Note that the JSON definition uses a different layout from the code above; you can generate it from an existing schema using schema.prettyJson.


{
  "type" : "struct",
  "fields" : [ {
    "name" : "name",
    "type" : {
      "type" : "struct",
      "fields" : [ {
        "name" : "firstname",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "middlename",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "lastname",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      } ]
    },
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "dob",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "gender",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "salary",
    "type" : "integer",
    "nullable" : true,
    "metadata" : { }
  } ]
}
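
A JSON document like the one above does not have to be written by hand. As a small sketch, assuming a reduced schema made up for the example, a StructType can be serialized with json or prettyJson and parsed back with DataType.fromJson:

```scala
import org.apache.spark.sql.types._

// Reduced schema, used only for this illustration.
val original = new StructType()
  .add("name", new StructType()
    .add("firstname", StringType)
    .add("lastname", StringType))
  .add("salary", IntegerType)

// Compact single-line JSON and an indented, human-readable variant.
val compact = original.json
val pretty = original.prettyJson
println(pretty)

// Parsing either string yields a schema equal to the original.
val restored = DataType.fromJson(pretty).asInstanceOf[StructType]
assert(restored == original)
assert(DataType.fromJson(compact) == original)
```

The pretty-printed string is what you would save to a file such as schema.json.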


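  import scala.io.Source
  import org.apache.spark.sql.types.{DataType, StructType}

  // Read schema.json (the JSON shown above) from the classpath
  val url = ClassLoader.getSystemResource("schema.json")
  val source = Source.fromFile(url.getFile)
  val schemaSource = try source.getLines().mkString finally source.close()
  // Parse the JSON text back into a StructType
  val schemaFromJson = DataType.fromJson(schemaSource).asInstanceOf[StructType]
  val df3 = spark.createDataFrame(
        spark.sparkContext.parallelize(structureData),schemaFromJson)
  df3.printSchema()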

This prints the same structure as in the previous section, except that the second field is named “dob” in the JSON schema rather than “id”. You can also keep the name, type, and nullable flag of each field in a comma-separated file and use them to create a StructType programmatically; I will leave this for you to explore.
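
As a possible starting point for that exploration, here is a minimal sketch. Everything in it is an assumption made for illustration: the "name,type,nullable" line format, the sample lines (kept in memory rather than read from a file), and the toDataType helper, which is hypothetical and only maps a handful of type names.

```scala
import org.apache.spark.sql.types._

// Hypothetical definition lines in the form: name,type,nullable
val fieldLines = Seq(
  "firstname,string,true",
  "id,string,false",
  "salary,integer,true"
)

// Hypothetical helper: maps a small set of type names to Spark data types.
def toDataType(typeName: String): DataType = typeName.trim.toLowerCase match {
  case "string"          => StringType
  case "integer" | "int" => IntegerType
  case "long"            => LongType
  case "double"          => DoubleType
  case "boolean"         => BooleanType
  case other             => throw new IllegalArgumentException(s"Unsupported type: $other")
}

// Turn each line into a StructField and collect them into a StructType.
val schemaFromCsv = StructType(fieldLines.map { line =>
  val Array(name, typeName, nullable) = line.split(",").map(_.trim)
  StructField(name, toDataType(typeName), nullable.toBoolean)
})

schemaFromCsv.printTreeString()
```

A real implementation would also need to decide how to express nested, array, and map types, which a flat comma-separated format does not cover.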

Adding & changing the struct of a DataFrame

Using the Spark SQL function struct(), we can change the struct of an existing DataFrame and add a new StructType to it. The example below demonstrates how to copy columns from one structure to another and add a new column.


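  import org.apache.spark.sql.functions.{col, struct, when}
  import org.apache.spark.sql.types.IntegerType

  // df2 is the nested-struct DataFrame (name, id, gender, salary) created earlier
  val updatedDF = df2.withColumn("OtherInfo",
    struct(col("id").as("identifier"),
      col("gender").as("gender"),
      col("salary").as("salary"),
      when(col("salary").cast(IntegerType) < 2000,"Low")
        .when(col("salary").cast(IntegerType) < 4000,"Medium")
        .otherwise("High").alias("Salary_Grade")
    )).drop("id","gender","salary")

  updatedDF.printSchema()
  updatedDF.show(false)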

Here, it copies “id” (renamed to “identifier”), “gender”, and “salary” into the new struct “OtherInfo” and adds a new field “Salary_Grade”.


root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- OtherInfo: struct (nullable = false)
 |    |-- identifier: string (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- salary: integer (nullable = true)
 |    |-- Salary_Grade: string (nullable = false)

Using SQL ArrayType and MapType

SQL StructType also supports ArrayType and MapType to define DataFrame columns for array and map collections respectively. In the example below, the column “hobbies” is defined as ArrayType(StringType) and “properties” is defined as MapType(StringType,StringType), meaning both key and value are Strings.


  val arrayStructureData = Seq(
    Row(Row("James ","","Smith"),List("Cricket","Movies"),Map("hair"->"black","eye"->"brown")),
    Row(Row("Michael ","Rose",""),List("Tennis"),Map("hair"->"brown","eye"->"black")),
    Row(Row("Robert ","","Williams"),List("Cooking","Football"),Map("hair"->"red","eye"->"gray")),
    Row(Row("Maria ","Anne","Jones"),null,Map("hair"->"blond","eye"->"red")),
    Row(Row("Jen","Mary","Brown"),List("Blogging"),Map("white"->"black","eye"->"black"))
  )

  val arrayStructureSchema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("hobbies", ArrayType(StringType))
    .add("properties", MapType(StringType,StringType))

  val df5 = spark.createDataFrame(
     spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
  df5.printSchema()
  df5.show()

This outputs the below schema and DataFrame data. Note that the field “hobbies” is an array type and “properties” is a map type.


root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- hobbies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---------------------+-------------------+------------------------------+
|name                 |hobbies            |properties                    |
+---------------------+-------------------+------------------------------+
|[James , , Smith]    |[Cricket, Movies]  |[hair -> black, eye -> brown] |
|[Michael , Rose, ]   |[Tennis]           |[hair -> brown, eye -> black] |
|[Robert , , Williams]|[Cooking, Football]|[hair -> red, eye -> gray]    |
|[Maria , Anne, Jones]|null               |[hair -> blond, eye -> red]   |
|[Jen, Mary, Brown]   |[Blogging]         |[white -> black, eye -> black]|
+---------------------+-------------------+------------------------------+
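
To show how such columns are typically used afterwards, the sketch below reads individual elements out of the array and map columns and flattens the array with explode. It assumes a simplified schema (a plain “firstname” string instead of the nested “name” struct) and two made-up rows, so it can run independently of the example above.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").appName("ArrayMapAccess").getOrCreate()

// Simplified schema, used only for this illustration.
val schema = new StructType()
  .add("firstname", StringType)
  .add("hobbies", ArrayType(StringType))
  .add("properties", MapType(StringType, StringType))

val data = Seq(
  Row("James", List("Cricket", "Movies"), Map("hair" -> "black", "eye" -> "brown")),
  Row("Maria", null, Map("hair" -> "blond", "eye" -> "red"))
)
val people = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// Read an array element by position and a map value by key.
people.select(
  col("firstname"),
  col("hobbies").getItem(0).as("first_hobby"),
  col("properties").getItem("hair").as("hair")
).show()

// explode() produces one row per array element; rows whose array is null are dropped.
people.select(col("firstname"), explode(col("hobbies")).as("hobby")).show()
```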

Convert case class to Spark StructType

Spark SQL also provides Encoders to convert a case class to a StructType object. If you are using an older version of Spark, you can also derive the schema from the case class using ScalaReflection. Both approaches are shown here.


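  import org.apache.spark.sql.Encoders
  import org.apache.spark.sql.catalyst.ScalaReflection
  import org.apache.spark.sql.types.StructType

  case class Name(first:String,last:String,middle:String)
  case class Employee(fullName:Name,age:Integer,gender:String)

  // Option 1: derive the schema through Catalyst's reflection helper (an internal API)
  val schema = ScalaReflection.schemaFor[Employee].dataType.asInstanceOf[StructType]

  // Option 2: derive the schema through an Encoder
  val encoderSchema = Encoders.product[Employee].schema
  encoderSchema.printTreeString()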

printTreeString() outputs the below schema.


root
 |-- fullName: struct (nullable = true)
 |    |-- first: string (nullable = true)
 |    |-- last: string (nullable = true)
 |    |-- middle: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)

Creating a StructType from a DDL string

Just as a structure can be loaded from a JSON string, it can also be created from a DDL string by using the static function StructType.fromDDL(). You can also generate DDL from a schema using toDDL. Calling printTreeString() on the struct object prints the schema in the same format as the printSchema() function.


  // A regular Scala string literal cannot span lines, so the DDL is concatenated
  val ddlSchemaStr = "`fullName` STRUCT<`first`: STRING, `last`: STRING, " +
    "`middle`: STRING>,`age` INT,`gender` STRING"
  val ddlSchema = StructType.fromDDL(ddlSchemaStr)
  ddlSchema.printTreeString()
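
The opposite direction can be sketched in the same way. The example below assumes a Spark version in which toDDL is available on StructType, and uses a reduced schema made up for illustration; it generates a DDL string from a schema and parses it back.

```scala
import org.apache.spark.sql.types._

// Reduced schema, used only for this illustration.
val schema = new StructType()
  .add("fullName", new StructType()
    .add("first", StringType)
    .add("last", StringType))
  .add("age", IntegerType)

// Generate the DDL representation of the schema.
val ddl = schema.toDDL
println(ddl)

// Parse the DDL string back into a StructType and compare it with the original.
val parsed = StructType.fromDDL(ddl)
println(parsed == schema)
parsed.printTreeString()
```

The exact text of the generated DDL (for example, whether names are quoted with backticks) may differ between Spark versions.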

Checking if a field exists in a DataFrame

If you want to perform checks on the metadata of a DataFrame, for example whether a column or field exists or what the data type of a column is, you can easily do this using several functions on SQL StructType and StructField.


println(df.schema.fieldNames.contains("firstname"))
println(df.schema.contains(StructField("firstname",StringType,true)))

This example returns “true” for both checks. In the second one, if you pass IntegerType instead of StringType it returns false, because the data type of the firstname column is String and contains() compares every property of the field. Similarly, you can also check whether two schemas are equal, and more.
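
A few of those additional checks are sketched below. The three schemas are made up for the example; the comparisons rely on the fact that StructType and StructField are case classes, so == compares them field by field.

```scala
import org.apache.spark.sql.types._

// Three small schemas, used only for this illustration.
val schemaA = new StructType().add("firstname", StringType).add("salary", IntegerType)
val schemaB = StructType(Seq(
  StructField("firstname", StringType),
  StructField("salary", IntegerType)
))
val schemaC = new StructType().add("firstname", StringType).add("salary", LongType)

// Two schemas are equal when all their fields match in order, name, type, nullability and metadata.
println(schemaA == schemaB)  // true
println(schemaA == schemaC)  // false: salary has a different data type

// Check the data type of a single column.
println(schemaA("salary").dataType == IntegerType)

// Look up a field that may not exist without risking an exception.
val maybeField = schemaA.fields.find(_.name == "middlename")
println(maybeField.isDefined)

// List the fields whose data type differs between two schemas.
val changed = schemaA.fields.filter { a =>
  schemaC.fields.exists(c => c.name == a.name && c.dataType != a.dataType)
}
println(changed.map(_.name).mkString(", "))
```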

The complete example explained here is available in the GitHub project.

Conclusion:

In this article, you have learned how to use SQL StructType and StructField, how to change the structure of a Spark DataFrame at runtime, how to convert a case class to a schema, and how to use ArrayType and MapType.
