PySpark mapPartitions() Examples

Similar to map(), PySpark mapPartitions() is a narrow transformation that applies a function to each partition of an RDD; if you have a DataFrame, you need to convert it to an RDD in order to use it. mapPartitions() is mainly used to initialize a resource, such as a database connection, once per partition instead of once per row; this is the main difference between map() and mapPartitions(). It is a narrow transformation because the function runs within each partition, so no data movement/shuffling between partitions is required.

Key Points of PySpark MapPartitions():

  • It is similar to the map() operation: in the examples below, the output of mapPartitions() contains the same number of rows as the input RDD.
  • It is used to improve the performance of map() when there is a need for heavy initialization, such as a database connection.
  • mapPartitions() performs the heavy initialization once per partition of the RDD instead of once per element.
  • It is a narrow transformation operation, so no shuffle is involved.
  • PySpark DataFrame doesn’t have this operation, hence you need to convert the DataFrame to an RDD to use mapPartitions().
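The difference in the key points above can be sketched in plain Python: Spark calls the function you pass to map() once per element, while the function you pass to mapPartitions() is called once per partition and receives an iterator over that partition's elements. The function names below are illustrative, not part of any API.

```python
# map(): Spark invokes this once for every single element.
def double_each(x):
    return x * 2

# mapPartitions(): Spark invokes this once per partition. Anything placed
# before the loop (e.g. opening a connection) runs once for the whole
# partition instead of once per element.
def double_partition(partition):
    for x in partition:
        yield x * 2

# On an RDD these would be used as:
#   rdd.map(double_each)
#   rdd.mapPartitions(double_partition)
```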

1. Syntax of mapPartitions()

Following is the syntax of PySpark mapPartitions(). It calls function f with an iterator over a partition’s elements and returns all elements the function produces for that partition. It also takes an optional argument, preservesPartitioning, to indicate that the existing partitioning is preserved.


RDD.mapPartitions(f, preservesPartitioning=False)
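As a rough sketch of when preservesPartitioning matters: if the input is a key-value RDD partitioned by key and your function leaves the keys unchanged, passing preservesPartitioning=True tells Spark the existing partitioner is still valid, which can help avoid a shuffle in a later key-based operation. The function below is illustrative only.

```python
# The function leaves keys untouched, so the key-based partitioning
# of the input RDD remains valid for the output.
def add_ten(partition):
    for key, value in partition:
        yield (key, value + 10)

# With a key-partitioned pair RDD this would be used as:
#   pairs = sc.parallelize([("a", 1), ("b", 2)]).partitionBy(2)
#   pairs.mapPartitions(add_ten, preservesPartitioning=True)
```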

2. Usage of mapPartitions()


def f(partitionData):
    # perform heavy initializations like database connections here,
    # once per partition
    for element in partitionData:
        # perform operations for each element in the partition
        yield element  # yield the updated data

df.rdd.mapPartitions(f)

3. PySpark mapPartitions() Example

First let’s create a DataFrame with sample data and use this data to provide an example of mapPartitions().


from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [('James','Smith','M',3000),
  ('Anna','Rose','F',4100),
  ('Robert','Williams','M',6200), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()

Now use the PySpark mapPartitions() transformation to concatenate firstname and lastname, and calculate the bonus as 10% of the salary column.


# This function is called once for each partition
def reformat(partitionData):
    for row in partitionData:
        yield [row.firstname+","+row.lastname,row.salary*10/100]

df2=df.rdd.mapPartitions(reformat).toDF(["name","bonus"])
df2.show()

Note that this is a simple example which doesn’t use the main benefit. You will actually get the advantage of mapPartitions() when you do heavy initializations, like database connections, once for each partition; otherwise it behaves similarly to the PySpark map() transformation.
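A minimal sketch of that heavy-initialization pattern is shown below. DummyConnection and get_connection() are hypothetical stand-ins for a real database client; the point is that the connection is opened once per partition, reused for every row, and closed once.

```python
# Hypothetical stand-in for a real database client, for illustration only.
class DummyConnection:
    def save(self, row):
        pass  # a real client would write the row to the database here

    def close(self):
        pass  # a real client would release the network connection here

def get_connection():
    # a real implementation would open a (costly) database connection
    return DummyConnection()

def save_partition(partition):
    conn = get_connection()   # heavy init happens once per partition
    try:
        for row in partition:
            conn.save(row)    # every row in the partition reuses the connection
            yield row
    finally:
        conn.close()          # released once per partition

# Usage on the DataFrame above:
#   df.rdd.mapPartitions(save_partition).count()  # triggers execution
```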

Since mapPartitions() returns an RDD, you need to convert the RDD back to a DataFrame by providing column names.


+---------------+-----+
|           name|bonus|
+---------------+-----+
|    James,Smith|300.0|
|      Anna,Rose|410.0|
|Robert,Williams|620.0|
+---------------+-----+

The below example uses a for loop without yield, collecting the results into a list and returning an iterator over it.


def reformat(partitionData):
  updatedData = []
  for row in partitionData:
    name=row.firstname+","+row.lastname
    bonus=row.salary*10/100
    updatedData.append([name,bonus])
  return iter(updatedData)

df2=df.rdd.mapPartitions(reformat).toDF(["name","bonus"])
df2.show()

4. Complete Example of mapPartitions()


from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [('James','Smith','M',3000),
  ('Anna','Rose','F',4100),
  ('Robert','Williams','M',6200), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()

#Example 1 mapPartitions()
def reformat(partitionData):
    for row in partitionData:
        yield [row.firstname+","+row.lastname,row.salary*10/100]
df2=df.rdd.mapPartitions(reformat).toDF(["name","bonus"])
df2.show()

#Example 2 mapPartitions()
def reformat2(partitionData):
  updatedData = []
  for row in partitionData:
    name=row.firstname+","+row.lastname
    bonus=row.salary*10/100
    updatedData.append([name,bonus])
  return iter(updatedData)

df2=df.rdd.mapPartitions(reformat2).toDF(["name","bonus"])
df2.show()

You can also find the complete example @ PySpark GitHub Examples Project

5. Conclusion

mapPartitions() is used to perform heavy initialization once for each partition instead of once for each element; this is the main difference between PySpark map() and mapPartitions(). Similar to map(), it returns the same number of elements in these examples, but the number of columns can be different.

Happy Learning !!

NNK

