Sometimes you may need to convert a DataFrame Row into a Scala case class in Spark; you can achieve this either through Spark implicits or by accessing the Row fields by index.
In this article, let’s discuss what a case class is in Scala, how to convert a Row of a DataFrame into a case class, and its use case in detail.
1. Quick Examples
//Below are quick examples of converting a Row or a DataFrame into a case class.
//Converting Row object directly into case class
//Create a Row object for our demo
import org.apache.spark.sql.Row
val row1 = Row("Spark", "By", "Examples")
//Define case class
case class fullName(firstName: String, MiddleName: String, LastName: String)
//Convert Row to case class
val cc1 = fullName(row1(0).toString, row1(1).toString, row1(2).toString)
//2. Converting a DataFrame into a Dataset of a case class using Spark implicits
import spark.implicits._
//Create a Spark DataFrame
val df = Seq((1,"spark"), (2,"By"), (3,"Examples")).toDF("id", "name")
//Define the case class and convert the DataFrame into it using Spark implicits
case class sparkId(id: Int, name: String)
val dfCaseClass = df.as[sparkId]
// Extracting field values from the collected Array of sparkId case class instances
val ids = dfCaseClass.collect.map(x => x.id)
val names = dfCaseClass.collect.map(x => x.name)
2. Definitions
Before diving into the implementation, it helps to understand what a DataFrame, a Row, and a Dataset are in Scala. Let’s go through those definitions.
2.1. What is a DataFrame?
As we know, a Spark DataFrame is a distributed collection of tabular data organized into rows and columns with associated metadata. In simple terms, a DataFrame is a collection of Rows with a schema, or a Dataset organized into named columns. Since Spark 2.0.0, DataFrame is merely a type alias for Dataset[Row] (see the org.apache.spark.sql package object).
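As a small illustration (a minimal sketch, assuming an active SparkSession named spark), a DataFrame built from a local collection is literally a Dataset[Row]:
// A minimal sketch, assuming an active SparkSession named `spark`
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import spark.implicits._
// Build a small DataFrame from a local collection
val demoDF: DataFrame = Seq((1, "Spark"), (2, "By")).toDF("id", "name")
// This assignment compiles as-is because DataFrame is an alias for Dataset[Row]
val demoDS: Dataset[Row] = demoDF
demoDF.printSchema()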
2.2. What is a Row?
A Row is a generic object of type Row that stores an ordered collection of fields which can be accessed by index. Row is part of the Spark SQL package (org.apache.spark.sql).
import org.apache.spark.sql.Row
val row1 = Row("Spark", "By", "Examples")
println(s"Row field at index 0 is ${row1(0)}")
In the above example, we created a variable row1 of type Row with the field values “Spark”, “By”, and “Examples”, and accessed those field values by index.
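Note that Row also exposes typed getters such as getString and getAs, so a field can be read with an expected type instead of plain positional access (a small sketch reusing row1 from above):
// Typed getters on the same row1 object
val first  = row1.getString(0)        // field at index 0 as a String
val middle = row1.getAs[String](1)    // getAs with an explicit type parameter
println(s"$first $middle ${row1.getString(2)}")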
2.3. What is a Case Class?
Case classes in Scala are like regular classes but come with additional features: they are useful for modeling immutable data and for pattern matching. The parameters of a case class can hold the field values of a Row.
Syntax:
// syntax of a case class
case class caseClassName(field1: dataType, field2: dataType,....)
Example:
//define case class
case class fullName(firstName: String, MiddleName: String, LastName: String)
//Initialize the case class by mapping fields from the Row
val cc1 = fullName(row1(0).toString, row1(1).toString, row1(2).toString)
//extract values from the case class and assign them to variables
val firstName = cc1.firstName
val middleName = cc1.MiddleName
val lastName = cc1.LastName
In the above example, we created a case class, assigned its field values from the row1 we created earlier, and finally extracted those field values back from the case class into variables.
Note: A DataFrame whose rows are represented by a case class is known as a Dataset.
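To see that connection, here is a small sketch (assuming an active SparkSession named spark) that turns a local collection of fullName instances, including the cc1 value created above, into a typed Dataset:
// A typed Dataset built directly from case class instances
import spark.implicits._
val namesDS = Seq(cc1, fullName("Apache", "Spark", "SQL")).toDS()
namesDS.printSchema()   // schema is derived from the case class fields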
3. Use-Case
In the case class example above, we walked through the full flow: first we created a Row, then we defined a case class, then we assigned the case class fields from the Row, and finally we extracted those fields back from the case class into variables. A natural question is: why can’t we simply assign a field extracted from the Row by index directly to a variable?
When we extract a value from a Row object using its index, the resulting type is Any.
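For example (a small sketch reusing row1 from above), the value returned by index access has the static type Any, so it must be cast or read through a typed getter before it can be used as a String:
val raw = row1(0)                       // static type is Any
// raw.toUpperCase would not compile, because Any has no String methods
val casted = raw.asInstanceOf[String]   // manual cast to String
val typed  = row1.getString(0)          // typed getter avoids the cast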
In the earlier example, we manually cast each field value to String. Instead of that manual effort, it is better to define a case class that holds the schema of the fields we need and assign its values using Row index extraction, as shown above.
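One way to keep that logic in a single place is a small helper function (a hypothetical example, not part of any Spark API) that maps a Row onto the fullName case class defined earlier:
// Hypothetical helper: converts a three-field Row into the fullName case class
import org.apache.spark.sql.Row
def rowToFullName(row: Row): fullName =
  fullName(row(0).toString, row(1).toString, row(2).toString)
val cc2 = rowToFullName(row1)   // fullName("Spark", "By", "Examples")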
4. Convert Row to Case Class Techniques
Now let us try to implement how we can convert the Row into a case class using different techniques.
4.1. Using Row Index
As discussed in the case class creation above, we use this method only when the input is a Row, the Row has a small number of fields, and all the Rows share the same schema; in that case we can leverage the Row index.
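A short sketch of this approach, assuming every Row carries the same three String fields in the same order:
// Convert a small collection of Rows into case class instances by index
import org.apache.spark.sql.Row
val rows = Seq(Row("Spark", "By", "Examples"), Row("Apache", "Spark", "SQL"))
val fullNames = rows.map(r => fullName(r(0).toString, r(1).toString, r(2).toString))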
4.2. Using Spark Implicits
Spark implicits provide conversions of Scala objects into a Dataset, DataFrame, or Columns, and support such conversions through Encoders.
//Import Spark implicits (required for toDF and as[T])
import spark.implicits._
//Create a Spark DataFrame
val df = Seq((1,"spark"), (2,"By"), (3,"Examples")).toDF("id", "name")
//Define the case class and convert the DataFrame df into a Dataset of sparkId
case class sparkId(id: Int, name: String)
val dfCaseClass = df.as[sparkId]
//Extracting field values from the collected Array of sparkId case class instances
val ids = dfCaseClass.collect.map(x => x.id)
val names = dfCaseClass.collect.map(x => x.name)
In the above example, we converted the rows of a DataFrame into a case class using Spark’s implicit conversion technique. First we created a DataFrame and defined a case class. Because Spark implicits can convert a DataFrame/Dataset/RDD directly into a case class, we mapped the DataFrame onto the case class in one step. Then, from the collected array of case class instances, we extracted the field values and assigned them to variables.
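Since dfCaseClass is already a typed Dataset, the two collect calls above can also be reduced to a single collect, or the work can stay distributed by operating on the case class fields directly (a minimal sketch):
// Collect once and read both fields from the case class instances
val records  = dfCaseClass.collect()
val allIds   = records.map(_.id)
val allNames = records.map(_.name)
// Or keep the computation distributed with a typed filter
dfCaseClass.filter(_.name == "spark").show()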
5. Conclusion
Converting a Row object, or a Row from a DataFrame, into a case class is as straightforward as the methods shown above. A case class is more convenient than a Row because it holds the Row field values with an enforced schema, which makes further processing of the records much smoother.
Related Articles
- Spark Create DataFrame with Examples
- Difference in DENSE_RANK and ROW_NUMBER in Spark
- Spark ArrayType Column on DataFrame & SQL
- Spark Convert case class to Schema
- Spark SQL “case when” and “when otherwise”
- Spark Shell Command Usage with Examples
- What is Apache Spark and Why It Is Ultimate for Working with Big Data
- Spark Merge Two DataFrames with Different Columns or Schema
- Testing Spark locally with EmbeddedKafka: Streamlining Spark Streaming Tests
- Spark Kryoserializer buffer max
- Spark with SQL Server – Read and Write Table
- reduceByKey vs groupByKey vs aggregateByKey vs combineByKey in Spark
- Reduce Key-Value Pair into Key-list Pair
- Spark Extract Values from a Row Object