Similar to map(), PySpark mapPartitions() is a narrow transformation operation that applies a function to each partition of an RDD; if you have a DataFrame, you need to convert it to an RDD in order to use it. mapPartitions() is mainly used to initialize expensive resources, such as database connections, once per partition instead of once per row, and this is the main difference between map() and mapPartitions(). It is a narrow transformation because there is no data movement/shuffling between partitions to perform the function.
Key Points of PySpark mapPartitions():
- It is similar to the map() operation; in the examples below, mapPartitions() returns the same number of rows as the input RDD (although it is not required to).
- It is used to improve the performance of map() when there is a need to do heavy initializations, such as opening a database connection (see the sketch after this list).
- mapPartitions() performs the heavy initialization once for each partition of the RDD instead of once for each element.
- It is a narrow transformation operation.
- PySpark DataFrame doesn’t have this operation, hence you need to convert the DataFrame to an RDD to use mapPartitions().
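As a minimal sketch of the once-per-partition behaviour (assuming a local SparkSession; the numbers are only for illustration), everything before the loop inside the function runs once per partition, while the loop body runs once per element:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# 10 numbers spread across 2 partitions
rdd = spark.sparkContext.parallelize(range(10), 2)

def add_one(partitionData):
    # Everything before the loop runs once per partition (twice here),
    # not once per element as it would inside map()
    offset = 1
    for x in partitionData:
        yield x + offset

print(rdd.mapPartitions(add_one).collect())
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]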
1. Syntax of mapPartitions()
Following is the syntax of PySpark mapPartitions(). It calls the function f with an iterator over the elements of each partition, applies the function, and returns all elements of the partition. It also takes an optional argument preservesPartitioning to preserve the partitioning.
RDD.mapPartitions(f, preservesPartitioning=False)
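As a quick, hedged sketch of the optional argument (assuming a local SparkSession; the data is made up): when the function does not change the keys of a key-value RDD, preservesPartitioning=True tells Spark the existing partitioner is still valid, which can avoid a shuffle in later key-based operations.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
sc = spark.sparkContext

# A key-value RDD hash-partitioned into 2 partitions
pairs = sc.parallelize([('a', 1), ('b', 2), ('a', 3)]).partitionBy(2)

def double_values(partitionData):
    # Keys are left unchanged, only values are doubled
    for key, value in partitionData:
        yield (key, value * 2)

# Since the keys were not modified, the existing partitioner can be kept
doubled = pairs.mapPartitions(double_values, preservesPartitioning=True)
print(doubled.collect())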
2. Usage of mapPartitions()
def f(partitionData):
    # perform heavy initializations like database connections
    for element in partitionData:
        # perform operations for each element in the partition
        # and yield the updated data
        yield element

df.rdd.mapPartitions(f)
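As a more concrete, hedged sketch of this pattern (not part of the original example), the standard-library sqlite3 module is used here as a stand-in for a real database, and df is the DataFrame built in the next section:
import sqlite3

def bonus_with_connection(partitionData):
    # Open one connection per partition instead of one per row
    conn = sqlite3.connect(':memory:')
    for row in partitionData:
        # Reuse the same connection for every row in this partition
        bonus = conn.execute('SELECT ? * 10 / 100.0', (row.salary,)).fetchone()[0]
        yield [row.firstname + ',' + row.lastname, bonus]
    conn.close()

# df.rdd.mapPartitions(bonus_with_connection).toDF(['name', 'bonus']).show()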
3. PySpark mapPartitions() Example
First let’s create a DataFrame with sample data and use this data to provide an example of mapPartitions().
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [('James','Smith','M',3000),
    ('Anna','Rose','F',4100),
    ('Robert','Williams','M',6200),
]
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()
Now use the PySpark mapPartitions() transformation to concatenate the firstname and lastname columns and calculate the bonus as 10% of the value of the salary column.
# This function is called for each partition
def reformat(partitionData):
    for row in partitionData:
        yield [row.firstname+","+row.lastname, row.salary*10/100]

df2 = df.rdd.mapPartitions(reformat).toDF(["name","bonus"])
df2.show()
Note that this is a simple example which doesn’t show the main benefit; you actually get the advantage of mapPartitions() when you do heavy initializations, like database connections, once for each partition, otherwise it behaves similarly to the PySpark map() transformation.
Since mapPartitions() returns an RDD, you need to convert the RDD back to a DataFrame by providing column names (or an explicit schema, as sketched after the output below).
+---------------+-----+
| name|bonus|
+---------------+-----+
| James,Smith|300.0|
| Anna,Rose|410.0|
|Robert,Williams|620.0|
+---------------+-----+
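If you prefer typed columns over plain column names, an explicit schema can also be supplied when converting the RDD back to a DataFrame. This is a small sketch, assuming the spark session and reformat function defined above:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Explicit schema instead of just column names
schema = StructType([
    StructField("name", StringType(), True),
    StructField("bonus", DoubleType(), True)
])
df3 = spark.createDataFrame(df.rdd.mapPartitions(reformat), schema)
df3.show()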
The below example uses a for loop without yield; it collects the results into a list and returns an iterator over it instead.
def reformat(partitionData):
    updatedData = []
    for row in partitionData:
        name = row.firstname+","+row.lastname
        bonus = row.salary*10/100
        updatedData.append([name,bonus])
    return iter(updatedData)

df2 = df.rdd.mapPartitions(reformat).toDF(["name","bonus"])
df2.show()
4. Complete Example of mapPartitions()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [('James','Smith','M',3000),
    ('Anna','Rose','F',4100),
    ('Robert','Williams','M',6200),
]
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()
#Example 1 mapPartitions()
def reformat(partitionData):
    for row in partitionData:
        yield [row.firstname+","+row.lastname, row.salary*10/100]

df2 = df.rdd.mapPartitions(reformat).toDF(["name","bonus"])
df2.show()
#Example 2 mapPartitions()
def reformat2(partitionData):
    updatedData = []
    for row in partitionData:
        name = row.firstname+","+row.lastname
        bonus = row.salary*10/100
        updatedData.append([name,bonus])
    return iter(updatedData)

df2 = df.rdd.mapPartitions(reformat2).toDF(["name","bonus"])
df2.show()
You can also find the complete example at the PySpark GitHub Examples Project.
5. Conclusion
mapPartitions() is used to perform heavy initialization once for each partition instead of once for every element; this is the main difference between PySpark map() and mapPartitions(). Similar to map(), in these examples it also returns the same number of elements, but the number of columns can be different.
Happy Learning !!