PySpark persist() Explained with Examples

PySpark persist() is a way of caching intermediate results at a specified storage level so that subsequent operations on the persisted results are faster and use memory more efficiently. Since each action re-runs all transformations in the lineage, if you have not designed your jobs to reuse repeated computations, you will see performance degrade when you are dealing with billions or trillions of records.


Hence, we may need to look at the stages and use optimization techniques as one of the ways to improve performance.

Related: PySpark cache() with example

1. Introduction to PySpark Persist

Though PySpark provides computation up to 100x faster than traditional MapReduce jobs, if you have not designed your jobs to reuse repeated computations, you will see performance degrade when you are dealing with billions or trillions of records. Hence, we may need to look at the stages and use optimization techniques as one of the ways to improve performance.

Using the persist() method, PySpark provides an optimization mechanism to store the intermediate computation of a PySpark DataFrame so it can be reused in subsequent actions.

When you persist a dataset, each node stores its partitions of the data in memory (or on disk) and reuses them in other actions on that dataset. Persisted data on nodes is also fault-tolerant: if any partition of the dataset is lost, it is automatically recomputed using the original transformations that created it.
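As a minimal sketch of this behavior (the SparkSession setup and the sample data below are illustrative, not from the original example):

# Persist a DataFrame and reuse it across multiple actions
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PersistExample").getOrCreate()

# Hypothetical sample data for illustration
df = spark.createDataFrame([(1, "James"), (2, "Anna"), (3, "Maria")], ["id", "name"])

# Mark the aggregated result for persistence
aggDf = df.groupBy("name").count().persist()

# The first action materializes the persisted partitions; the second reuses them
print(aggDf.count())
aggDf.show()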

2. Advantages of Persisting PySpark DataFrame

Persisting data in PySpark, or any Spark application, can offer several advantages:

  • Improved Performance: By persisting intermediate RDDs or DataFrames in memory or disk storage, you can avoid recomputation of those datasets in subsequent Spark actions. This can significantly improve performance when multiple actions are performed on the same dataset.
  • Reduced I/O Overhead: Persisting data in memory allows Spark to reuse the computed results without reading them from disk again. This reduces the I/O overhead associated with reading and writing data, resulting in faster execution times.
  • Optimized Data Sharing: When multiple Spark actions require the same dataset, persisting it in memory enables efficient data sharing among different stages of the Spark job. This reduces network traffic and speeds up computation.
  • Fault Tolerance: Spark automatically handles persistence and fault tolerance. If an executor fails during computation, Spark can recover the lost partitions of persisted RDDs or DataFrames, either from replicated copies (for the *_2 storage levels) or by recomputing them from the lineage, ensuring fault tolerance and data integrity.
  • Resource Management: Persisting data allows Spark to manage resources more effectively. By storing intermediate results in memory or disk, Spark can better utilize available resources, such as memory and CPU, leading to improved resource utilization and overall cluster performance.
  • Iterative Algorithms: Persisting data is crucial for iterative machine learning and graph processing algorithms, where the same dataset is used repeatedly with slight modifications. By persisting intermediate results you avoid recomputation in each iteration, leading to significant performance gains (see the sketch after this list).
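A minimal sketch of that iterative pattern, assuming an existing SparkSession named spark (the range and the thresholds are illustrative only):

# Several actions in a loop reuse one persisted DataFrame
numbersDf = spark.range(0, 1000000).persist()

# The first action materializes numbersDf; later iterations read the cached
# partitions instead of regenerating the range
for threshold in [10, 100, 1000]:
    print(threshold, numbersDf.filter(numbersDf.id < threshold).count())

numbersDf.unpersist()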

3. Usage of persist() on DataFrame

The PySpark persist() method is used to store the DataFrame at one of the storage levels MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, and more.

Caching or persisting of PySpark DataFrame is a lazy operation, meaning a DataFrame will not be cached until you trigger an action. 
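A quick sketch of this laziness (df is assumed to be an existing, not-yet-persisted DataFrame):

# persist() only marks the DataFrame; nothing is cached until an action runs
df.persist()
print(df.is_cached)   # True: marked for caching, not necessarily materialized yet
df.count()            # the first action actually materializes the persisted partitions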

Syntax


# persist() Syntax
DataFrame.persist(storageLevel: pyspark.storagelevel.StorageLevel = StorageLevel(True, True, False, True, 1)) 

PySpark persist() has two signatures: the first takes no argument and, by default, saves the DataFrame to the MEMORY_AND_DISK storage level, while the second takes a StorageLevel argument to store it at a different storage level.

Example


# Persist the DataFrame (default MEMORY_AND_DISK storage level)
dfPersist = df.persist()
dfPersist.show(truncate=False)

Using the second signature, you can save the DataFrame at any storage level.


# Persist the DataFrame at a specific storage level
from pyspark import StorageLevel

dfPersist = df.persist(StorageLevel.MEMORY_ONLY)
dfPersist.show(truncate=False)

This stores the DataFrame in memory only.

Note that PySpark cache() is an alias for persist(StorageLevel.MEMORY_AND_DISK) 
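As a small sketch of that equivalence (df is assumed to be an existing DataFrame), you can check the storage level that cache() resolves to and compare it with the default shown in the persist() syntax above:

# cache() resolves to the same default storage level as the no-argument persist()
df.cache()
print(df.storageLevel)   # disk + memory, deserialized, 1 replica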

4. Unpersist syntax and Example

PySpark automatically monitors every persist() call you make, checks usage on each node, and drops persisted data that is no longer used, following a least-recently-used (LRU) policy. You can also remove persisted data manually using the unpersist() method. unpersist() marks the DataFrame as non-persistent and removes all of its blocks from memory and disk.

Syntax


# unpersist() Syntax
DataFrame.unpersist(blocking: bool = False) → pyspark.sql.dataframe.DataFrame

Example


# unpersist the DataFrame
dfPersist = dfPersist.unpersist()

Calling unpersist() with the boolean argument set to True blocks until all blocks are deleted; the default (False) returns immediately and removes the blocks asynchronously.
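As a sketch, the blocking variant on the DataFrame used above looks like this:

# Block until every cached block of the DataFrame has been removed
dfPersist.unpersist(blocking=True)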

5. PySpark Persist storage levels

All the storage levels PySpark supports are available in the pyspark.StorageLevel class (mirroring org.apache.spark.storage.StorageLevel on the Scala side). The storage level specifies how and where to persist or cache a PySpark DataFrame.
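A small sketch of picking a level and inspecting it afterwards (df is assumed to be an existing, not-yet-persisted DataFrame):

# Import the storage level constants and check which level a DataFrame uses
from pyspark import StorageLevel

df.persist(StorageLevel.DISK_ONLY)
print(df.storageLevel)   # the level the DataFrame is currently persisted with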

MEMORY_ONLY – This is the default for the RDD cache() method; it stores the RDD or DataFrame as deserialized objects in JVM memory. When there is not enough memory available, some partitions are not cached and are recomputed as and when required. This level uses more memory. Unlike with RDDs, for DataFrames this can be slower than the MEMORY_AND_DISK level, because recomputing the uncached partitions means rebuilding the in-memory columnar representation of the underlying table, which is expensive.

MEMORY_ONLY_SER – The same as MEMORY_ONLY, except that it stores the data as serialized objects in JVM memory. It is more space-efficient than MEMORY_ONLY because objects are stored serialized, but it costs a few extra CPU cycles to deserialize them.

MEMORY_ONLY_2 – Same as the MEMORY_ONLY storage level, but replicates each partition to two cluster nodes.

MEMORY_ONLY_SER_2 – Same as the MEMORY_ONLY_SER storage level, but replicates each partition to two cluster nodes.

MEMORY_AND_DISK – This is the default storage level for DataFrame persist() and cache(). At this level, the DataFrame is stored in JVM memory as deserialized objects; when the required storage exceeds the available memory, the excess partitions are spilled to disk and read back from disk when needed. It is slower when spilling occurs because of the I/O involved.

MEMORY_AND_DISK_SER – The same as the MEMORY_AND_DISK storage level, except that the DataFrame objects are serialized in memory, and on disk when memory is not available.

MEMORY_AND_DISK_2 – Same as the MEMORY_AND_DISK storage level, but replicates each partition to two cluster nodes.

MEMORY_AND_DISK_SER_2 – Same as the MEMORY_AND_DISK_SER storage level, but replicates each partition to two cluster nodes.

DISK_ONLY – At this storage level, the DataFrame is stored only on disk, and access is slower because every read involves disk I/O.

DISK_ONLY_2 – Same as the DISK_ONLY storage level, but replicates each partition to two cluster nodes.
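As a sketch of choosing one of the replicated levels listed above (again, df is assumed to be an existing DataFrame that has not been persisted yet):

# Persist with two replicas of each partition for faster recovery on node failure
from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK_2)
df.count()   # the triggering action materializes and replicates the partitions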

Conclusion

In this article, you have learned that the PySpark persist() method is used as an optimization technique to save interim DataFrame computation results and reuse them subsequently. Persisting data in PySpark can enhance performance, reduce computation overhead, improve fault tolerance, and facilitate efficient resource management, making it a valuable technique for optimizing Spark applications.
