Transposing a Spark DataFrame means converting its columns into rows and its rows into columns; you can easily achieve this by pivoting. More specifically, it involves rotating the DataFrame by 90 degrees, so that the values in its columns become values in its rows, and the values in its rows become values in its columns.
In this article, we shall discuss how to transpose a DataFrame's rows to columns using different methods, with examples. Let's first create a DataFrame and then use it to demonstrate each approach.
// Import
import org.apache.spark.sql.SparkSession
// Create SparkSession
val spark:SparkSession = SparkSession.builder()
.master("local[1]").appName("SparkByExamples.com")
.getOrCreate()
// Create a sample DataFrame
import spark.implicits._
val data = Seq((1, "A", 10), (1, "B", 20), (2, "A", 30), (2, "B", 40))
val df = data.toDF("id", "col1", "col2")
df.show()
This example yields the below output.
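+---+----+----+
| id|col1|col2|
+---+----+----+
|  1|   A|  10|
|  1|   B|  20|
|  2|   A|  30|
|  2|   B|  40|
+---+----+----+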
1. Spark Transpose DataFrame using Pivot()
In Spark, transposing a DataFrame can be achieved by pivoting it with the pivot() function. Here's an example code snippet in Scala that shows how to transpose a DataFrame.
// Import the built-in functions (first(), collect_list(), etc.)
import org.apache.spark.sql.functions._

// Pivot the DataFrame to transpose it
val transposedDf = df.groupBy("id").pivot("col1").agg(first("col2"))
// Show the transposed DataFrame
transposedDf.show()
This example yields the below output.
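+---+---+---+
| id|  A|  B|
+---+---+---+
|  1| 10| 20|
|  2| 30| 40|
+---+---+---+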
In this example, we start by creating a sample DataFrame df with three columns: id, col1, and col2. We then use the groupBy() function to group the DataFrame by the id column and the pivot() function to pivot it on the col1 column, transposing the Spark DataFrame. The agg() function aggregates the col2 column using first(). Finally, we display the transposed DataFrame using show().
Note that the pivot() function requires an aggregation function to be specified. In this example, we used first() as the aggregation function, which returns the first value in each group. You can use other aggregation functions such as sum(), avg(), max(), or min() depending on your use case.
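As a quick sketch (assuming the same df and imports as above), you can also pass the expected pivot values explicitly, which lets Spark skip the extra job that discovers them, and swap in a different aggregation such as sum():
// A minimal variant: supply the pivot values up front and aggregate with sum()
// (pivotedSum is just an illustrative name)
val pivotedSum = df.groupBy("id")
  .pivot("col1", Seq("A", "B"))
  .agg(sum("col2"))
pivotedSum.show()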
2. Transpose using collect_list() and explode()
// Collect each id's (col1, col2) pairs, explode them back into rows,
// then group by col1 and pivot on id
val transposedDf = df.groupBy("id")
  .agg(collect_list(struct($"col1", $"col2")).as("cols"))
  .select($"id", explode($"cols").as("col"))
  .select($"id", $"col.col1", $"col.col2")
  .groupBy("col1")
  .pivot("id")
  .agg(first("col2"))
// Show the transposed DataFrame
transposedDf.show()
//Result
+----+---+---+
|col1|  1|  2|
+----+---+---+
|   A| 10| 30|
|   B| 20| 40|
+----+---+---+
In this example, we first group the DataFrame by the id column and collect each group's col1 and col2 values into an array of structs using collect_list(). We then explode() the array back into rows and select the struct fields as the col1 and col2 columns. Finally, we group the DataFrame by col1, pivot it on the id column, and aggregate the col2 column using first() to transpose the Spark DataFrame.
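Note that on this DataFrame the collect_list()/explode() round trip simply rebuilds the original rows, so the same result can be obtained from the final pivot alone; the longer form is mainly useful when you need the intermediate array column for other processing. A minimal equivalent sketch:
// Equivalent direct form: group by col1 and pivot on id
df.groupBy("col1").pivot("id").agg(first("col2")).show()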
3. Transpose using map() and reduceByKey()
// Imports
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Get the distinct values of col1; these become the new column names
val newCols = df.select("col1").distinct().collect().map(_.getString(0)).sorted

// Map each row to (id, Map(col1 -> col2)) and merge the per-id maps with
// reduceByKey(). This assumes each (id, col1) pair appears at most once;
// if it does not, later values silently win.
val rowsRdd = df.rdd.map {
  case Row(id: Int, col1: String, col2: Int) => (id, Map(col1 -> col2))
}.reduceByKey(_ ++ _)
  .map { case (id, colMap) =>
    Row.fromSeq(id +: newCols.map(c => colMap.getOrElse(c, null)))
  }

// Build the transposed DataFrame with an explicit schema
val schema = StructType(
  StructField("id", IntegerType) +: newCols.map(StructField(_, IntegerType))
)
val transposedDf = spark.createDataFrame(rowsRdd, schema)
// Show the transposed DataFrame
transposedDf.show()
//Result
+---+---+---+
| id| A| B|
+---+---+---+
| 1| 10| 20|
| 2| 30| 40|
+---+---+---+
In this example, we first collect the distinct values of col1, which become the names of the new columns. We then map each row to an (id, Map(col1 -> col2)) pair and merge the per-id maps using reduceByKey(). Finally, we build a Row for each id from its merged map and create the transposed DataFrame with an explicit schema.
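One caveat: because the rows are produced from an RDD, their order in the output is not guaranteed. If you need a stable layout for display or comparison, sort explicitly, for example:
// Sort by id for a deterministic display order
transposedDf.orderBy("id").show()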
4. Conclusion
In conclusion, transposing a Spark Scala DataFrame's rows to columns involves rotating the DataFrame by 90 degrees, so that its columns become rows and its rows become columns. Transposing a DataFrame can be useful when you want to switch the orientation of the data, for example, to make it easier to aggregate or analyze in a different way.
There are various ways to transpose a DataFrame in Spark Scala, including built-in functions such as pivot() and groupBy(), or manually iterating over the data and creating a new DataFrame using custom logic. The method chosen will depend on the specific use case, the size of the data, and the performance requirements.
It is important to note that transposing a DataFrame can result in a different schema and data structure, and it may require additional processing to make it compatible with downstream operations. It is also important to consider the memory and processing requirements of transposing a large DataFrame, as it may involve shuffling or aggregating large amounts of data.
Related Articles
- How to Pivot and Unpivot a Spark Data Frame
- Spark mapValues()
- Spark groupByKey() vs reduceByKey()
- Spark RDD reduce() function example
- Spark explode array and map columns to rows
- Use length function in substring in Spark
- Get Other Columns when using GroupBy or Select All Columns with the GroupBy?
- Spark cannot resolve given input columns