Spark Window Functions with Examples

Spark Window functions are used to calculate results such as the rank, row number, etc. over a range of input rows, and they become available by importing org.apache.spark.sql.functions._. This article explains the concept of window functions, their usage and syntax, and finally how to use them with Spark SQL and Spark's DataFrame API. They come in handy when we need to perform aggregate operations over a specific window (frame) of DataFrame rows.

When possible, try to leverage the standard built-in functions: they are a little safer at compile time, handle null values, and perform better than UDFs. If your application is performance-critical, avoid custom UDFs wherever possible, as Spark cannot optimize them.
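As a small illustration (a sketch only, assuming a DataFrame df with a string column department, like the one created later in this article), a built-in function such as upper() is optimized by the Catalyst engine and handles nulls, while an equivalent UDF is a black box to the optimizer:


import org.apache.spark.sql.functions.{col, udf, upper}

// Built-in function: optimized by Catalyst and null-safe
val withBuiltIn = df.withColumn("dept_upper", upper(col("department")))

// Equivalent custom UDF: not optimized, and null handling is our responsibility
val upperUdf = udf((s: String) => if (s == null) null else s.toUpperCase)
val withUdf = df.withColumn("dept_upper", upperUdf(col("department")))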

1. Spark Window Functions

Spark Window functions operate on a group of rows (a frame or partition) and return a single value for every input row. Spark SQL supports three kinds of window functions:

  1. Ranking functions
  2. Analytic functions
  3. Aggregate functions

The list below defines the ranking and analytic window functions; for aggregate window functions, any existing aggregate function can be used as a window function.

To perform an operation on a group, we first need to partition the data using Window.partitionBy(), and for the row number and rank functions we additionally need to order the rows within each partition using the orderBy clause.

Each of these functions is explained below with Scala examples.

row_number(): Column
  Returns a sequential number starting from 1 within a window partition.

rank(): Column
  Returns the rank of rows within a window partition, with gaps.

percent_rank(): Column
  Returns the percentile rank of rows within a window partition.

dense_rank(): Column
  Returns the rank of rows within a window partition without any gaps, whereas rank() returns the rank with gaps.

ntile(n: Int): Column
  Returns the ntile group id of each row in a window partition.

cume_dist(): Column
  Returns the cumulative distribution of values within a window partition.

lag(e: Column, offset: Int): Column
lag(columnName: String, offset: Int): Column
lag(columnName: String, offset: Int, defaultValue: Any): Column
  Returns the value that is `offset` rows before the current row, and null if there are fewer than `offset` rows before the current row.

lead(e: Column, offset: Int): Column
lead(columnName: String, offset: Int): Column
lead(columnName: String, offset: Int, defaultValue: Any): Column
  Returns the value that is `offset` rows after the current row, and null if there are fewer than `offset` rows after the current row.

Before we start with an example, first let’s create a DataFrame to work with.


  import spark.implicits._

  val simpleData = Seq(("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  )
  val df = simpleData.toDF("employee_name", "department", "salary")
  df.show()

Yields below output


+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+

2. Spark Window Ranking functions

2.1 row_number Window Function

row_number() window function is used to give a sequential row number, starting from 1, to each row within a window partition.


import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

//row_number
val windowSpec  = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_number",row_number.over(windowSpec))
  .show()

Yields below output.


+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         2|
|       Robert|     Sales|  4100|         3|
|         Saif|     Sales|  4100|         4|
|      Michael|     Sales|  4600|         5|
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
+-------------+----------+------+----------+
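
A common use of row_number() is to keep only the first row of each partition, for example to pick the lowest-salary employee per department or to drop duplicate rows (a small sketch reusing the same windowSpec):


//keep only the first row of each window partition
df.withColumn("row_number", row_number.over(windowSpec))
  .where(col("row_number") === 1)
  .drop("row_number")
  .show()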

2.2 rank Window Function

rank() window function is used to provide a rank to each row within a window partition. This function leaves gaps in the rank when there are ties.


import org.apache.spark.sql.functions._
//rank
df.withColumn("rank",rank().over(windowSpec))
  .show()

Yields below output.


+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
+-------------+----------+------+----+

This is the same as the RANK function in SQL.
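
For reference, the same ranking can also be expressed through Spark SQL (a sketch; it assumes the DataFrame is first registered as a temporary view named employees, which is not part of the example above):


//register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("employees")

spark.sql(
  """SELECT employee_name, department, salary,
    |       rank() OVER (PARTITION BY department ORDER BY salary) AS rank
    |FROM employees""".stripMargin)
  .show()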

2.3 dense_rank Window Function

dense_rank() window function is used to get the rank of rows within a window partition without any gaps. It is similar to rank(), the difference being that rank() leaves gaps in the rank when there are ties.


import org.apache.spark.sql.functions._
//dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec))
  .show()

Yields below output.


+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
+-------------+----------+------+----------+

This is the same as the DENSE_RANK function in SQL.

2.4 percent_rank Window Function

percent_rank() window function returns the percentile rank of rows within a window partition, computed as (rank - 1) / (number of rows in the partition - 1).


import org.apache.spark.sql.functions._
//percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec))
  .show()

Yields below output.


+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
+-------------+----------+------+------------+

This is the same as the PERCENT_RANK function in SQL.

2.5 ntile Window Function

ntile() window function divides the rows of each window partition into n roughly equal groups and returns the group id of each row. In the below example we have used 2 as the argument to ntile, hence it returns either 1 or 2.


//ntile
df.withColumn("ntile",ntile(2).over(windowSpec))
  .show()

Yields below output.


+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
+-------------+----------+------+-----+

This is the same as the NTILE function in SQL.

3. Spark Window Analytic functions

3.1 cume_dist Window Function

cume_dist() window function is used to get the cumulative distribution of values within a window partition, i.e. the fraction of rows in the partition whose value is less than or equal to the current row's value.

This is the same as the CUME_DIST function in SQL.


//cume_dist
df.withColumn("cume_dist",cume_dist().over(windowSpec))
  .show()

+-------------+----------+------+------------------+
|employee_name|department|salary|         cume_dist|
+-------------+----------+------+------------------+
|        James|     Sales|  3000|               0.4|
|        James|     Sales|  3000|               0.4|
|       Robert|     Sales|  4100|               0.8|
|         Saif|     Sales|  4100|               0.8|
|      Michael|     Sales|  4600|               1.0|
|        Maria|   Finance|  3000|0.3333333333333333|
|        Scott|   Finance|  3300|0.6666666666666666|
|          Jen|   Finance|  3900|               1.0|
|        Kumar| Marketing|  2000|               0.5|
|         Jeff| Marketing|  3000|               1.0|
+-------------+----------+------+------------------+

3.2 lag Window Function

lag() window function returns the value that is `offset` rows before the current row within the partition, and null if there are fewer than `offset` rows before the current row. This is the same as the LAG function in SQL.


//lag
df.withColumn("lag",lag("salary",2).over(windowSpec))
   .show()

+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
|        James|     Sales|  3000|null|
|        James|     Sales|  3000|null|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|3000|
|      Michael|     Sales|  4600|4100|
|        Maria|   Finance|  3000|null|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|3000|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
+-------------+----------+------+----+
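
lag() also has a variant that takes a default value to return instead of null when there are fewer than `offset` preceding rows (see the syntax list above); a small sketch:


//lag with a default value of 0 instead of null
df.withColumn("lag", lag("salary", 2, 0).over(windowSpec))
  .show()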

3.3 lead Window Function

lead() window function returns the value that is `offset` rows after the current row within the partition, and null if there are fewer than `offset` rows after the current row. This is the same as the LEAD function in SQL.


//lead
df.withColumn("lead",lead("salary",2).over(windowSpec))
  .show()

+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
|        James|     Sales|  3000|4100|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4600|
|         Saif|     Sales|  4100|null|
|      Michael|     Sales|  4600|null|
|        Maria|   Finance|  3000|3900|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|null|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
+-------------+----------+------+----+

4. Spark Window Aggregate Functions

In this section, I will explain how to calculate the sum, min, and max for each department using Spark SQL aggregate window functions and WindowSpec. When working with aggregate window functions, we don't need an orderBy clause in the window specification.


val windowSpec  = Window.partitionBy("department").orderBy("salary")
val windowSpecAgg  = Window.partitionBy("department")

val aggDF = df.withColumn("row", row_number.over(windowSpec))
    .withColumn("avg", avg(col("salary")).over(windowSpecAgg))
    .withColumn("sum", sum(col("salary")).over(windowSpecAgg))
    .withColumn("min", min(col("salary")).over(windowSpecAgg))
    .withColumn("max", max(col("salary")).over(windowSpecAgg))
    .where(col("row") === 1)
    .select("department", "avg", "sum", "min", "max")

aggDF.show()

This yields below output


+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|     Sales|3760.0|18800|3000|4600|
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
+----------+------+-----+----+----+
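
Since this example keeps only one row per department anyway, the same result could also be produced with a plain groupBy aggregation (a sketch for comparison; the window-function approach is preferable when the aggregated values need to stay attached to every row):


//equivalent per-department aggregation without window functions
df.groupBy("department")
  .agg(
    avg("salary").as("avg"),
    sum("salary").as("sum"),
    min("salary").as("min"),
    max("salary").as("max"))
  .show()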

Please refer to Spark Aggregate Functions for more examples.

5. Source Code of Window Functions Example


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object WindowFunctions extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._

  val simpleData = Seq(("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  )
  val df = simpleData.toDF("employee_name", "department", "salary")
  df.show()

  //row_number
  val windowSpec  = Window.partitionBy("department").orderBy("salary")
  df.withColumn("row_number",row_number.over(windowSpec))
    .show()

  //rank
  df.withColumn("rank",rank().over(windowSpec))
    .show()


  //dense_rank
  df.withColumn("dense_rank",dense_rank().over(windowSpec))
    .show()

  //percent_rank
  df.withColumn("percent_rank",percent_rank().over(windowSpec))
    .show()

  //ntile
  df.withColumn("ntile",ntile(2).over(windowSpec))
    .show()

  //cume_dist
  df.withColumn("cume_dist",cume_dist().over(windowSpec))
    .show()

  //lag
  df.withColumn("lag",lag("salary",2).over(windowSpec))
    .show()

  //lead
  df.withColumn("lead",lead("salary",2).over(windowSpec))
    .show()

  //Aggregate Functions
  val windowSpecAgg  = Window.partitionBy("department")
  val aggDF = df.withColumn("row", row_number.over(windowSpec))
    .withColumn("avg", avg(col("salary")).over(windowSpecAgg))
    .withColumn("sum", sum(col("salary")).over(windowSpecAgg))
    .withColumn("min", min(col("salary")).over(windowSpecAgg))
    .withColumn("max", max(col("salary")).over(windowSpecAgg))
    .where(col("row") === 1)
    .select("department", "avg", "sum", "min", "max")
  aggDF.show()
}

The complete source code is available at GitHub for reference.

6. Conclusion

In this tutorial, you have learned what Spark SQL window functions are, their syntax, and how to use them with aggregate functions, along with several examples in Scala.

7. References

I would recommend reading the Window Functions Introduction and SQL Window Functions API blogs to further understand window functions. Also, refer to SQL Window Functions to learn about window functions in native SQL.

Happy Learning !!

