  • Post category: PySpark
  • Post last modified: March 27, 2024
PySpark row_number() – Add Column with Row Number

How do you add a row number column to a PySpark DataFrame? The pyspark.sql.functions module provides the window functions row_number(), rank(), and dense_rank(), which are applied over a window defined with the Window class from pyspark.sql.window. row_number() assigns unique sequential numbers to the rows of each partition according to the specified ordering. rank() gives tied values the same rank and leaves gaps after the ties, while dense_rank() gives tied values the same rank without leaving gaps.
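
To make the difference between the three numbering schemes concrete, here is a small plain-Python model of them. It is only a sketch of the semantics, not Spark code and not how Spark implements these functions. The helper name number_rows is hypothetical, and the five salaries are example values containing two ties (the same values the Sales department has in the sample data used later in this article).

```python
# Plain-Python model of row_number, rank, and dense_rank (not Spark code).
# The input must already be sorted by the ordering column.
def number_rows(sorted_values):
    """Return (value, row_number, rank, dense_rank) tuples for sorted input."""
    result = []
    rank = 0
    dense_rank = 0
    previous = object()  # sentinel that never equals a real value
    for position, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = position   # rank jumps to the current position, leaving gaps
            dense_rank += 1   # dense_rank advances by exactly one per new value
            previous = value
        # row_number is simply the position, so it is always unique
        result.append((value, position, rank, dense_rank))
    return result


# Five example salaries in ascending order, with two pairs of ties
salaries = [3000, 3000, 4100, 4100, 4600]
numbered = number_rows(salaries)
for row in numbered:
    print(row)
```

For these values, the row numbers are 1, 2, 3, 4, 5, the ranks are 1, 1, 3, 3, 5, and the dense ranks are 1, 1, 2, 2, 3.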


In this article, I will use the row_number() function to generate sequential row numbers and add them as a new column to a PySpark DataFrame.

Key Points

  • You can use row_number() with or without partitions, but the window must always define an ordering.
  • Window functions often involve partitioning the data based on one or more columns. Partitioning divides the data into groups; window functions are applied independently within each partition.
  • The data can be ordered within each partition based on one or more columns. Ordering defines the sequence of rows that window functions operate on.
  • Window functions require a window specifying the data’s partitioning and ordering. This is created using the Window class from pyspark.sql.window.
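
The key points above can be sketched in plain Python to show what partitioning and ordering mean for row numbering. This is an illustrative model only, not Spark code. The function row_number_model and the five rows are hypothetical and exist just for this sketch.

```python
from itertools import groupby
from operator import itemgetter

# (employee_name, department, salary) tuples; illustrative rows only
rows = [
    ("Michael", "Sales", 4600),
    ("Maria", "Finance", 3000),
    ("Robert", "Sales", 4100),
    ("Jen", "Finance", 3900),
    ("Kumar", "Marketing", 2000),
]


def row_number_model(rows, partition_key, order_key):
    """Plain-Python model: number rows 1..n inside each partition."""
    numbered = []
    # Sorting by (partition, order) makes each partition contiguous and ordered
    ordered = sorted(rows, key=lambda r: (partition_key(r), order_key(r)))
    for _, group in groupby(ordered, key=partition_key):
        # Numbering restarts at 1 for every partition
        for number, row in enumerate(group, start=1):
            numbered.append(row + (number,))
    return numbered


# Partition by department (index 1) and order by salary (index 2)
numbered = row_number_model(rows, partition_key=itemgetter(1), order_key=itemgetter(2))
for row in numbered:
    print(row)
```

Each department gets its own sequence starting at 1, which is the behavior that Window.partitionBy() combined with orderBy() gives row_number() in PySpark.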

PySpark row_number() Syntax & Usage

The following is the syntax of the row_number() function, which generates an incrementing sequence number for each row. It takes no arguments and must be used together with over() and a window specification.


# Syntax
pyspark.sql.functions.row_number()

Before proceeding to the example, we will create a DataFrame with some sample data, as shown below.


# Imports
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create a Spark session
spark = SparkSession.builder.appName("SparkByExamples").getOrCreate()

# Define the schema
schema = StructType([
    StructField("employee_name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True)
])

# Prepare the sample data
simple_data = [
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
]

# Create a DataFrame
df = spark.createDataFrame(simple_data, schema=schema)

# Show the DataFrame
df.show()

Yields below output.

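# Output
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+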

Add Column with Row Number to DataFrame by Partition

You can use the row_number() function to add a new column containing a row number to a PySpark DataFrame. The row_number() function assigns a unique sequential number to each row within a window partition of the DataFrame. Rows are ordered by the columns specified in the window, and the assigned number reflects each row’s position in that ordering. It is commonly used for analytics such as ranking rows and selecting the top or bottom rows of each group.

Before we apply row_number(), we need to define a window. The partitionBy() function partitions the data by one or more columns, which groups rows with the same values together. The orderBy() function then orders the rows within each partition. Here, we partition on the “department” column, order by the “salary” column, and then run the row_number() function with over() on the resulting window.


# Imports
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Applying partitionBy() and orderBy()
window_spec = Window.partitionBy("department").orderBy("salary")

# Add a new column "row_number" using row_number() over the specified window
result_df = df.withColumn("row_number", row_number().over(window_spec))

# Show the result
result_df.show()

Yields below output.


# Output
+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         2|
|       Robert|     Sales|  4100|         3|
|         Saif|     Sales|  4100|         4|
|      Michael|     Sales|  4600|         5|
+-------------+----------+------+----------+

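In the above output, the rows are partitioned by the “department” column and ordered by “salary” (ascending by default), and a new column, “row_number”, is added with a sequence number starting from 1 for each partition. Rows with the same salary within a partition, such as Robert and Saif, still get different row numbers, and the order among such tied rows is not guaranteed.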

Add Row Number to the DataFrame without Partition

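The row_number() function can also be applied without partitioning the data. In this case, the whole DataFrame is treated as a single partition and the rows are ordered by the “salary” column. Note that Spark moves all rows to a single partition to do this and logs a warning, because it can be slow on large datasets.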

Below is an example.


# Imports
from pyspark.sql.functions import col
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Define a window ordered by salary, with no partitioning
df_window = Window.orderBy(col("salary"))

# Add a new column "row_number" using row_number() over the window
result_df = df.withColumn("row_number", row_number().over(df_window))
result_df.show()

Yields below output.


# Output
+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        Kumar| Marketing|  2000|         1|
|        James|     Sales|  3000|         2|
|        Maria|   Finance|  3000|         3|
|        James|     Sales|  3000|         4|
|         Jeff| Marketing|  3000|         5|
|        Scott|   Finance|  3300|         6|
|          Jen|   Finance|  3900|         7|
|       Robert|     Sales|  4100|         8|
|         Saif|     Sales|  4100|         9|
|      Michael|     Sales|  4600|        10|
+-------------+----------+------+----------+

In the above output, the new column “row_number” is added using the row_number() function over a window ordered by the “salary” column. The row numbers are assigned across the whole DataFrame, following the ascending order of the “salary” column.

Get Row Number using PySpark SQL

If you are coming from a SQL background, you can also use a SQL query to get the row number. To use SQL, you first need to create a temporary view from the DataFrame using createOrReplaceTempView(). A temporary view in PySpark can be queried like a SQL table with rows and columns, but it is not materialized into files and exists only for the lifetime of the SparkSession.


# Create a temporary view
df.createOrReplaceTempView("employee_data")

# Define a window specification in PySpark SQL
window_spec_sql = "PARTITION BY department ORDER BY salary DESC"

# Use row_number() in PySpark SQL
result_sql_df = spark.sql("""
    SELECT 
        employee_name,
        department,
        salary,
        ROW_NUMBER() OVER ({} ) AS row_number
    FROM employee_data
""".format(window_spec_sql))
result_sql_df.show()

Conclusion

In this PySpark article, you have learned how to use the row_number() function to assign unique row numbers to rows within a specified partition and ordering, and how to add them as a new column to the DataFrame. You have also seen examples of applying row_number() with and without partitions, as well as through PySpark SQL.

Keep Learning!!

Prabha

Prabha is an accomplished data engineer with a wealth of experience in architecting, developing, and optimizing data pipelines and infrastructure. With a strong foundation in software engineering and a deep understanding of data systems, Prabha excels in building scalable solutions that handle diverse and large datasets efficiently. At SparkbyExamples.com, Prabha writes about her experience with Spark, PySpark, Python, and Pandas.