Spark DataFrame Cache and Persist Explained

Spark Cache and Persist are optimization techniques for DataFrame / Dataset in iterative and interactive Spark applications, used to improve the performance of jobs. In this article, you will learn what Spark cache() and persist() are, how to use them with a DataFrame, and the difference between caching and persistence, along with how to apply both to DataFrames and Datasets using Scala examples.

Though Spark can run computations up to 100x faster than traditional MapReduce jobs, if you have not designed your jobs to reuse repeated computations, you will see a degradation in performance when dealing with billions or trillions of records. Hence, we may need to look at the stages and use optimization techniques such as caching as one of the ways to improve performance.

Using the cache() and persist() methods, Spark provides an optimization mechanism to store the intermediate computation of a Spark DataFrame so it can be reused in subsequent actions.

When you persist a dataset, each node stores its partitioned data in memory and reuses it in other actions on that dataset. Spark’s persisted data on nodes is fault-tolerant, meaning if any partition of a Dataset is lost, it will automatically be recomputed using the original transformations that created it.

Advantages of Caching and Persisting a DataFrame

Below are the advantages of using Spark Cache and Persist methods.

  • Cost-efficient – Spark computations are very expensive, hence reusing them saves cost.
  • Time-efficient – Reusing repeated computations saves a lot of time.
  • Execution time – Saves the execution time of the job, which allows us to perform more jobs on the same cluster.

Spark Cache Syntax and Example

Spark DataFrame or Dataset cache() method by default saves it to storage level `MEMORY_AND_DISK` because recomputing the in-memory columnar representation of the underlying table is expensive. Note that this is different from the default cache level of `RDD.cache()`, which is `MEMORY_ONLY`.
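
You can check the level Spark reports for each API. Below is a minimal sketch, assuming a SparkSession named spark and a DataFrame df already exist; storageLevel on a Dataset and getStorageLevel on an RDD are the standard accessors.

  df.cache()
  println(df.storageLevel)     // reports MEMORY_AND_DISK for a DataFrame/Dataset

  val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3)).cache()
  println(rdd.getStorageLevel) // reports MEMORY_ONLY for an RDD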

Syntax


 cache() : Dataset.this.type

The Spark cache() method in the Dataset class internally calls the persist() method, which in turn uses sparkSession.sharedState.cacheManager.cacheQuery to cache the result set of the DataFrame or Dataset. Let’s look at an example.

Example


  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.col

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  //read csv with options
  val df = spark.read.options(Map("inferSchema"->"true","delimiter"->",","header"->"true"))
    .csv("src/main/resources/zipcodes.csv")

  //cache() only marks df2 to be cached; nothing is stored yet
  val df2 = df.where(col("State") === "PR").cache()
  df2.show(false)        // first action materializes the cache

  println(df2.count())   // served from the cached data

  val df3 = df2.where(col("Zipcode") === 704)   // builds on the cached df2

  println(df2.count())   // reuses the cached data again
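
To confirm that the result set is actually cached and reused, you can inspect df2 after the first action. Below is a minimal sketch continuing the example above; storageLevel and explain() are standard Dataset methods.

  println(df2.storageLevel)   // MEMORY_AND_DISK once cache() has been applied
  df2.explain()               // the physical plan should show an InMemoryTableScan over the cached data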

DataFrame Persist Syntax and Example

The Spark persist() method is used to store the DataFrame or Dataset to one of the storage levels MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, and more.

Caching or persisting of Spark DataFrame or Dataset is a lazy operation, meaning a DataFrame will not be cached until you trigger an action. 
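
As a minimal sketch of this laziness (reusing the df DataFrame from the earlier example):

  val marked = df.persist()   // only marks the DataFrame for caching; nothing is computed yet
  marked.count()              // the first action materializes the cached data
  marked.show(false)          // subsequent actions reuse the cached data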

Syntax


1) persist() : Dataset.this.type
2) persist(newLevel : org.apache.spark.storage.StorageLevel) : Dataset.this.type

Spark persist() has two signatures: the first signature takes no argument and by default saves the DataFrame to the MEMORY_AND_DISK storage level; the second signature takes a StorageLevel argument to store it at a different storage level.

Example


  val dfPersist = df.persist()
  dfPersist.show(false)

Using the second signature, you can save the DataFrame/Dataset to any of the storage levels.


  import org.apache.spark.storage.StorageLevel

  val dfPersist = df.persist(StorageLevel.MEMORY_ONLY)
  dfPersist.show(false)

This stores the DataFrame/Dataset in memory only.

Note that Dataset cache() is an alias for persist(StorageLevel.MEMORY_AND_DISK).

Unpersist Syntax and Example

Spark automatically monitors every persist() and cache() call you make, checks usage on each node, and drops persisted data that is not used, following a least-recently-used (LRU) policy. You can also remove it manually using the unpersist() method. unpersist() marks the Dataset as non-persistent and removes all blocks for it from memory and disk.

Syntax


unpersist() : Dataset.this.type
unpersist(blocking : scala.Boolean) : Dataset.this.type

Example


  dfPersist.unpersist()

unpersist(Boolean) takes a boolean argument; when it is true, the call blocks until all blocks for the Dataset are deleted.
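
For example, a minimal sketch of the blocking variant (blocking is the parameter name in the Scala API):

  dfPersist.unpersist(blocking = true)   // waits until all cached blocks are removed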

Spark Persist Storage Levels

All the different storage levels Spark supports are available in the org.apache.spark.storage.StorageLevel class. The storage level specifies how and where to persist or cache a Spark DataFrame or Dataset.

MEMORY_ONLY – This is the default behavior of the RDD cache() method and stores the RDD or DataFrame as deserialized objects in JVM memory. When there is not enough memory available, it does not save some partitions, and these will be re-computed as and when required. This level takes more memory. Unlike with an RDD, for a DataFrame this can be slower than the MEMORY_AND_DISK level because it recomputes the unsaved partitions, and recomputing the in-memory columnar representation of the underlying table is expensive.

MEMORY_ONLY_SER – This is the same as MEMORY_ONLY, the difference being that it stores the RDD as serialized objects in JVM memory. It takes less memory (it is space-efficient) than MEMORY_ONLY, as it saves objects in serialized form, but spends a few additional CPU cycles to deserialize them.

MEMORY_ONLY_2 – Same as the MEMORY_ONLY storage level but replicates each partition to two cluster nodes.

MEMORY_ONLY_SER_2 – Same as the MEMORY_ONLY_SER storage level but replicates each partition to two cluster nodes.

MEMORY_AND_DISK – This is the default behavior of the DataFrame or Dataset. In this storage level, the DataFrame is stored in JVM memory as deserialized objects. When the required storage is greater than the available memory, it stores some of the excess partitions on disk and reads the data from disk when required. It is slower because I/O is involved.

MEMORY_AND_DISK_SER – This is the same as the MEMORY_AND_DISK storage level, the difference being that it serializes the DataFrame objects in memory, and on disk when space is not available.

MEMORY_AND_DISK_2 – Same as the MEMORY_AND_DISK storage level but replicates each partition to two cluster nodes.

MEMORY_AND_DISK_SER_2 – Same as the MEMORY_AND_DISK_SER storage level but replicates each partition to two cluster nodes.

DISK_ONLY – In this storage level, the DataFrame is stored only on disk, and the computation time is high because I/O is involved.

DISK_ONLY_2 – Same as the DISK_ONLY storage level but replicates each partition to two cluster nodes.
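
Each of these levels is just a combination of a few flags (memory, disk, serialization, replication), which you can inspect programmatically. Below is a small sketch using the public accessors on StorageLevel.

  import org.apache.spark.storage.StorageLevel

  val level = StorageLevel.MEMORY_AND_DISK_SER_2
  println(level.useMemory)     // true  - keeps data in JVM memory
  println(level.useDisk)       // true  - spills to disk when memory is insufficient
  println(level.deserialized)  // false - data is kept in serialized form
  println(level.replication)   // 2     - each partition is replicated to two nodes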

Conclusion

In this article, you have learned that the Spark cache() and persist() methods are used as optimization techniques to save interim computation results of a DataFrame or Dataset so they can be reused in subsequent actions. You have also learned the difference between Spark Cache and Persist, and seen their syntax and usage with Scala examples.

Happy Learning !!
