The row_number() window function in Spark SQL assigns a sequential integer to each row in the result DataFrame. It is used with Window.partitionBy(), which partitions the data into window frames, and an orderBy() clause, which sorts the rows within each partition.
Preparing a Dataset
Let's create a DataFrame to work with.
import spark.implicits._
val simpleData = Seq(
  ("James", "Sales", 3000),
  ("Michael", "Sales", 4600),
  ("Robert", "Sales", 4100),
  ("Maria", "Finance", 3000),
  ("James", "Sales", 3000),
  ("Scott", "Finance", 3300),
  ("Jen", "Finance", 3900),
  ("Jeff", "Marketing", 3000),
  ("Kumar", "Marketing", 2000),
  ("Saif", "Sales", 4100)
)
val df = simpleData.toDF("employee_name", "department", "salary")
df.show()
This yields the output below.
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
| James| Sales| 3000|
| Michael| Sales| 4600|
| Robert| Sales| 4100|
| Maria| Finance| 3000|
| James| Sales| 3000|
| Scott| Finance| 3300|
| Jen| Finance| 3900|
| Jeff| Marketing| 3000|
| Kumar| Marketing| 2000|
| Saif| Sales| 4100|
+-------------+----------+------+
Add Row Number to DataFrame
Spark SQL provides row_number() as part of the window functions group. First, we need to define a window specification with a partition and an ordering, since row_number() requires both. Here, we partition on the “department” column and order by the “salary” column, then run the row_number() function to assign a sequential row number within each partition.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

//row_number
val windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_number", row_number.over(windowSpec))
  .show()
This yields the output below.
+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
| James| Sales| 3000| 1|
| James| Sales| 3000| 2|
| Robert| Sales| 4100| 3|
| Saif| Sales| 4100| 4|
| Michael| Sales| 4600| 5|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 2|
| Jen| Finance| 3900| 3|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+----------+
As you can see in the above output, the dataset has been partitioned by the “department” column and ordered by “salary”, and a new “row_number” column has been added with a sequence starting from 1 within each partition.
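A common follow-up use of row_number(), not covered in the example above, is selecting the top row per group: order the window in descending order and keep only rows where the generated number equals 1. The sketch below is self-contained for illustration (it rebuilds a small DataFrame and uses a hypothetical helper column name "rn"); with the df already defined above, only the last three statements are needed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

val spark = SparkSession.builder().master("local[1]").appName("TopPerDept").getOrCreate()
import spark.implicits._

val df = Seq(
  ("James", "Sales", 3000), ("Michael", "Sales", 4600),
  ("Maria", "Finance", 3000), ("Jen", "Finance", 3900),
  ("Jeff", "Marketing", 3000), ("Kumar", "Marketing", 2000)
).toDF("employee_name", "department", "salary")

// Order each department's rows by salary, highest first
val topSpec = Window.partitionBy("department").orderBy(desc("salary"))

// row_number() == 1 marks the highest-paid row per department;
// drop the helper column once it has done its job
val topPerDept = df.withColumn("rn", row_number.over(topSpec))
  .filter(col("rn") === 1)
  .drop("rn")

topPerDept.show()
```

Note that row_number() breaks salary ties arbitrarily; use rank() or dense_rank() instead if ties should share a position.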
Source code | Add Row number to DataFrame
package com.sparkbyexamples.spark.dataframe.functions.window

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

object RowNumber extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._
  val simpleData = Seq(
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  )
  val df = simpleData.toDF("employee_name", "department", "salary")
  df.show()

  //row_number
  val windowSpec = Window.partitionBy("department").orderBy("salary")
  df.withColumn("row_number", row_number.over(windowSpec))
    .show()
}
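For reference, the same result can also be expressed in SQL syntax with ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) by registering the DataFrame as a temporary view. A minimal self-contained sketch (the view name "employees" and the smaller sample data are illustrative assumptions):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("RowNumberSQL").getOrCreate()
import spark.implicits._

val df = Seq(
  ("James", "Sales", 3000), ("Michael", "Sales", 4600),
  ("Maria", "Finance", 3000), ("Scott", "Finance", 3300)
).toDF("employee_name", "department", "salary")

// Expose the DataFrame to SQL under a view name
df.createOrReplaceTempView("employees")

// ROW_NUMBER() OVER (...) is the SQL equivalent of row_number.over(windowSpec)
val numbered = spark.sql(
  """SELECT employee_name, department, salary,
    |       ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary) AS row_number
    |FROM employees""".stripMargin)

numbered.show()
```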
This complete example is available at the GitHub project.
Conclusion
In this article, you have learned how to use the Spark SQL row_number() window function to add a sequential row number to each partition window of a DataFrame.