PySpark Repartition() vs Coalesce()

  • Post author:
  • Post category:PySpark
  • Post last modified:December 16, 2022
  • Reading time:11 mins read

Let’s see the difference between PySpark repartition() vs coalesce(), repartition() is used to increase or decrease the RDD/DataFrame partitions whereas the PySpark coalesce() is used to only decrease the number of partitions in an efficient way.

In this article, you will learn the difference between PySpark repartition vs coalesce with examples.

One important point to note is, PySpark repartition() and coalesce() are very expensive operations as they shuffle the data across many partitions hence try to minimize using these as much as possible.

1. PySpark RDD Repartition() vs Coalesce()

In RDD, you can create parallelism at the time of the creation of an RDD using parallelize(), textFile() and wholeTextFiles().


# Create spark session with local[5]
rdd = spark.sparkContext.parallelize(range(0,20))
print("From local[5] : "+str(rdd.getNumPartitions()))

# Use parallelize with 6 partitions
rdd1 = spark.sparkContext.parallelize(range(0,25), 6)
print("parallelize : "+str(rdd1.getNumPartitions()))

rddFromFile = spark.sparkContext.textFile("src/main/resources/test.txt",10)
print("TextFile : "+str(rddFromFile.getNumPartitions()))

The above example yields the below output.


# Output:
From local[5] : 5
Parallelize : 6
TextFile : 10

spark.sparkContext.parallelize(Range(0,20),6) distributes RDD into 6 partitions and the data is distributed as below.


rdd1.saveAsTextFile("/tmp/partition")

#Writes 6 part files, one for each partition
Partition 1 : 0 1 2
Partition 2 : 3 4 5
Partition 3 : 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15
Partition 6 : 16 17 18 19

1.1 RDD repartition()

repartition() method is used to increase or decrease the partitions. The below example decreases the partitions from 10 to 4 by moving data from all partitions.


# Using repartition
rdd2 = rdd1.repartition(4)
print("Repartition size : "+str(rdd2.getNumPartitions()))
rdd2.saveAsTextFile("/tmp/re-partition")

This yields output Repartition size : 4 and the repartition re-distributes the data(as shown below) from all partitions which is a full shuffle leading to a very expensive operation when dealing with billions and trillions of data.


# Output:
Partition 1 : 1 6 10 15 19
Partition 2 : 2 3 7 11 16
Partition 3 : 4 8 12 13 17
Partition 4 : 0 5 9 14 18

1.2 coalesce()

RDD coalesce() is used only to reduce the number of partitions. This is an optimized or improved version of repartition() where the movement of the data across the partitions is lower using coalesce.


# Using coalesce()
rdd3 = rdd1.coalesce(4)
print("Repartition size : "+str(rdd3.getNumPartitions()))
rdd3.saveAsTextFile("/tmp/coalesce")

If you compare the below output with section 1, you will notice partition 3 has been moved to 2 and Partition 6 has moved to 5, resulting in data movement from just 2 partitions.


Partition 1 : 0 1 2
Partition 2 : 3 4 5 6 7 8 9
Partition 4 : 10 11 12 
Partition 5 : 13 14 15 16 17 18 19

1.3 Complete Example of PySpark RDD repartition and coalesce

Below is a complete example of PySpark RDD repartition and coalesce in Python language.


# Complete Example
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com') \
        .master("local[5]").getOrCreate()

df = spark.range(0,20)
print(df.rdd.getNumPartitions())

spark.conf.set("spark.sql.shuffle.partitions", "500")

rdd = spark.sparkContext.parallelize(range(0,20))
print("From local[5]"+str(rdd.getNumPartitions()))

rdd1 = spark.sparkContext.parallelize(range(0,25), 6)
print("parallelize : "+str(rdd1.getNumPartitions()))

"""rddFromFile = spark.sparkContext.textFile("src/main/resources/test.txt",10)
print("TextFile : "+str(rddFromFile.getNumPartitions())) """

rdd1.saveAsTextFile("c://tmp/partition2")

rdd2 = rdd1.repartition(4)
print("Repartition size : "+str(rdd2.getNumPartitions()))
rdd2.saveAsTextFile("c://tmp/re-partition2")

rdd3 = rdd1.coalesce(4)
print("Repartition size : "+str(rdd3.getNumPartitions()))
rdd3.saveAsTextFile("c:/tmp/coalesce2")

2. PySpark DataFrame repartition() vs coalesce()

Like RDD, you can’t specify the partition/parallelism while creating DataFrame. DataFrame by default internally uses the methods specified in Section 1 to determine the default partition and splits the data for parallelism.

If you are not familiar with DataFrame, I will recommend learning it by following PySpark DataFrame Tutorial before proceeding further with this article.


# DataFrame example
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com') \
        .master("local[5]").getOrCreate()

df=spark.range(0,20)
print(df.rdd.getNumPartitions())

df.write.mode("overwrite").csv("c:/tmp/partition.csv")

The above example creates 5 partitions as specified in master("local[5]") and the data is distributed across all these 5 partitions.


# Output:
Partition 1 : 0 1 2 3
Partition 2 : 4 5 6 7
Partition 3 : 8 9 10 11
Partition 4 : 12 13 14 15
Partition 5 : 16 17 18 19

2.1 DataFrame repartition()

Similar to RDD, the PySpark DataFrame repartition() method is used to increase or decrease the partitions. The below example increases the partitions from 5 to 6 by moving data from all partitions.


# DataFrame repartition
df2 = df.repartition(6)
print(df2.rdd.getNumPartitions())

Just increasing 1 partition results in data movements from all partitions.


# Output:
Partition 1 : 14 1 5
Partition 2 : 4 16 15
Partition 3 : 8 3 18
Partition 4 : 12 2 19
Partition 5 : 6 17 7 0
Partition 6 : 9 10 11 13

And, even decreasing the partitions also results in moving data from all partitions. hence when you wanted to decrease the partition recommendation is to use coalesce()/

2.2 DataFrame coalesce()

Spark DataFrame coalesce() is used only to decrease the number of partitions. This is an optimized or improved version of repartition() where the movement of the data across the partitions is fewer using coalesce.


# DataFrame coalesce
df3 = df.coalesce(2)
print(df3.rdd.getNumPartitions())

This yields output 2 and the resultant partition looks like


# Output:
Partition 1 : 0 1 2 3 8 9 10 11
Partition 2 : 4 5 6 7 12 13 14 15 16 17 18 19

Since we are reducing 5 to 2 partitions, the data movement happens only from 3 partitions and it moves to remain 2 partitions.

3. Default Shuffle Partition

Calling groupBy(), union(), join() and similar functions on DataFrame results in shuffling data between multiple executors and even machines and finally repartitions data into 200 partitions by default. PySpark default defines shuffling partition to 200 using spark.sql.shuffle.partitions configuration.


# Default shuffle partition count
df4 = df.groupBy("id").count()
print(df4.rdd.getNumPartitions())

Post shuffle operations, you can change the partitions either using coalesce() or repartition().

Conclusion

In this PySpark repartition() vs coalesce() article, you have learned how to create an RDD with partition, repartition the RDD using coalesce(), repartition DataFrame using repartition() and coalesce() methods, and learned the difference between repartition and coalesce.

Related Articles

Reference

Happy Learning !!

Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, He has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive and Machine Learning. Naveen journey in the field of data engineering has been a continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with the data as he come across. Follow Naveen @ LinkedIn and Medium

Leave a Reply

This Post Has 3 Comments

  1. Gopi

    nice article.. covers all fundamentals for the topic

  2. KrishnaG

    Hi Team,

    The above code in python but it was mentioned as below “Below is a complete example of PySpark RDD repartition and coalesce in Scala language.” .please correct it so new learners wont get confused.

    1. NNK

      Thanks Krishna. I have fixed it now.