PySpark provides map() and mapPartitions() to loop/iterate through rows in an RDD/DataFrame and perform complex transformations. These two return the same number of records as the original DataFrame, but the number of columns can differ (after adding/updating columns).
PySpark also provides the foreach() and foreachPartition() actions to loop/iterate through each Row in a DataFrame, but these two return nothing. In this article, I will explain how to use these methods to get DataFrame column values and process them.
- Using map() to loop through DataFrame
- Using foreach() to loop through DataFrame
- Using toPandas() to Iterate
- Collect Data As List and Loop Through in Python
PySpark Loop Through Rows in DataFrame Examples
In order to explain with examples, let's first create a DataFrame.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [('James','Smith','M',30),
        ('Anna','Rose','F',41),
        ('Robert','Williams','M',62)]
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()
+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
| James| Smith| M| 30|
| Anna| Rose| F| 41|
| Robert|Williams| M| 62|
+---------+--------+------+------+
For simple computations, instead of iterating through the DataFrame with map() or foreach(), you should use either DataFrame select() or DataFrame withColumn() in conjunction with PySpark SQL functions.
from pyspark.sql.functions import concat_ws

df.select(concat_ws(",", df.firstname, df.lastname).alias("name"),
          df.gender, (df.salary*2).alias("new_salary")).show()
+---------------+------+----------+
| name|gender|new_salary|
+---------------+------+----------+
| James,Smith| M| 60|
| Anna,Rose| F| 82|
|Robert,Williams| M| 124|
+---------------+------+----------+
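Since withColumn() is mentioned above but not shown, here is a minimal equivalent sketch that produces the same output; it is just an illustration using the same column names.
# Equivalent using withColumn() and PySpark SQL functions
from pyspark.sql.functions import concat_ws, col

df.withColumn("name", concat_ws(",", col("firstname"), col("lastname"))) \
  .withColumn("new_salary", col("salary") * 2) \
  .select("name", "gender", "new_salary") \
  .show()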
Below is a map() example that achieves the same output as above.
Using map() to Loop Through Rows in DataFrame
The PySpark map() transformation is used to loop/iterate through the PySpark DataFrame/RDD by applying a transformation function (lambda) to every element (Row) of the RDD/DataFrame. PySpark DataFrame doesn't have a map() method; it exists only on RDD, so we need to convert the DataFrame to an RDD first and then call map(). The result is an RDD, which you can convert back to a PySpark DataFrame if needed.
If you have heavy initialization (for example, opening a database connection), use the PySpark mapPartitions() transformation instead of map(); with mapPartitions() the initialization executes only once per partition instead of once for every record (a sketch is shown at the end of this section).
# Referring columns by index
rdd = df.rdd.map(lambda x:
    (x[0] + "," + x[1], x[2], x[3]*2)
)
df2 = rdd.toDF(["name","gender","new_salary"])
df2.show()
The above example iterates through every row in the DataFrame by applying transformations to the data. Since I need a DataFrame back, I have converted the resulting RDD to a DataFrame with new column names. Note that here I have used the index to get the column values; alternatively, you can also refer to the DataFrame column names while iterating.
# Referring column names
rdd2 = df.rdd.map(lambda x:
    (x["firstname"] + "," + x["lastname"], x["gender"], x["salary"]*2)
)
Another alternative is to refer to the columns as Row attributes.
# Referring column names as attributes
rdd2 = df.rdd.map(lambda x:
    (x.firstname + "," + x.lastname, x.gender, x.salary*2)
)
You can also create a custom function to perform an operation. The func1() function below executes for every DataFrame row via the lambda function.
# By calling a function
def func1(x):
    firstName = x.firstname
    lastName = x.lastname
    name = firstName + "," + lastName
    gender = x.gender.lower()
    salary = x.salary*2
    return (name, gender, salary)

rdd2 = df.rdd.map(lambda x: func1(x))
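As noted above, when every row needs expensive setup, mapPartitions() runs that setup once per partition. Below is a minimal sketch (the func2() name is just for illustration) that produces the same output as the map() examples; the function receives an iterator of Rows and must return or yield an iterator.
# mapPartitions() example - func2 receives an iterator of Rows per partition
def func2(partition):
    # heavy initialization (e.g., opening a connection) would go here, once per partition
    for x in partition:
        yield (x.firstname + "," + x.lastname, x.gender, x.salary*2)

rdd3 = df.rdd.mapPartitions(func2)
df3 = rdd3.toDF(["name","gender","new_salary"])
df3.show()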
Using foreach() to Loop Through Rows in DataFrame
Similar to map(), foreach() is also applied to every row of the DataFrame; the difference is that foreach() is an action and it returns nothing. Below are some examples of iterating through a DataFrame using foreach().
# foreach() example
def f(x): print(x)
df.foreach(f)

# Another example
df.foreach(lambda x:
    print("Data ==>" + x["firstname"] + "," + x["lastname"] + "," + x["gender"] + "," + str(x["salary"]*2))
)
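Similarly, foreachPartition() (mentioned at the start of this article) applies a function once per partition and hands it an iterator of Rows, which is handy when each partition needs its own setup, for example opening a connection. A minimal sketch (the process_partition name is just for illustration):
# foreachPartition() example - the function receives an iterator of Rows per partition
def process_partition(rows):
    # per-partition setup (e.g., open a connection) would go here
    for row in rows:
        print(row["firstname"] + "," + row["lastname"])

df.foreachPartition(process_partition)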
Using toPandas() to Iterate
If you have a small dataset, you can also convert the PySpark DataFrame to pandas with toPandas() and use pandas to iterate through it. Enable the spark.sql.execution.arrow.pyspark.enabled config (spark.sql.execution.arrow.enabled on older Spark versions) to use Apache Arrow with Spark. Apache Arrow is an in-memory columnar format that Spark uses to transfer data between Python and the JVM efficiently.
# Using pandas
import pandas as pd

# Enable Apache Arrow to speed up the toPandas() conversion
# (on Spark 3.x the config is "spark.sql.execution.arrow.pyspark.enabled")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandasDF = df.toPandas()
for index, row in pandasDF.iterrows():
    print(row['firstname'], row['gender'])
Collect Data As List and Loop Through
You can also collect the PySpark DataFrame to the driver with collect() and iterate through it in Python; alternatively, use toLocalIterator(), which returns the rows as an iterator and pulls the data one partition at a time.
# Collect the data to a Python list
dataCollect = df.collect()
for row in dataCollect:
    print(row['firstname'] + "," + row['lastname'])

# Using toLocalIterator()
dataCollect = df.rdd.toLocalIterator()
for row in dataCollect:
    print(row['firstname'] + "," + row['lastname'])
Conclusion
In this article, you have learned how to iterate/loop through rows of a PySpark DataFrame using map(), foreach(), converting to pandas, and collecting the DataFrame to a Python list. If you want to do simple computations, use either select() or withColumn() instead.
Happy Learning !!
Related Articles
- Dynamic way of doing ETL through Pyspark
- PySpark Shell Command Usage with Examples
- PySpark Replace Column Values in DataFrame
- PySpark Replace Empty Value With None/null on DataFrame
- PySpark – Find Count of null, None, NaN Values
- PySpark mapPartitions() Examples
- PySpark partitionBy() – Write to Disk Example