Spark Window functions are used to calculate results such as rank and row number over a range of input rows, and they are available by importing org.apache.spark.sql.functions._. This article explains the concept of window functions, their usage and syntax, and how to use them with Spark SQL and Spark's DataFrame API. They come in handy when we need to perform aggregate operations over a specific window frame on DataFrame columns.
When possible, try to leverage the built-in standard functions: they are a bit safer at compile time, handle null values, and perform better than UDFs. If your application is performance-critical, avoid custom UDFs where possible, as their performance is not guaranteed.
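As a minimal sketch of that advice (not part of the original example), the two snippets below compute the same upper-cased column using the employee DataFrame df created later in this article; the built-in version stays visible to the Catalyst optimizer, while the UDF is opaque and throws on null input:
import org.apache.spark.sql.functions.{col, udf, upper}
// Built-in function: optimized by Catalyst and null-safe
val viaBuiltin = df.withColumn("dept_upper", upper(col("department")))
// Equivalent UDF: opaque to the optimizer and fails on null input
val upperUdf = udf((s: String) => s.toUpperCase)
val viaUdf = df.withColumn("dept_upper", upperUdf(col("department")))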
1. Spark Window Functions
Spark Window functions operate on a group of rows (a frame, or partition) and return a single value for every input row. Spark SQL supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions.
The table below defines the ranking and analytic functions; for aggregate functions, any existing aggregate function can be used as a window function.
To perform an operation on a group, we first need to partition the data using Window.partitionBy(), and for the row number and rank functions we additionally need to order the partitioned data using an orderBy clause, as sketched just below.
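For illustration, here is the WindowSpec used throughout this article; it partitions the rows by department and orders each partition by salary:
import org.apache.spark.sql.expressions.Window
// Partition rows by department; order rows within each partition by salary
val windowSpec = Window.partitionBy("department").orderBy("salary")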
Each of these functions is covered below with Scala examples.
| WINDOW FUNCTION USAGE & SYNTAX | DESCRIPTION |
|---|---|
| row_number(): Column | Returns a sequential number starting from 1 within a window partition. |
| rank(): Column | Returns the rank of rows within a window partition, with gaps. |
| percent_rank(): Column | Returns the percentile rank of rows within a window partition. |
| dense_rank(): Column | Returns the rank of rows within a window partition without any gaps, whereas rank() returns ranks with gaps. |
| ntile(n: Int): Column | Returns the ntile group id (from 1 to n inclusive) in a window partition. |
| cume_dist(): Column | Returns the cumulative distribution of values within a window partition. |
| lag(e: Column, offset: Int): Column / lag(columnName: String, offset: Int): Column / lag(columnName: String, offset: Int, defaultValue: Any): Column | Returns the value that is offset rows before the current row, and null (or defaultValue, if supplied) if there are fewer than offset rows before the current row. |
| lead(e: Column, offset: Int): Column / lead(columnName: String, offset: Int): Column / lead(columnName: String, offset: Int, defaultValue: Any): Column | Returns the value that is offset rows after the current row, and null (or defaultValue, if supplied) if there are fewer than offset rows after the current row. |
Before we start with the examples, let's first create a DataFrame to work with.
import spark.implicits._
val simpleData = Seq(("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)
)
val df = simpleData.toDF("employee_name", "department", "salary")
df.show()
Yields below output.
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
| James| Sales| 3000|
| Michael| Sales| 4600|
| Robert| Sales| 4100|
| Maria| Finance| 3000|
| James| Sales| 3000|
| Scott| Finance| 3300|
| Jen| Finance| 3900|
| Jeff| Marketing| 3000|
| Kumar| Marketing| 2000|
| Saif| Sales| 4100|
+-------------+----------+------+
2. Spark Window Ranking functions
2.1 row_number Window Function
row_number()
window function is used to give a sequential row number, starting from 1, to each row within a window partition.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
//row_number
val windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_number",row_number.over(windowSpec))
.show()
Yields below output.
+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
| James| Sales| 3000| 1|
| James| Sales| 3000| 2|
| Robert| Sales| 4100| 3|
| Saif| Sales| 4100| 4|
| Michael| Sales| 4600| 5|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 2|
| Jen| Finance| 3900| 3|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+----------+
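Since window functions are also available in Spark SQL, the same result can be produced with a raw SQL OVER clause; a sketch, assuming the DataFrame is first registered as a temporary view named employees (a name chosen here for illustration):
//row_number via Spark SQL
df.createOrReplaceTempView("employees")
spark.sql(
  """select employee_name, department, salary,
    |row_number() over (partition by department order by salary) as row_number
    |from employees""".stripMargin)
  .show()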
2.2 rank Window Function
rank()
window function is used to provide a rank to each row within a window partition. This function leaves gaps in the ranking when there are ties.
import org.apache.spark.sql.functions._
//rank
df.withColumn("rank",rank().over(windowSpec))
.show()
Yields below output.
+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
| James| Sales| 3000| 1|
| James| Sales| 3000| 1|
| Robert| Sales| 4100| 3|
| Saif| Sales| 4100| 3|
| Michael| Sales| 4600| 5|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 2|
| Jen| Finance| 3900| 3|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+----+
This is the same as the RANK function in SQL.
2.3 dense_rank Window Function
dense_rank()
window function is used to get the rank of rows within a window partition without any gaps. It is similar to rank(); the difference is that rank() leaves gaps in the ranking when there are ties.
import org.apache.spark.sql.functions._
//dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec))
.show()
Yields below output.
+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
| James| Sales| 3000| 1|
| James| Sales| 3000| 1|
| Robert| Sales| 4100| 2|
| Saif| Sales| 4100| 2|
| Michael| Sales| 4600| 3|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 2|
| Jen| Finance| 3900| 3|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+----------+
This is the same as the DENSE_RANK function in SQL.
2.4 percent_rank Window Function
percent_rank()
window function returns the percentile rank of each row within a window partition, computed as (rank - 1) / (number of partition rows - 1), so values always fall between 0.0 and 1.0.
import org.apache.spark.sql.functions._
//percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec))
.show()
Yields below output.
+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
| James| Sales| 3000| 0.0|
| James| Sales| 3000| 0.0|
| Robert| Sales| 4100| 0.5|
| Saif| Sales| 4100| 0.5|
| Michael| Sales| 4600| 1.0|
| Maria| Finance| 3000| 0.0|
| Scott| Finance| 3300| 0.5|
| Jen| Finance| 3900| 1.0|
| Kumar| Marketing| 2000| 0.0|
| Jeff| Marketing| 3000| 1.0|
+-------------+----------+------+------------+
This is the same as the PERCENT_RANK function in SQL.
2.5 ntile Window Function
ntile()
window function distributes the rows of a window partition into n groups (buckets) and returns the group id of each row. In the example below we pass 2 as the argument to ntile, so it returns a value between 1 and 2.
//ntile
df.withColumn("ntile",ntile(2).over(windowSpec))
.show()
Yields below output.
+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
| James| Sales| 3000| 1|
| James| Sales| 3000| 1|
| Robert| Sales| 4100| 1|
| Saif| Sales| 4100| 2|
| Michael| Sales| 4600| 2|
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 1|
| Jen| Finance| 3900| 2|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
+-------------+----------+------+-----+
This is the same as the NTILE function in SQL.
3. Spark Window Analytic functions
3.1 cume_dist Window Function
cume_dist()
window function is used to get the cumulative distribution of values within a window partition: for each row, the number of rows with a value less than or equal to the current row's value, divided by the total number of rows in the partition.
This is the same as the CUME_DIST function in SQL.
//cume_dist
df.withColumn("cume_dist",cume_dist().over(windowSpec))
.show()
+-------------+----------+------+------------------+
|employee_name|department|salary| cume_dist|
+-------------+----------+------+------------------+
| James| Sales| 3000| 0.4|
| James| Sales| 3000| 0.4|
| Robert| Sales| 4100| 0.8|
| Saif| Sales| 4100| 0.8|
| Michael| Sales| 4600| 1.0|
| Maria| Finance| 3000|0.3333333333333333|
| Scott| Finance| 3300|0.6666666666666666|
| Jen| Finance| 3900| 1.0|
| Kumar| Marketing| 2000| 0.5|
| Jeff| Marketing| 3000| 1.0|
+-------------+----------+------+------------------+
3.2 lag Window Function
lag()
window function returns the value that is offset rows before the current row within the partition, and null when there are fewer than offset rows before the current row. This is the same as the LAG function in SQL.
//lag
df.withColumn("lag",lag("salary",2).over(windowSpec))
.show()
+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
| James| Sales| 3000|null|
| James| Sales| 3000|null|
| Robert| Sales| 4100|3000|
| Saif| Sales| 4100|3000|
| Michael| Sales| 4600|4100|
| Maria| Finance| 3000|null|
| Scott| Finance| 3300|null|
| Jen| Finance| 3900|3000|
| Kumar| Marketing| 2000|null|
| Jeff| Marketing| 3000|null|
+-------------+----------+------+----+
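lag() also has a three-argument overload that takes a default value to use in place of null; a small sketch (the 0 default is just an illustrative choice):
//lag with a default value instead of null
df.withColumn("lag", lag("salary", 2, 0).over(windowSpec))
  .show()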
3.3 lead Window Function
lead()
window function returns the value that is offset rows after the current row within the partition, and null when there are fewer than offset rows after the current row. It takes the same overloads as lag(), including the defaultValue variant. This is the same as the LEAD function in SQL.
//lead
df.withColumn("lead",lead("salary",2).over(windowSpec))
.show()
+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
| James| Sales| 3000|4100|
| James| Sales| 3000|4100|
| Robert| Sales| 4100|4600|
| Saif| Sales| 4100|null|
| Michael| Sales| 4600|null|
| Maria| Finance| 3000|3900|
| Scott| Finance| 3300|null|
| Jen| Finance| 3900|null|
| Kumar| Marketing| 2000|null|
| Jeff| Marketing| 3000|null|
+-------------+----------+------+----+
4. Spark Window Aggregate Functions
In this section, I will explain how to calculate the sum, min, and max for each department using Spark SQL aggregate window functions and a WindowSpec. When working with aggregate window functions over an entire partition, we don't need an orderBy clause on the WindowSpec; ordering is only required for the row_number column used to pick one row per department.
val windowSpec = Window.partitionBy("department").orderBy("salary")
val windowSpecAgg = Window.partitionBy("department")
val aggDF = df.withColumn("row",row_number.over(windowSpec))
.withColumn("avg", avg(col("salary")).over(windowSpecAgg))
.withColumn("sum", sum(col("salary")).over(windowSpecAgg))
.withColumn("min", min(col("salary")).over(windowSpecAgg))
.withColumn("max", max(col("salary")).over(windowSpecAgg))
.where(col("row")===1).select("department","avg","sum","min","max")
.show()
This yields below output.
+----------+------+-----+----+----+
|department| avg| sum| min| max|
+----------+------+-----+----+----+
| Sales|3760.0|18800|3000|4600|
| Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
+----------+------+-----+----+----+
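Aggregate window functions can also operate over a sliding frame rather than the whole partition. As a sketch beyond the original example, a running salary total per department can be computed with rowsBetween:
//Running total per department, ordered by salary
val runningTotalSpec = Window.partitionBy("department").orderBy("salary")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("running_sum", sum(col("salary")).over(runningTotalSpec))
  .show()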
Please refer to Spark Aggregate Functions for more aggregate function examples.
5. Source Code of Window Functions Example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
object WindowFunctions extends App {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val simpleData = Seq(("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)
)
val df = simpleData.toDF("employee_name", "department", "salary")
df.show()
//row_number
val windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_number",row_number.over(windowSpec))
.show()
//rank
df.withColumn("rank",rank().over(windowSpec))
.show()
//dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec))
.show()
//percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec))
.show()
//ntile
df.withColumn("ntile",ntile(2).over(windowSpec))
.show()
//cume_dist
df.withColumn("cume_dist",cume_dist().over(windowSpec))
.show()
//lag
df.withColumn("lag",lag("salary",2).over(windowSpec))
.show()
//lead
df.withColumn("lead",lead("salary",2).over(windowSpec))
.show()
//Aggregate Functions
val windowSpecAgg = Window.partitionBy("department")
val aggDF = df.withColumn("row",row_number.over(windowSpec))
.withColumn("avg", avg(col("salary")).over(windowSpecAgg))
.withColumn("sum", sum(col("salary")).over(windowSpecAgg))
.withColumn("min", min(col("salary")).over(windowSpecAgg))
.withColumn("max", max(col("salary")).over(windowSpecAgg))
.where(col("row")===1).select("department","avg","sum","min","max")
.show()
}
The complete source code is available at GitHub for reference.
6. Conclusion
In this tutorial, you have learned what Spark SQL window functions are, their syntax, and how to use them with aggregate functions, along with several examples in Scala.
Related Articles
- Spark Find Maximum Row per Group
- What is Apache Spark and Why It Is Ultimate for Working with Big Data
- Spark SQL – Add row number to DataFrame
- Spark DataFrame Select First Row of Each Group?
- Apache Spark Installation on Windows
- Difference in DENSE_RANK and ROW_NUMBER in Spark
- Spark Shell Command Usage with Examples
7. References
I would recommend reading the Window Functions Introduction and SQL Window Functions API blogs to further understand window functions. Also, refer to SQL window functions to learn how window functions work in native SQL.
Happy Learning !!