Spark RDD Cache and Persist with Example

Spark Cache and Persist are optimization techniques to improve the performance of RDD jobs that are iterative and interactive. In this article, you will learn what Cache and Persist are, how to use them on an RDD, the difference between Caching and Persistence, and how to use these two with RDD using Scala examples.

Spark provides computations up to 100x faster than traditional MapReduce jobs. If you have not designed your jobs to reuse repeating computations, you will see a degradation in performance when dealing with billions or trillions of records. Hence, we need to look at the computations and use optimization techniques as one of the ways to improve performance.

Using the cache() and persist() methods, Spark provides an optimization mechanism to store the intermediate computation of an RDD so that it can be reused in subsequent actions.

When you persist or cache an RDD, each worker node stores its partitioned data in memory or on disk and reuses it in other actions on that RDD. Spark's persisted data on nodes is fault-tolerant, meaning if any partition is lost, it will automatically be recomputed using the original transformations that created it.
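To make the idea concrete, below is a minimal sketch. The RDD and the transformation are made up for illustration; without cache(), the map would be re-computed for each of the two actions.


  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  // Hypothetical expensive transformation; cache() keeps the result
  // in memory after the first action materializes it.
  val numbers = spark.sparkContext.parallelize(1 to 1000000)
  val squared = numbers.map(n => n.toLong * n).cache()

  println(squared.count())                 // first action: computes and caches the partitions
  println(squared.take(5).mkString(","))   // second action: reads from the cache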

Advantages of Caching and Persistence of RDD

Below are the advantages of using Spark RDD Cache and Persist methods.

Cost efficient – Spark computations are very expensive; reusing the computations saves cost.

Time efficient – Reusing repeated computations saves a lot of time.

Execution time – It saves the execution time of a job, allowing us to run more jobs on the same cluster.

RDD Cache Syntax and Example

The Spark RDD cache() method by default saves the RDD computation at the MEMORY_ONLY storage level, meaning it stores the data in the JVM heap as unserialized objects.

Syntax


 cache(): RDD.this.type

The cache() method in the RDD class internally calls persist(), which registers the RDD with the SparkContext so that its partitions are stored at the default MEMORY_ONLY storage level. Let's look at an example.

Example


  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.SparkSession

  // Case class for the zip code records; the field names here are assumed for illustration
  case class ZipCode(recordNumber:Int, zipCode:String, city:String, state:String)

  val spark:SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
  val sc = spark.sparkContext

  val rdd = sc.textFile("src/main/resources/zipcodes-noheader.csv")

  // Parse each CSV row into a ZipCode object
  val rdd2:RDD[ZipCode] = rdd.map(row=>{
    val strArray = row.split(",")
    ZipCode(strArray(0).toInt,strArray(1),strArray(3),strArray(4))
  })

  // cache() stores rdd2 at the MEMORY_ONLY storage level
  val rdd3 = rdd2.cache()

  println(rdd3.count())
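Because rdd3 is now cached, subsequent actions on it reuse the stored partitions instead of re-reading and re-parsing the CSV file. A short sketch continuing the example above (the state value used in the filter is just illustrative):


  // Second action on the cached RDD: partitions are served from memory
  println(rdd3.filter(zip => zip.state == "PR").count())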

RDD Persist Syntax and Example

The Spark persist() method is used to store the RDD at one of the storage levels MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2 and more.

Syntax


1) persist() : RDD.this.type
2) persist(newLevel : org.apache.spark.storage.StorageLevel) : RDD.this.type

Spark persist() has two signatures: the first signature doesn't take any argument and, by default, saves the RDD to the MEMORY_ONLY storage level; the second signature takes a StorageLevel argument to store the RDD at a different storage level.

Example


  // persist() with no argument uses the MEMORY_ONLY storage level
  val rddPersist = rdd.persist()
  println(rddPersist.count())

Using the second signature, you can save the RDD to any of the storage levels.


  import org.apache.spark.storage.StorageLevel

  val rddMemOnly = rdd.persist(StorageLevel.MEMORY_ONLY)
  println(rddMemOnly.count())

This stores the RDD in memory.

Note that RDD.cache() is an alias for persist(StorageLevel.MEMORY_ONLY), so it stores the data in the JVM heap as unserialized objects. When data is written to disk, it is always serialized.
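A quick sketch, reusing the rdd3 and rddMemOnly variables from the examples above, confirms that both calls end up with the same storage level:


  // Both RDDs are stored at the MEMORY_ONLY storage level
  println(rdd3.getStorageLevel)        // from cache(), e.g. StorageLevel(memory, deserialized, 1 replicas)
  println(rddMemOnly.getStorageLevel)  // from persist(StorageLevel.MEMORY_ONLY), same level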

RDD Unpersist Syntax and Example

Spark automatically monitors every persist() and cache() call you make; it tracks cache usage on each node and drops old, unused persisted data in a least-recently-used (LRU) fashion. You can also remove it manually using the unpersist() method. unpersist() marks the RDD as non-persistent and removes all of its blocks from memory and disk.

Syntax


unpersist() : RDD.this.type
unpersist(blocking : scala.Boolean) : RDD.this.type

Example


  val rddPersist2 = rddPersist.unpersist()

unpersist(blocking = true) blocks until all blocks are deleted from memory and disk.
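For example, continuing with the rddPersist variable from above, a blocking unpersist looks like this:


  // Blocks until every cached block of rddPersist is removed from memory and disk
  rddPersist.unpersist(blocking = true)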

Spark RDD Persist storage levels

All the storage levels Spark supports are available in the org.apache.spark.storage.StorageLevel object. The storage level specifies how and where to store the RDD: whether it should be kept in memory, on disk, or both, and whether the RDD should be serialized. Each level is described below, followed by a short sketch showing how to apply one of them with persist().

MEMORY_ONLY – This is the default behavior of the RDD cache() method and stores the RDD as deserialized objects in JVM memory. When there is not enough memory available, some partitions of the RDD are not cached and are re-computed as and when required. This level takes more memory, but reads are fast since only a few CPU cycles are needed and no deserialization is involved.

MEMORY_ONLY_SER – This is the same as MEMORY_ONLY, the difference being that it stores the RDD as serialized objects in JVM memory. It takes less memory (it is space-efficient) than MEMORY_ONLY, but spends a few additional CPU cycles to deserialize the objects when they are read.

MEMORY_ONLY_2 – Same as the MEMORY_ONLY storage level but replicates each partition to two cluster nodes.

MEMORY_ONLY_SER_2 – Same as the MEMORY_ONLY_SER storage level but replicates each partition to two cluster nodes.

MEMORY_AND_DISK – In this storage level, the RDD is stored in JVM memory as deserialized objects. When the required storage is greater than the available memory, the excess partitions are stored on disk and read back from disk when required. It is slower, as there is I/O involved.

MEMORY_AND_DISK_SER – This is the same as the MEMORY_AND_DISK storage level, the difference being that it stores the RDD as serialized objects in memory (data spilled to disk is always serialized).

MEMORY_AND_DISK_2 – Same as the MEMORY_AND_DISK storage level but replicates each partition to two cluster nodes.

MEMORY_AND_DISK_SER_2 – Same as the MEMORY_AND_DISK_SER storage level but replicates each partition to two cluster nodes.

DISK_ONLY – In this storage level, the RDD is stored only on disk, and the computation time is high because of the I/O involved.

DISK_ONLY_2 – Same as the DISK_ONLY storage level but replicates each partition to two cluster nodes.
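Below is a short sketch of applying one of these levels, continuing with the rdd2 variable from the cache example. Note that an RDD's storage level can only be set once, so the earlier level is dropped with unpersist() before a new one is assigned.


  import org.apache.spark.storage.StorageLevel

  // Drop the previous storage level before assigning a new one
  rdd2.unpersist()
  val rddSerialized = rdd2.persist(StorageLevel.MEMORY_AND_DISK_SER)

  println(rddSerialized.count())           // materializes and stores the partitions
  println(rddSerialized.getStorageLevel)   // e.g. StorageLevel(disk, memory, 1 replicas)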

Conclusion

In this article, you have learned that the Spark cache() and persist() methods are used as optimization techniques to save the interim computation results of an RDD and reuse them in subsequent actions, learned the difference between Spark Cache and Persist, and finally saw their syntax and usage with Scala examples.

Happy Learning !!
