Spark SQL StructType & StructField classes are used to programmatically specify the schema of a DataFrame and to create complex columns such as nested struct, array, and map columns. StructType is a collection of StructFields. Using StructField we can define the column name, the column data type, and a nullable flag (a boolean that specifies whether the field can contain nulls), as well as metadata.
In this article, we will learn different ways to define the structure of a DataFrame using Spark SQL StructType, with Scala examples. Although Spark infers a schema from data, sometimes we need to define our own column names and data types; this article explains how to define simple, nested, and complex schemas.
StructType – Defines the structure of the DataFrame
Spark provides the org.apache.spark.sql.types.StructType class to define the structure of the DataFrame; it is a collection (list) of StructField objects. Calling printSchema() on a DataFrame prints the schema to the console, where StructType columns are represented as struct.
StructField – Defines the metadata of the DataFrame column
Spark provides the org.apache.spark.sql.types.StructField class to define the column name (String), column type (DataType), nullable flag (Boolean), and metadata (Metadata).
- Using Spark StructType & StructField with DataFrame
- Defining nested StructType or struct
- Creating StructType or struct from a JSON file
- Adding & Changing the struct of the DataFrame
- Using SQL ArrayType and MapType
- Convert case class to StructType
- Creating StructType object from DDL string
- Check if a field exists in a StructType
Using Spark StructType & StructField with DataFrame
While creating a Spark DataFrame we can specify the structure using the StructType and StructField classes. As mentioned in the introduction, StructType is a collection of StructFields used to define the column name, data type, and a nullable flag. Using StructField we can also define nested struct schemas, ArrayType for arrays, and MapType for key-value pairs, which we will discuss in detail in later sections.
The StructType and StructField case classes are defined as follows.
case class StructType(fields: Array[StructField])

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty)
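For illustration, a single field can also be constructed directly, optionally attaching metadata; a minimal sketch (the "desc" metadata key is just an arbitrary example):

import org.apache.spark.sql.types.{Metadata, StringType, StructField}

// A single field: name, data type, nullable flag, and optional metadata
val idField = StructField("id", StringType, nullable = false,
  Metadata.fromJson("""{"desc": "employee identifier"}"""))

println(idField.metadata.getString("desc")) // prints: employee identifier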
The example below demonstrates how to create a struct using StructType & StructField on a DataFrame, along with sample data.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val simpleData = Seq(Row("James ","","Smith","36636","M",3000),
  Row("Michael ","Rose","","40288","M",4000),
  Row("Robert ","","Williams","42114","M",4000),
  Row("Maria ","Anne","Jones","39192","F",4000),
  Row("Jen","Mary","Brown","","F",-1)
)

val simpleSchema = StructType(Array(
  StructField("firstname",StringType,true),
  StructField("middlename",StringType,true),
  StructField("lastname",StringType,true),
  StructField("id", StringType, true),
  StructField("gender", StringType, true),
  StructField("salary", IntegerType, true)
))

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(simpleData), simpleSchema)
df.printSchema()
df.show()
Running the above snippet displays the output below.
root
|-- firstname: string (nullable = true)
|-- middlename: string (nullable = true)
|-- lastname: string (nullable = true)
|-- id: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname| id|gender|salary|
+---------+----------+--------+-----+------+------+
| James | | Smith|36636| M| 3000|
| Michael | Rose| |40288| M| 4000|
| Robert | |Williams|42114| M| 4000|
| Maria | Anne| Jones|39192| F| 4000|
| Jen| Mary| Brown| | F| -1|
+---------+----------+--------+-----+------+------+
Defining a nested StructType (struct)
While working with DataFrames we often need nested struct columns, and these can be defined using SQL StructType. In the example below, I instantiate a StructType and use the add method (instead of StructField) to add the column names and data types. Notice that the data type of the column “name” is StructType, which is nested.
val structureData = Seq(
  Row(Row("James ","","Smith"),"36636","M",3100),
  Row(Row("Michael ","Rose",""),"40288","M",4300),
  Row(Row("Robert ","","Williams"),"42114","M",1400),
  Row(Row("Maria ","Anne","Jones"),"39192","F",5500),
  Row(Row("Jen","Mary","Brown"),"","F",-1)
)

val structureSchema = new StructType()
  .add("name", new StructType()
    .add("firstname", StringType)
    .add("middlename", StringType)
    .add("lastname", StringType))
  .add("id", StringType)
  .add("gender", StringType)
  .add("salary", IntegerType)

val df2 = spark.createDataFrame(
  spark.sparkContext.parallelize(structureData), structureSchema)
df2.printSchema()
df2.show()
This outputs the schema and DataFrame below.
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- id: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
+--------------------+-----+------+------+
| name| id|gender|salary|
+--------------------+-----+------+------+
| [James , , Smith]|36636| M| 3100|
| [Michael , Rose, ]|40288| M| 4300|
|[Robert , , Willi...|42114| M| 1400|
|[Maria , Anne, Jo...|39192| F| 5500|
| [Jen, Mary, Brown]| | F| -1|
+--------------------+-----+------+------+
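Nested struct fields can then be accessed with dot notation; a quick sketch using df2 from above:

import org.apache.spark.sql.functions.col

// Select individual fields from the nested "name" struct
df2.select(col("name.firstname"), col("name.lastname"), col("salary")).show()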
Creating a StructType object (struct) from a JSON file
If you have many columns and the structure of the DataFrame changes now and then, it's good practice to load the SQL StructType schema from a JSON file. Note that the JSON definition uses a different layout, which you can obtain by calling prettyJson() on a schema.
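For instance, a one-liner that prints the JSON representation of the nested schema defined earlier (df2) and yields the layout shown below:

// Print the JSON representation of an existing schema
println(df2.schema.prettyJson)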
{
"type" : "struct",
"fields" : [ {
"name" : "name",
"type" : {
"type" : "struct",
"fields" : [ {
"name" : "firstname",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "middlename",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "lastname",
"type" : "string",
"nullable" : true,
"metadata" : { }
} ]
},
"nullable" : true,
"metadata" : { }
}, {
"name" : "dob",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "gender",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "salary",
"type" : "integer",
"nullable" : true,
"metadata" : { }
} ]
}
import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}

val url = ClassLoader.getSystemResource("schema.json")
val schemaSource = Source.fromFile(url.getFile).getLines.mkString
val schemaFromJson = DataType.fromJson(schemaSource).asInstanceOf[StructType]

val df3 = spark.createDataFrame(
  spark.sparkContext.parallelize(structureData), schemaFromJson)
df3.printSchema()
This prints the same output as the previous section. You could also keep a name, type, and nullable flag for each column in a comma-separated file and use those values to create a StructType programmatically, as in the sketch below; I will leave the rest for you to explore.
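As a starting point, a minimal sketch, assuming each line holds a name,type,nullable triple and supporting only string and integer types for brevity:

import org.apache.spark.sql.types._

// Hypothetical input: one "name,type,nullable" triple per line of the file
val csvLines = Seq("firstname,string,true", "salary,integer,true")

val fields = csvLines.map { line =>
  val Array(name, typeName, nullable) = line.split(",")
  val dataType = typeName match {
    case "string"  => StringType
    case "integer" => IntegerType
  }
  StructField(name, dataType, nullable.toBoolean)
}
val csvSchema = StructType(fields)
csvSchema.printTreeString()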
Adding & Changing the struct of the DataFrame
Using the Spark SQL function struct(), we can change the struct of an existing DataFrame and add a new StructType to it. The example below demonstrates how to copy columns from one structure to another and add a new column; it operates on df2, the nested-struct DataFrame created earlier.
import org.apache.spark.sql.functions.{col, struct, when}

val updatedDF = df2.withColumn("OtherInfo",
  struct(col("id").as("identifier"),
    col("gender").as("gender"),
    col("salary").as("salary"),
    when(col("salary").cast(IntegerType) < 2000, "Low")
      .when(col("salary").cast(IntegerType) < 4000, "Medium")
      .otherwise("High").alias("Salary_Grade")
  )).drop("id","gender","salary")
updatedDF.printSchema()
updatedDF.show(false)
Here, it copies “gender”, “salary”, and “id” to the new struct “OtherInfo” and adds a new column “Salary_Grade”.
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- OtherInfo: struct (nullable = false)
| |-- identifier: string (nullable = true)
| |-- gender: string (nullable = true)
| |-- salary: integer (nullable = true)
| |-- Salary_Grade: string (nullable = false)
Using SQL ArrayType and MapType
SQL StructType also supports ArrayType and MapType, for defining DataFrame columns that hold array and map collections respectively. In the example below, the column “hobbies” is defined as ArrayType(StringType) and “properties” as MapType(StringType,StringType), meaning both the key and the value are String.
val arrayStructureData = Seq(
  Row(Row("James ","","Smith"),List("Cricket","Movies"),Map("hair"->"black","eye"->"brown")),
  Row(Row("Michael ","Rose",""),List("Tennis"),Map("hair"->"brown","eye"->"black")),
  Row(Row("Robert ","","Williams"),List("Cooking","Football"),Map("hair"->"red","eye"->"gray")),
  Row(Row("Maria ","Anne","Jones"),null,Map("hair"->"blond","eye"->"red")),
  Row(Row("Jen","Mary","Brown"),List("Blogging"),Map("white"->"black","eye"->"black"))
)

val arrayStructureSchema = new StructType()
  .add("name", new StructType()
    .add("firstname", StringType)
    .add("middlename", StringType)
    .add("lastname", StringType))
  .add("hobbies", ArrayType(StringType))
  .add("properties", MapType(StringType, StringType))

val df5 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructureData), arrayStructureSchema)
df5.printSchema()
df5.show()
This outputs the schema and DataFrame data below. Note that the field “hobbies” is array type and “properties” is map type.
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- hobbies: array (nullable = true)
| |-- element: string (containsNull = true)
|-- properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
+---------------------+-------------------+------------------------------+
|name |hobbies |properties |
+---------------------+-------------------+------------------------------+
|[James , , Smith] |[Cricket, Movies] |[hair -> black, eye -> brown] |
|[Michael , Rose, ] |[Tennis] |[hair -> brown, eye -> black] |
|[Robert , , Williams]|[Cooking, Football]|[hair -> red, eye -> gray] |
|[Maria , Anne, Jones]|null |[hair -> blond, eye -> red] |
|[Jen, Mary, Brown] |[Blogging] |[white -> black, eye -> black]|
+---------------------+-------------------+------------------------------+
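Array elements and map values can be read back with getItem(); a quick sketch using df5:

import org.apache.spark.sql.functions.col

// Pull out the first hobby and the "hair" entry of the properties map
df5.select(
  col("name.firstname"),
  col("hobbies").getItem(0).as("first_hobby"),
  col("properties").getItem("hair").as("hair_color")
).show()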
Convert case class to Spark StructType
Spark SQL also provides Encoders to convert a case class to a StructType object. If you are using an older version of Spark, you can instead derive the schema from the case class via Scala reflection. Both approaches are shown below.
case class Name(first: String, last: String, middle: String)
case class Employee(fullName: Name, age: Integer, gender: String)

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.ScalaReflection

// Older Spark versions: derive the schema through Scala reflection
val schema = ScalaReflection.schemaFor[Employee].dataType.asInstanceOf[StructType]

// Preferred approach: use Encoders
val encoderSchema = Encoders.product[Employee].schema
encoderSchema.printTreeString()
printTreeString() outputs the below schema.
root
|-- fullName: struct (nullable = true)
| |-- first: string (nullable = true)
| |-- last: string (nullable = true)
| |-- middle: string (nullable = true)
|-- age: integer (nullable = true)
|-- gender: string (nullable = true)
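The same schema is derived automatically when building a Dataset directly from the case classes; a minimal sketch, assuming an active SparkSession named spark:

import spark.implicits._

// Spark derives the Employee schema through its implicit encoder
val ds = Seq(Employee(Name("James", "Smith", ""), 36, "M")).toDS()
ds.printSchema()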
Creating a StructType object (struct) from a DDL string
As with loading the structure from a JSON string, we can also create it from a DDL string, using the fromDDL() static function on the StructType object (StructType.fromDDL). You can also generate a DDL string from a schema using toDDL(). Calling printTreeString() on a struct object prints the schema, similar to the printSchema() function.
val ddlSchemaStr = "`fullName` STRUCT<`first`: STRING, `last`: STRING, " +
  "`middle`: STRING>,`age` INT,`gender` STRING"
val ddlSchema = StructType.fromDDL(ddlSchemaStr)
ddlSchema.printTreeString()
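In recent Spark versions, toDDL() performs the reverse operation; for example:

// Round-trip: generate a DDL string back from the StructType
println(ddlSchema.toDDL)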
Checking if a field exists in a DataFrame
If you want to perform checks on the metadata of a DataFrame, for example whether a column or field exists or what a column's data type is, we can easily do this using several functions on SQL StructType and StructField.
println(df.schema.fieldNames.contains("firstname"))
println(df.schema.contains(StructField("firstname",StringType,true)))
This example returns “true” for both scenarios. For the second one, if you use IntegerType instead of StringType it returns false, because the data type of the firstname column is String and the check compares every property of the field. Similarly, you can also check whether two schemas are equal, and more.
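A quick sketch of both kinds of check, using df and simpleSchema from the first example:

import org.apache.spark.sql.types.StringType

// Inspect the data type of a specific field
println(df.schema("firstname").dataType == StringType) // true

// Compare two schemas for equality
println(df.schema == simpleSchema) // true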
The complete example explained here is available in the GitHub project.
Conclusion:
In this article, you have learned the usage of SQL StructType and StructField, how to change the structure of a Spark DataFrame at runtime, how to convert a case class to a schema, and how to use ArrayType and MapType.