The PySpark cache() method caches the intermediate result of a transformation so that later transformations that build on the cached data run faster. Caching is one of the standard optimization techniques for improving the performance of long-running PySpark applications and jobs.
cache() is evaluated lazily: PySpark does not cache anything until an action is called. In this article, I will explain what caching is, how it improves performance, and how to cache PySpark DataFrame results, with examples.
1. Benefits of Caching
Caching a DataFrame that is reused across multiple operations can significantly improve the performance of a PySpark job. The benefits of cache() are:
- Cost-efficient – Spark computations are expensive, so reusing a result instead of recomputing it saves cost.
- Time-efficient – Reusing repeated computations saves a lot of time.
- Execution time – Jobs finish sooner, so more jobs can run on the same cluster.
Why cache a result at all? Consider a job that applies several PySpark transformations along one lineage. Caching an intermediate result significantly improves the performance of the later transformations that build on it.
2. Why do we need Cache in PySpark?
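First, let’s run some transformations without cache and see where the performance problem comes from.
# Read csv with options (assumes an existing SparkSession named spark)
from pyspark.sql.functions import col
df = spark.read.options(header='True', inferSchema='True', delimiter=',') \
    .csv("/apps/prabha/PythonTest/sample-zipcodes.csv")
# First filter, then the first action
df2 = df.where(col("State") == "PR")
count = df2.count()
# Second filter on top of df2, then the second action
df3 = df2.where(col("Zipcode") == 704)
count = df3.count()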
What is the issue with the above code?
Let’s assume sample-zipcodes.csv contains billions of records. Because actions trigger the transformations, df2.count() is the first action, so it triggers reading the CSV file and running df.where().
The second action, df3.count(), again triggers reading the file, df.where(), and df2.where().
So in the above example, the file is read twice and df.where() runs twice. When you are dealing with a large number of records, this becomes a performance issue, and it is easily avoided by caching the results of spark.read() and df.where(). In the section below, I will explain how to use cache() to avoid this double execution.
3. PySpark cache()
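Using the PySpark cache() method we can cache the results of transformations. Unlike persist(), cache() takes no argument for the storage level; it always uses the default one. For a DataFrame the default is MEMORY_AND_DISK (reported as MEMORY_AND_DISK_DESER since Spark 3.0), so partitions that do not fit in memory spill to disk; for an RDD the default is MEMORY_ONLY. Calling persist() with that default storage level is equivalent to calling cache().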
3.1 Syntax of cache()
Below is the syntax of cache() on DataFrame.
# Syntax
DataFrame.cache()
3.2 Using PySpark Cache
Starting from the above example, let’s add cache() to the spark.read() and df.where() transformations. When df2.count() executes, it triggers spark.read.csv(..).cache(), which reads the file and caches the result, and df.where(..).cache(), which caches the filtered result as well.
When df3.count() executes, it only performs df2.where() on top of the cached result of df2, without re-executing the previous transformations.
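# Read csv with options (assumes an existing SparkSession named spark)
from pyspark.sql.functions import col
df = spark.read.options(header='True', inferSchema='True', delimiter=',') \
    .csv("/apps/prabha/PythonTest/sample-zipcodes.csv").cache()
# Mark the filtered result for caching; the first action fills the cache
df2 = df.where(col("State") == "PR").cache()
count = df2.count()
# This filter and action run on top of the cached df2
df3 = df2.where(col("Zipcode") == 704)
count = df3.count()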
In the above code, we read a CSV file into the DataFrame df. Applying the where transformation on df produces df2, which contains only the records where State = "PR", and we mark this DataFrame for caching. As discussed, cache() does not run the transformation, because it is lazy. Only when df2.count() executes is where(col("State") == "PR").cache() evaluated and the result of df2 cached.
When we then apply the where transformation on df2 with Zipcode = 704, df2 is already cached, so Spark looks up the cached data and uses it. The result is assigned to df3, and the count() action runs on top of it.
4. PySpark RDD Cache
A PySpark RDD gets the same benefits from cache() as a DataFrame. The RDD is Spark’s basic building block: it is immutable, fault-tolerant, and lazily evaluated, and it has been available since Spark’s initial version.
4.1 RDD cache() Example
Below is an example of RDD cache(). cache() marks the RDD for caching in memory and returns the same RDD.
# RDD cache() usage
rdd = spark.sparkContext.textFile("/tmp/test.txt").cache()
rdd2 = rdd.flatMap(lambda x: x.split(" "))             # split lines into words
rdd3 = rdd2.map(lambda x: (x, 1))                      # pair each word with 1
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)            # sum the counts per word
rdd5 = rdd4.map(lambda x: (x[1], x[0])).sortByKey()    # sort by count
rdd5.collect()
5. Conclusion
The PySpark cache() method caches the intermediate result of a transformation so that later transformations on the cached result run faster. Caching is lazy: nothing is cached until an action is called. Applied to results that are reused, it is one of the standard optimization techniques for improving the performance of long-running PySpark applications and jobs.