
How do you get or extract values from a Row object in Spark with Scala? In Apache Spark, a DataFrame is a distributed collection of data organized into rows and columns. You can extract values from a row using various methods, depending on the specific context and requirements. In this article, we shall discuss a few common approaches in Spark to extract values from a Row object.


Let us create a sample Spark DataFrame and try extracting values from a Row using various approaches.

1. Create a Sample DataFrame

Let’s say you have a sales DataFrame with the following schema:


// Import
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

// Create SparkSession
val spark = SparkSession.builder()
  .appName("SparkByExamplesDemo")
  .master("local")
  .getOrCreate()

// Initialize Schema
val schema = StructType(
  Array(
    StructField("order_id", IntegerType, nullable = false),
    StructField("product", StringType, nullable = false),
    StructField("quantity", IntegerType, nullable = false),
    StructField("price", DoubleType, nullable = false)
  )
)

// Create a Seq of Data Elements
val salesData = Seq(
  Row(1, "Apple", 5, 1.2),
  Row(2, "Banana", 3, 0.75),
  Row(3, "Orange", 7, 0.9)
)

//Create DataFrame using the Schema specified
val salesDF = spark.createDataFrame(spark.sparkContext.parallelize(salesData), schema)
salesDF.show()

Here, spark refers to the SparkSession, and parallelize distributes the salesData sequence across the Spark cluster as an RDD. The resulting DataFrame salesDF holds the sales data with the specified schema and the columns "order_id", "product", "quantity", and "price".


+--------+-------+--------+-----+
|order_id|product|quantity|price|
+--------+-------+--------+-----+
|       1|  Apple|       5|  1.2|
|       2| Banana|       3| 0.75|
|       3| Orange|       7|  0.9|
+--------+-------+--------+-----+

Now, let's extract values from a row in a few different ways.
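
Every approach below starts from a Row object. As a quick reference, here is a short sketch of common ways to obtain one from the salesDF created above:


// A few common ways to obtain Row objects from a DataFrame
val firstRow: Row = salesDF.first()         // same as head(); returns the first Row
val firstTwo: Array[Row] = salesDF.take(2)  // first two rows as an Array[Row]
val allRows: Array[Row] = salesDF.collect() // all rows; use with care on large data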

2. Accessing Row values by column name

You can use the getAs() method on the Spark Row object to get a specific value from the row. It takes the column name as an argument (and the expected type as a type parameter) and returns the value.


// Get head record
val row: Row = salesDF.head()

// Extract values from row
val orderId: Int = row.getAs[Int]("order_id")
val product: String = row.getAs[String]("product")
val quantity: Int = row.getAs[Int]("quantity")
val price: Double = row.getAs[Double]("price")

println(s"Order ID: $orderId, Product: $product, Quantity: $quantity, Price: $price")

In this approach:

  • We retrieve the first row using the head() method.
  • We then use getAs() to retrieve values from the row by the column names defined in the schema. The column name is passed as an argument and the expected data type as a type parameter; here we extract the values as Int, String, and Double.
  • The values are then assigned to variables for further processing or printing.

Running the above prints the values of the first row:

Order ID: 1, Product: Apple, Quantity: 5, Price: 1.2
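
In this sample schema every column is declared non-nullable, but real data often contains nulls, and getAs on a primitive type can then fail. A minimal null-safe sketch, assuming "price" could be nullable in a real dataset:


// Null-safe extraction, assuming "price" may contain nulls in real data
val priceOpt: Option[Double] =
  if (row.isNullAt(row.fieldIndex("price"))) None
  else Some(row.getAs[Double]("price"))

println(priceOpt.getOrElse(0.0)) // fall back to 0.0 when the value is missing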

3. Accessing values by column index

Alternatively, you can extract values from a Spark Row object by position, using the typed getters getInt(), getString(), getDouble(), etc., which take the column index as an argument.


// Get head record
val row: Row = salesDF.head()

// Extract values from object
val orderId: Int = row.getInt(0) // Assuming "order_id" is the first column
val product: String = row.getString(1) // Assuming "product" is the second column
val quantity: Int = row.getInt(2) // Assuming "quantity" is the third column
val price: Double = row.getDouble(3) // Assuming "price" is the fourth column

println(s"Order ID: $orderId, Product: $product, Quantity: $quantity, Price: $price")

In this approach:

  • We access the values from the row using the getInt, getString, and getDouble methods, passing the column index as the argument.
  • Column indexes start from 0, so we specify the index of the desired column to extract the corresponding value.

Accessing values by column index produces the same output:

Order ID: 1, Product: Apple, Quantity: 5, Price: 1.2
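
If you prefer the typed positional getters but do not want to hard-code ordinals, Row.fieldIndex() resolves an index from a column name. A small sketch combining the two:


// Resolve the ordinal by name, then use the typed positional getter
val priceIdx: Int = row.fieldIndex("price") // 3 for our schema
val priceByIdx: Double = row.getDouble(priceIdx)
println(s"Price via fieldIndex: $priceByIdx")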

4. Converting a Row to a case class

You can also convert the Row to a case class and access the values through its named fields.


// Define case class Sale
case class Sale(orderId: Int, product: String, quantity: Int, price: Double)

// Get head record
val row: Row = salesDF.head()

// Create a case class by accessing elements from row
val sale: Sale = Sale(
  row.getAs[Int]("order_id"),
  row.getAs[String]("product"),
  row.getAs[Int]("quantity"),
  row.getAs[Double]("price")
)

// Access fields from case class
val orderId: Int = sale.orderId
val product: String = sale.product
val quantity: Int = sale.quantity
val price: Double = sale.price

println(s"Order ID: $orderId, Product: $product, Quantity: $quantity, Price: $price")

In this approach:

  • We define a case class Sale that matches the structure of the row.
  • We then create an instance of the case class from the values extracted with getAs().
  • Finally, we access the values directly from the case class instance for further processing or printing.

Converting the Row to a case class again yields:

Order ID: 1, Product: Apple, Quantity: 5, Price: 1.2
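
If you need every row as a Sale rather than just the first one, a more idiomatic alternative is to convert the whole DataFrame into a Dataset[Sale]. A short sketch, assuming we rename "order_id" so the column names match the case class fields:


// Convert the DataFrame to a strongly typed Dataset[Sale];
// column names must match the case class fields, so rename order_id first
import spark.implicits._

val salesDS = salesDF.withColumnRenamed("order_id", "orderId").as[Sale]
val firstSale: Sale = salesDS.head() // a Sale instance, not a Row

With a Dataset, field-name mismatches surface when the query is analyzed rather than when an individual getAs cast fails at runtime.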

Overall, these approaches demonstrate different ways to extract values from a Row in Spark, allowing you to choose the one that best fits your requirements and coding style.

5. Conclusion

To summarize, extracting values from a Spark Row can be done in multiple ways based on your specific requirements. Here are the key points to remember:

  1. Accessing values by column name: You can use the getAs[T]("columnName") method to extract values from a row based on the column name. This approach keeps the code readable and robust to changes in column order.
  2. Accessing values by column index: You can use methods like getInt, getString, getDouble, etc., to extract values from a row based on the column index. Column indexes start from 0, so you need to provide the appropriate index for each column.
  3. Converting a Row to a case class: If you have a predefined case class that matches the structure of the row, you can convert the row into an instance of that case class. This approach provides type safety and allows you to access the values directly from the case class instance.

Remember to handle cases where the DataFrame may be empty or contain multiple rows; in those scenarios, adjust the code to select the row(s) you need.
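
For instance, head() throws an exception on an empty DataFrame; a defensive sketch using take(1) instead:


// head() fails on an empty DataFrame; take(1).headOption is a safe alternative
salesDF.take(1).headOption match {
  case Some(r) => println(s"First product: ${r.getAs[String]("product")}")
  case None    => println("DataFrame is empty")
}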

Overall, the approach you choose for extracting values from a row depends on your preference, the structure of your DataFrame, and the readability and maintainability goals of your code. Below is a complete example.


// Import
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

// Create SparkSession
val spark = SparkSession.builder()
  .appName("SparkByExamplesDemo")
  .master("local")
  .getOrCreate()

//Initialize Schema
val schema = StructType(
  Array(
    StructField("order_id", IntegerType, nullable = false),
    StructField("product", StringType, nullable = false),
    StructField("quantity", IntegerType, nullable = false),
    StructField("price", DoubleType, nullable = false)
  )
)

//Create a Seq of Data Elements
val salesData = Seq(
  Row(1, "Apple", 5, 1.2),
  Row(2, "Banana", 3, 0.75),
  Row(3, "Orange", 7, 0.9)
)

//Create DataFrame using the Schema specified
val salesDF = spark.createDataFrame(spark.sparkContext.parallelize(salesData), schema)
salesDF.show()

//Accessing values by column name
val row: Row = salesDF.head()

val orderId: Int = row.getAs[Int]("order_id")
val product: String = row.getAs[String]("product")
val quantity: Int = row.getAs[Int]("quantity")
val price: Double = row.getAs[Double]("price")

println(s"Order ID: $orderId, Product: $product, Quantity: $quantity, Price: $price")

//Accessing values by column index (reusing the row from above)
val orderIdByIdx: Int = row.getInt(0) // Assuming "order_id" is the first column
val productByIdx: String = row.getString(1) // Assuming "product" is the second column
val quantityByIdx: Int = row.getInt(2) // Assuming "quantity" is the third column
val priceByIdx: Double = row.getDouble(3) // Assuming "price" is the fourth column

println(s"Order ID: $orderIdByIdx, Product: $productByIdx, Quantity: $quantityByIdx, Price: $priceByIdx")


//Converting a Row to a case class (reusing the row from above)
case class Sale(orderId: Int, product: String, quantity: Int, price: Double)

val sale: Sale = Sale(
  row.getAs[Int]("order_id"),
  row.getAs[String]("product"),
  row.getAs[Int]("quantity"),
  row.getAs[Double]("price")
)

println(s"Order ID: ${sale.orderId}, Product: ${sale.product}, Quantity: ${sale.quantity}, Price: ${sale.price}")
