Spark Transpose Rows to Columns of a DataFrame

Transposing a Spark DataFrame means converting its columns into rows and its rows into columns, which you can achieve with pivoting. More specifically, it rotates the DataFrame by 90 degrees, so that the values in its columns become values in its rows and vice versa.


In this article, we shall discuss how to transpose a DataFrame's rows to columns using different methods, with examples. Let's first create a DataFrame and then use it to transpose rows to columns.


// Import
import org.apache.spark.sql.SparkSession

// Create SparkSession
val spark:SparkSession = SparkSession.builder()
    .master("local[1]").appName("SparkByExamples.com")
    .getOrCreate()

// Create a sample DataFrame
import spark.implicits._
val data = Seq((1, "A", 10), (1, "B", 20), (2, "A", 30), (2, "B", 40))
val df = data.toDF("id", "col1", "col2")
df.show()

This example yields the below output.

//Result
+---+----+----+
| id|col1|col2|
+---+----+----+
|  1|   A|  10|
|  1|   B|  20|
|  2|   A|  30|
|  2|   B|  40|
+---+----+----+

1. Spark Transpose DataFrame using Pivot()

In Spark, transposing a DataFrame can be achieved by pivoting it with the pivot() function. Here's a Scala example that shows how to transpose our sample DataFrame.


// Import the aggregate function used with pivot
import org.apache.spark.sql.functions.first

// Pivot the DataFrame to transpose it
val transposedDf = df.groupBy("id").pivot("col1").agg(first("col2"))

// Show the transposed DataFrame
transposedDf.show()

This example yields the below output.

//Result
+---+---+---+
| id|  A|  B|
+---+---+---+
|  1| 10| 20|
|  2| 30| 40|
+---+---+---+

In this example, we use the groupBy() function to group the DataFrame by the id column and the pivot() function to pivot it on the col1 column, which transposes the Spark DataFrame. The agg() function aggregates the col2 column using first(). Finally, we display the transposed DataFrame using show().

Note that the pivot() function requires an aggregation function to be specified. In this example, we used the first() function as the aggregation function, which returns the first value in each group. You can use other aggregation functions such as sum(), avg(), max(), or min() depending on your use case.
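
For instance, here is a minimal sketch (the variable name summedDf is ours) that swaps in sum() and also passes the pivot values explicitly, which spares Spark the extra job it otherwise runs to discover the distinct values of col1:

// Swap the aggregation to sum() and supply the pivot values up front,
// avoiding the extra pass Spark needs to compute them from the data
import org.apache.spark.sql.functions.sum

val summedDf = df.groupBy("id").pivot("col1", Seq("A", "B")).agg(sum("col2"))
summedDf.show()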

2. Transpose using collect_list() and explode()

Another approach is to collect each id's (col1, col2) pairs into an array of structs with collect_list(), explode() the array back into rows, and then pivot on id so that the id values become columns.


// Import the functions used below
import org.apache.spark.sql.functions.{collect_list, explode, first, struct}

// Collect each id's (col1, col2) pairs, explode them back into rows,
// then pivot on id so the id values become columns
val transposedDf = df.groupBy("id")
  .agg(collect_list(struct($"col1", $"col2")).as("cols"))
  .select($"id", explode($"cols").as("col"))
  .select($"id", $"col.col1", $"col.col2")
  .groupBy("col1")
  .pivot("id")
  .agg(first("col2"))

// Show the transposed DataFrame
transposedDf.show()

//Result
+----+---+---+
|col1|  1|  2|
+----+---+---+
|   A| 10| 30|
|   B| 20| 40|
+----+---+---+

In this example, we first group the DataFrame by the id column and collect the (col1, col2) pairs into an array of structs using collect_list(). We then explode() the array back into one row per pair and flatten the struct into separate col1 and col2 columns. Finally, we group the DataFrame by col1, pivot it on the id column, and aggregate col2 using first(), so that the id values become columns and the col1 values become rows, transposing the Spark DataFrame.

3. Transpose using map() and reduce()

You can also transpose manually: map each row to a ((col1, id), col2) cell, reduce duplicate cells, collect them to the driver, and assemble new rows with an explicit schema. Since this collects data to the driver, it is only suitable when the transposed result is small.


// Import
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Distinct id values become the columns of the transposed DataFrame
val ids = df.select("id").distinct().collect().map(_.getInt(0)).sorted

// Distinct col1 values become the rows of the transposed DataFrame
val col1Vals = df.select("col1").distinct().collect().map(_.getString(0)).sorted

// Map each row to a ((col1, id), col2) cell and reduce duplicates,
// keeping the first value per cell (mirroring first() in the pivot version)
val cellMap = df.rdd
  .map { case Row(id: Int, col1: String, col2: Int) => ((col1, id), col2) }
  .reduceByKey((a, _) => a)
  .collectAsMap()

// Assemble one Row per col1 value, with one column per id
val rows = col1Vals.map { c =>
  val cells: Seq[Any] = c +: ids.toSeq.map(id => cellMap.get((c, id)).map(Int.box).orNull)
  Row.fromSeq(cells)
}

// Build the schema and create the transposed DataFrame
val schema = StructType(
  StructField("col1", StringType) +:
    ids.map(id => StructField(id.toString, IntegerType, nullable = true))
)
val transposedDf = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

// Show the transposed DataFrame
transposedDf.show()

//Result
+----+---+---+
|col1|  1|  2|
+----+---+---+
|   A| 10| 30|
|   B| 20| 40|
+----+---+---+

In this example, we first collect the distinct id and col1 values, then use map() to turn each row into a ((col1, id), col2) cell and reduceByKey() to keep one value per cell. After collecting the cells to the driver, we build one Row per col1 value with one column per id, and create the transposed DataFrame from an explicit schema using Row.fromSeq(), StructType(), and createDataFrame().

4. Conclusion

In conclusion, transposing a Spark DataFrame's rows to columns rotates the DataFrame by 90 degrees, so that its columns become rows and its rows become columns. Transposing a DataFrame can be useful in scenarios where we want to switch the orientation of the data, for example, to make it easier to aggregate or analyze in a different way.

There are various ways to transpose a DataFrame in Spark Scala, including using built-in functions such as pivot() and groupBy(), or by manually iterating over the data and creating a new DataFrame using custom logic. The method chosen will depend on the specific use case, the size of the data, and the performance requirements.

It is important to note that transposing a DataFrame can result in a different schema and data structure, and it may require additional processing to make it compatible with downstream operations. It is also important to consider the memory and processing requirements of transposing a large DataFrame, as it may involve shuffling or aggregating large amounts of data.
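
For instance, since the pivoted column names are derived from data values, a quick sanity check like the one below (a minimal sketch, using the transposedDf from the pivot example) helps before hard-coding column references in downstream code:

// Pivoted column names come from the data, so inspect the schema
// before relying on specific columns downstream
transposedDf.printSchema()
println(transposedDf.columns.mkString(", "))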
