PySpark provides the map() and mapPartitions() transformations to loop/iterate through rows in an RDD/DataFrame and perform complex transformations. map() returns the same number of rows/records as the original DataFrame, but the number of columns can differ after the transformation (for example, when columns are added or updated). Keep in mind that PySpark DataFrames are designed for distributed data processing, so direct row-wise iteration should be avoided on large datasets; prefer Spark’s built-in transformations and actions to process data more efficiently.
PySpark also provides the foreach() and foreachPartition() actions to loop/iterate through each Row in a DataFrame, but these return nothing. In this article, I will explain how to use these methods to iterate over rows and access DataFrame column values.
- Using map() to loop through DataFrame
- Using foreach() to loop through DataFrame
- Using toPandas() to Iterate
- Collect Data As a List and Loop Through in Python
PySpark Loop Through Rows in DataFrame Examples
In order to explain with examples, let's first create a DataFrame.
# Create SparkSession and sample data
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [('James','Smith','M',30),
        ('Anna','Rose','F',41),
        ('Robert','Williams','M',62)]
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()
# Output
+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
| James| Smith| M| 30|
| Anna| Rose| F| 41|
| Robert|Williams| M| 62|
+---------+--------+------+------+
For simple computations, instead of iterating with map() or foreach(), you should use DataFrame select() or withColumn() in conjunction with PySpark SQL functions. Below is an example using select(); a withColumn() version follows the output.
# Using select()
from pyspark.sql.functions import concat_ws
df.select(concat_ws(",", df.firstname, df.lastname).alias("name"),
          df.gender, (df.salary * 2).alias("new_salary")).show()
# Output
+---------------+------+----------+
| name|gender|new_salary|
+---------------+------+----------+
| James,Smith| M| 60|
| Anna,Rose| F| 82|
|Robert,Williams| M| 124|
+---------------+------+----------+
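The same output can also be produced with withColumn(). Below is a minimal sketch, assuming the same df as above; it adds the derived columns first and then selects only the ones to keep.
# Using withColumn() to derive new columns, then selecting only the ones we need
from pyspark.sql.functions import concat_ws
df.withColumn("name", concat_ws(",", df.firstname, df.lastname)) \
  .withColumn("new_salary", df.salary * 2) \
  .select("name", "gender", "new_salary").show()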
Below I have a map() example to achieve the same output as above.
Using map() to Loop Through Rows in DataFrame
The PySpark map() transformation is used to loop/iterate through the DataFrame/RDD by applying a transformation function (typically a lambda) to every element. PySpark DataFrames do not have a map() method; it exists only on RDDs, so you first convert the DataFrame to an RDD (df.rdd) and then call map(). The result is an RDD, which you can convert back to a PySpark DataFrame if needed.
If you have heavy initialization (for example, opening a database connection), use the PySpark mapPartitions() transformation instead of map(); with mapPartitions() the initialization executes only once per partition instead of once per record (see the sketch at the end of this section).
# Referring to columns by index
rdd = df.rdd.map(lambda x:
    (x[0] + "," + x[1], x[2], x[3] * 2)
)
df2 = rdd.toDF(["name","gender","new_salary"])
df2.show()
The above example iterates through every row in the DataFrame, applying a transformation to the data. Since I need a DataFrame back, I converted the resulting RDD to a DataFrame with new column names. Note that here I used the index to get the column values; alternatively, you can refer to the DataFrame column names while iterating.
# Referring to column names
rdd2 = df.rdd.map(lambda x:
    (x["firstname"] + "," + x["lastname"], x["gender"], x["salary"] * 2)
)
Another alternative is to refer to the columns as attributes of the Row object.
# Referring to columns as Row attributes
rdd2 = df.rdd.map(lambda x:
    (x.firstname + "," + x.lastname, x.gender, x.salary * 2)
)
You can also create a custom function to perform the operation. The func1() function below executes for every DataFrame row, called from the lambda passed to map().
# By calling a function
def func1(x):
    firstName = x.firstname
    lastName = x.lastname
    name = firstName + "," + lastName
    gender = x.gender.lower()
    salary = x.salary * 2
    return (name, gender, salary)

rdd2 = df.rdd.map(lambda x: func1(x))
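As a follow-up to the mapPartitions() note above, here is a minimal sketch of the same transformation written with mapPartitions(); the expensive_setup() helper is a hypothetical placeholder for any heavy per-partition initialization, such as opening a database connection or loading a model.
# mapPartitions(): the setup runs once per partition instead of once per row
def expensive_setup():
    # hypothetical placeholder for heavy initialization
    return str.upper

def process_partition(rows):
    transform = expensive_setup()   # executed once per partition
    for x in rows:                  # reused for every row in the partition
        yield (x.firstname + "," + x.lastname, transform(x.gender), x.salary * 2)

df3 = df.rdd.mapPartitions(process_partition).toDF(["name","gender","new_salary"])
df3.show()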
Using foreach() to Loop Through Rows in DataFrame
Similar to map(), foreach() is also applied to every row of the DataFrame; the difference is that foreach() is an action and returns nothing. Below are some examples of iterating through a DataFrame using foreach().
# foreach() example
def f(x): print(x)
df.foreach(f)

# Another example
df.foreach(lambda x:
    print("Data ==>" + x["firstname"] + "," + x["lastname"] + "," + x["gender"] + "," + str(x["salary"] * 2))
)
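The introduction also mentioned foreachPartition(), which works the same way but hands you an iterator over each partition, so per-partition setup happens only once. Keep in mind that foreach()/foreachPartition() run on the executors, so in a cluster any print() output lands in the executor logs rather than on the driver console. Below is a minimal sketch; send_to_sink is a hypothetical placeholder for whatever side effect you need, such as writing rows to an external system.
# foreachPartition(): set up a resource once per partition, then use it for each row
def handle_partition(rows):
    send_to_sink = print            # hypothetical placeholder for an external writer
    for x in rows:
        send_to_sink(x.firstname + "," + x.lastname)

df.foreachPartition(handle_partition)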
Using toPandas() to Iterate
If you have a small dataset, you can also convert the PySpark DataFrame to a pandas DataFrame with toPandas() and iterate through it with pandas. Set the spark.sql.execution.arrow.pyspark.enabled config (named spark.sql.execution.arrow.enabled in older Spark versions) to enable Apache Arrow, an in-memory columnar format that Spark uses to transfer data efficiently between the JVM and Python.
# Using pandas
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pandasDF = df.toPandas()
for index, row in pandasDF.iterrows():
    print(row['firstname'], row['gender'])
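iterrows() is convenient but builds a pandas Series for every row; if you only need to read values, itertuples() is a lighter-weight alternative. A minimal sketch using the same pandasDF:
# itertuples() yields lightweight namedtuples instead of Series objects
for row in pandasDF.itertuples(index=False):
    print(row.firstname, row.gender)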
Collect Data As List and Loop Through
You can also collect() the PySpark DataFrame to the driver as a Python list of Row objects and loop through it, or iterate lazily using toLocalIterator(). The toLocalIterator() method iterates through the partitions of a DataFrame on the driver node: it returns an iterator that pulls one partition at a time, so each pass of the loop processes a single partition locally. This is useful when the DataFrame is too large to fit into the driver’s memory and you want to avoid the overhead of transferring the entire DataFrame to the driver at once.
# Collect the data to a Python list
dataCollect = df.collect()
for row in dataCollect:
    print(row['firstname'] + "," + row['lastname'])

# Using toLocalIterator()
dataCollect = df.rdd.toLocalIterator()
for row in dataCollect:
    print(row['firstname'] + "," + row['lastname'])
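toLocalIterator() is also available directly on the DataFrame, so converting to an RDD first is optional; a minimal equivalent:
# Iterating the DataFrame directly, one partition at a time on the driver
for row in df.toLocalIterator():
    print(row['firstname'] + "," + row['lastname'])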
Frequently Asked Questions
How do I iterate through rows of a DataFrame in PySpark?
There are several ways to iterate through rows of a DataFrame in PySpark. We can use methods like collect(), foreach(), toLocalIterator(), or convert the DataFrame to an RDD and use map().
What is the difference between collect() and toLocalIterator()?
collect() brings all the data to the driver, which might be inefficient for large datasets. On the other hand, toLocalIterator() processes partitions one at a time on the driver node, making it more suitable for larger datasets.
How do I use map() to iterate through a DataFrame?
First convert the DataFrame to an RDD using rdd = df.rdd, then apply the map() transformation to iterate through the data.
When should I use toLocalIterator()?
Use toLocalIterator() when you want to iterate through partitions locally on the driver and your processing logic can be applied independently to each partition.
Conclusion
In this article, you have learned how to iterate/loop through rows of a PySpark DataFrame using map(), foreach(), converting to pandas, and collecting the DataFrame into a Python list. If you only need simple computations, use select() or withColumn() instead.
Happy Learning !!
Related Articles
- PySpark Shell Command Usage with Examples
- PySpark Replace Column Values in DataFrame
- PySpark Replace Empty Value With None/null on DataFrame
- PySpark – Find Count of null, None, NaN Values
- PySpark mapPartitions() Examples
- PySpark partitionBy() – Write to Disk Example