• Post author:
  • Post category:PySpark
  • Post last modified:March 27, 2024
  • Reading time:13 mins read
You are currently viewing PySpark – Loop/Iterate Through Rows in DataFrame

PySpark provides map(), mapPartitions() to loop/iterate through rows in RDD/DataFrame to perform the complex transformations, and these two return the same number of rows/records as in the original DataFrame but, the number of columns could be different (after transformation, for example, add/update). PySpark DataFrames are designed for distributed data processing, so direct row-wise iteration should be avoided when working with large datasets. Instead, consider using Spark’s built-in transformations and actions to process data more efficiently.

PySpark also provides foreach() & foreachPartitions() actions to loop/iterate through each Row in a DataFrame but these two return nothing. In this article, I will explain how to use these methods to get DataFrame column values.

PySpark Loop Through Rows in DataFrame Examples

In order to explain with examples, let’s create a DataFrame


# Create SparkSession and sample data
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [('James','Smith','M',30),('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()

# Output
+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+

Mostly for simple computations, instead of iterating through using map() or foreach(), you should use either DataFrame select() or DataFrame withColumn() in conjunction with PySpark SQL functions. Below is an example of using select().


# Using select()
from pyspark.sql.functions import concat_ws,col,lit
df.select(concat_ws(",",df.firstname,df.lastname).alias("name"), \
          df.gender,lit(df.salary*2).alias("new_salary")).show()

# Output
+---------------+------+----------+
|           name|gender|new_salary|
+---------------+------+----------+
|    James,Smith|     M|        60|
|      Anna,Rose|     F|        82|
|Robert,Williams|     M|       124|
+---------------+------+----------+

Below I have a map() example to achieve the same output as above.

Using map() to Loop Through Rows in DataFrame

PySpark map() Transformation is used to loop/iterate through the PySpark DataFrame/RDD by applying the transformation function (lambda) on every element (Rows and Columns) of RDD/DataFrame. PySpark doesn’t have a map() in DataFrame instead it’s in RDD hence we need to convert DataFrame to RDD first and then use the map(). It returns an RDD and you should Convert RDD to PySpark DataFrame if needed.

If you have a heavy initialization use PySpark mapPartitions() transformation instead of map(), as with mapPartitions() heavy initialization executes only once for each partition instead of every record.


# Refering columns by index.
rdd=df.rdd.map(lambda x: 
    (x[0]+","+x[1],x[2],x[3]*2)
    )  
df2=rdd.toDF(["name","gender","new_salary"])
df2.show()

The above example iterates through every row in a DataFrame by applying transformations to the data, since I need a DataFrame back, I have converted the result of RDD to a DataFrame with new column names. Note that here I have used the index to get the column values, alternatively, you can also refer to the DataFrame column names while iterating.


# Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x["firstname"]+","+x["lastname"],x["gender"],x["salary"]*2)
    ) 

Another alternative


# Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x.firstname+","+x.lastname,x.gender,x.salary*2)
    ) 

You can also create a custom function to perform an operation. The below func1() function executes for every DataFrame row from the lambda function.


# By Calling function
def func1(x):
    firstName=x.firstname
    lastName=x.lastName
    name=firstName+","+lastName
    gender=x.gender.lower()
    salary=x.salary*2
    return (name,gender,salary)

rdd2=df.rdd.map(lambda x: func1(x))

Using foreach() to Loop Through Rows in DataFrame

Similar to map(), foreach() also applied to every row of DataFrame, the difference being foreach() is an action and it returns nothing. Below are some examples to iterate through DataFrame using for each.


# Foreach example
def f(x): print(x)
df.foreach(f)

# Another example
df.foreach(lambda x: 
    print("Data ==>"+x["firstname"]+","+x["lastname"]+","+x["gender"]+","+str(x["salary"]*2))
    ) 

Using pandas() to Iterate

If you have a small dataset, you can also Convert PySpark DataFrame to Pandas and use pandas to iterate through. Use spark.sql.execution.arrow.enabled config to enable Apache Arrow with Spark. Apache Spark uses Apache Arrow which is an in-memory columnar format to transfer the data between Python and JVM.


# Using pandas
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandasDF = df.toPandas()
for index, row in pandasDF.iterrows():
    print(row['firstname'], row['gender'])

Collect Data As List and Loop Through

You can also Collect the PySpark DataFrame to Driver and iterate through Python using toLocalIterator().The toLocalIterator() method is used to iterate through the partitions of a DataFrame locally on the driver node. This can be useful when you have a large DataFrame, and you want to process the data locally on the driver node without bringing the entire DataFrame to the driver. It returns an iterator that goes through the partitions of the DataFrame. This means that each iteration of the loop processes a partition of the DataFrame locally on the driver. This is beneficial for scenarios where the DataFrame is too large to fit into the driver’s memory, and you want to avoid the overhead of transferring the entire DataFrame to the driver.


# Collect the data to Python List
dataCollect = df.collect()
for row in dataCollect:
    print(row['firstname'] + "," +row['lastname'])

#Using toLocalIterator()
dataCollect=df.rdd.toLocalIterator()
for row in dataCollect:
    print(row['firstname'] + "," +row['lastname'])

Frequently Asked Questions

What are the different ways to iterate the rows of a PySpark DataFrame?

There are several ways to iterate through rows of a DataFrame in PySpark. We can use methods like collect(), foreach(), toLocalIterator(), or convert the DataFrame to an RDD and use map().

What is the difference between collect() and toLocalIterator()?

collect() brings all the data to the driver, which might be inefficient for large datasets. On the other hand, toLocalIterator() processes partitions locally on the driver node, making it more suitable for larger datasets.

Is there a way to iterate through rows without bringing the data to the driver?

In order to achieve this, First we need to convert the DataFrame to RDD using rdd = df.rdd. And apply map() transformation to iterate through the data.

When should I use toLocalIterator()?

Use toLocalIterator() when you want to iterate through partitions locally on the driver and your processing logic can be done independently for each partition.

Conclusion

In this article, you have learned iterating/looping through Rows of PySpark DataFrame could be done using map(), foreach(), converting to Pandas, and finally converting DataFrame to Python List. If you want to do simple computations, use either select or withColumn().

Happy Learning !!

References

Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, He has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive and Machine Learning. Naveen journey in the field of data engineering has been a continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with the data as he come across. Follow Naveen @ LinkedIn and Medium

This Post Has 2 Comments

  1. NNK

    Thanks for the love and support.

  2. João

    I love you guys

Comments are closed.