What is Spark Streaming Checkpoint?

What is a Spark or PySpark Streaming checkpoint? Because a Spark streaming application must run 24/7, it has to tolerate failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.), and recovery from data loss should be quick and performant. To achieve this, the Spark streaming application needs to checkpoint enough information to a fault-tolerant storage system (e.g., HDFS, Azure Data Lake, Amazon S3, etc.) to recover from failures.

In this article, let us discuss the importance of fault tolerance, what the checkpoint directory is, the types of checkpoints, when to enable checkpointing, and how to enable it.

1. Importance of Fault Tolerance

In Spark streaming, data arrives in the system 24/7. We collect the data over a period of time and process it as events, running computations or aggregations on top of them. If the application fails due to some error, recovery would conceptually require re-processing all the events that were already processed, which wastes time and resources and is unnecessary.

To overcome this, we have to store the metadata of the last message processed before the failure, so that the application can restart from the failure point. By default, a Spark streaming application does not persist this metadata; it is the user's responsibility to enable and maintain it. Maintaining the metadata of the last processed message is a form of checkpoint, and it enables us to build a fault-tolerant streaming application. Checkpointing helps us with the following.

  • Limits the state that needs to be re-processed after a failure.
  • Enables the application to recover from failures.
  • Helps to store data and metadata in a fault-tolerant file system.

2. What is Checkpoint Directory

Checkpointing is a mechanism by which a Spark streaming application periodically stores data and metadata in a fault-tolerant file system. The checkpoint stores the application's lineage graph as metadata and periodically saves the application state to the file system. It mainly stores two things.

  • Data Checkpointing
  • Metadata Checkpointing

[Figure: Spark Streaming checkpoint directory]

2.1. Data Checkpointing

Data checkpointing stores the generated RDDs in a file system so that stateful transformations can complete. It is used when a computation or aggregation on the current micro-batch depends on the results of previous micro-batches. In such cases the dependency chain across micro-batches keeps growing, so Spark periodically checkpoints the intermediate data of stateful transformations to reliable storage. This cuts the dependency chain and reduces the recovery time.

2.2. Metadata Checkpointing

Metadata checkpointing stores the information that defines the streaming computation, such as its configuration and queued batches, in fault-tolerant storage. It is used to recover from failures unrelated to the application logic. The metadata mainly includes the following.

  • Incomplete Batches: A Spark Streaming application chunks the incoming events, groups them over a period of time, and processes them as batches. The checkpoint stores information about batches that are queued but not yet processed.
  • Configuration: It stores the configuration that was set up for the Spark streaming application, such as maxFilesPerTrigger, the processing time interval, etc.
  • DStream Operations: It stores all information about the operations that are performed on the continuous streaming data.

The Spark streaming application uses this information to recover from failures and restart from the failure position instead of starting from the beginning. Metadata checkpointing is primarily needed for recovery from driver failures, whereas data checkpointing is necessary even for basic functioning if stateful transformations are used.

To set the checkpoint directory for a streaming query, we can pass the checkpoint location as an option to writeStream of a streaming DataFrame.


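// dataFrame is assumed to be an aggregated streaming DataFrame,
// since the "complete" output mode requires an aggregation.
dataFrame
  .writeStream
  .outputMode("complete")
  // Directory where the query stores its checkpoint information
  .option("checkpointLocation", "checkpoint")
  .format("console")
  .start()
  .awaitTermination()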

The option "checkpointLocation" enables checkpointing and specifies the location in the file system where the checkpoint information is stored.
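For a fuller picture, the snippet above can be expanded into a small standalone program. This is only a sketch under stated assumptions: it uses Spark's built-in "rate" test source so that no external system is required, and the object name, the local master setting, and the /tmp checkpoint path are hypothetical choices made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object CheckpointedQuery {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CheckpointedQuery")
      .master("local[2]") // assumption: local run for illustration only
      .getOrCreate()

    // The built-in "rate" source generates (timestamp, value) rows for testing.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // A stateful aggregation: running count of rows per bucket.
    val counts = events
      .groupBy((col("value") % 10).as("bucket"))
      .count()

    val query = counts.writeStream
      .outputMode("complete")
      // Hypothetical path; in production this should point to
      // fault-tolerant storage such as HDFS, ADLS, or S3.
      .option("checkpointLocation", "/tmp/checkpoints/rate-demo")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

If the program is stopped and started again with the same checkpoint location, the query should resume from the recorded progress rather than starting over.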

3. Types of Checkpointing

There are two types of checkpointing in Spark streaming.

  1. Reliable checkpointing: The actual RDD is stored in a reliable distributed file system such as HDFS, ADLS, Amazon S3, etc.
  2. Local checkpointing: The actual RDD is stored in local storage on the executors.
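At the RDD level, the two types map onto two different calls. The following is a minimal sketch, assuming a local SparkSession and a hypothetical /tmp checkpoint directory; the object and variable names are made up for the example.

```scala
import org.apache.spark.sql.SparkSession

object RddCheckpointTypes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RddCheckpointTypes")
      .master("local[2]") // assumption: local run for illustration only
      .getOrCreate()
    val sc = spark.sparkContext

    // Reliable checkpointing needs a checkpoint directory (hypothetical path here).
    sc.setCheckpointDir("/tmp/checkpoints/rdd-demo")

    val reliable = sc.parallelize(1 to 100).map(_ * 2)
    reliable.checkpoint() // only marks the RDD; data is written when an action runs
    reliable.count()      // the action triggers the actual checkpoint

    // Local checkpointing keeps the data on executor storage instead.
    val local = sc.parallelize(1 to 100).map(_ + 1)
    local.localCheckpoint()
    local.count()

    println(s"reliable checkpointed: ${reliable.isCheckpointed}")
    println(s"reliable checkpoint file: ${reliable.getCheckpointFile}")
    println(s"local checkpointed: ${local.isCheckpointed}")

    spark.stop()
  }
}
```

Local checkpointing avoids the write to a distributed file system, but the checkpointed data does not survive the loss of the executor that holds it, so it trades fault tolerance for speed.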

4. When to Enable Checkpoint?

In Spark streaming applications, checkpointing is required or helpful for any of the following requirements.

  • Using stateful transformations: When updateStateByKey or window transformations such as countByWindow, countByValueAndWindow, incremental reduceByWindow, or incremental reduceByKeyAndWindow are used in your application, checkpointing is required so that RDDs can be checkpointed periodically.
  • Recovering from application failures: Metadata checkpointing is useful for recovering from failures of the application.
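The first requirement can be illustrated with a running word count built on updateStateByKey. This is a hedged sketch rather than a reference implementation: the socket source on localhost:9999, the 5-second batch interval, and the checkpoint path are assumptions chosen for the example.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RunningWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RunningWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Stateful transformations need a checkpoint directory (hypothetical path here).
    ssc.checkpoint("/tmp/checkpoints/wordcount")

    // Assumption: a text server is listening on localhost:9999 (e.g. `nc -lk 9999`).
    val lines = ssc.socketTextStream("localhost", 9999)
    val pairs = lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))

    // Adds the counts of the current batch to the running total kept as state.
    val updateCount: (Seq[Int], Option[Int]) => Option[Int] =
      (newValues, running) => Some(running.getOrElse(0) + newValues.sum)

    val totals = pairs.updateStateByKey[Int](updateCount)
    totals.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because each batch's totals depend on the totals of the previous batch, the lineage would grow without bound if the state RDDs were never checkpointed.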

5. How to Enable Checkpoint?

Enabling checkpointing on a Spark StreamingContext is straightforward: call the checkpoint method with a directory in a fault-tolerant, reliable file system (e.g., HDFS, S3, etc.) where the checkpoint information will be persisted, and then start the application. Checkpointing is periodic, so choosing its interval is a key decision; it depends on the rate of events produced by your source, such as Kafka or Azure Event Hubs.
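The checkpoint interval of an individual DStream can be set with its checkpoint method. The sketch below assumes a 10-second batch interval, a socket source, and a hypothetical checkpoint path; the 50-second interval is only an illustrative value (a multiple of the slide interval), not a recommendation for every workload.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedCountWithInterval {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WindowedCountWithInterval").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/checkpoints/windowed-count") // hypothetical path

    // Assumption: a text server is listening on localhost:9999.
    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))

    // Incremental window aggregation: the inverse function subtracts the
    // batches that slide out of the 60-second window.
    val windowed = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,
      (a: Int, b: Int) => a - b,
      Seconds(60),
      Seconds(10)
    )

    // Checkpoint this DStream's RDDs every 50 seconds (illustrative value).
    windowed.checkpoint(Seconds(50))
    windowed.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

A shorter interval reduces the work needed on recovery but adds write overhead to every checkpointed batch, so the value is usually tuned against the event rate.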

Setting up a checkpoint directory


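// Create a new streaming context
val ssc = new StreamingContext(...)

//....

// Set the checkpoint directory
// (checkpointDirectory is a path on fault-tolerant storage)
ssc.checkpoint(checkpointDirectory)
// Start the context and wait for it to terminate
ssc.start()
ssc.awaitTermination()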

In the above example, we first create a StreamingContext and then call its checkpoint method, which takes the checkpoint directory as a parameter. After that, we simply start the context.

During the initial run, if the checkpoint directory does not exist, the Spark streaming context creates it at the specified path. On restart after a failure, the Spark streaming context can be recreated from the checkpoint data in the existing directory (for example, through StreamingContext.getOrCreate).
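The create-or-recover behavior described above is commonly written with StreamingContext.getOrCreate. The following is a minimal sketch, assuming a socket source and a hypothetical checkpoint path; the object and function names are made up for the example.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableApp {
  // Hypothetical path; use fault-tolerant storage in a real deployment.
  val checkpointDirectory = "/tmp/checkpoints/recoverable-app"

  // Called only when no checkpoint data exists yet. All DStream setup
  // must happen inside this function so it can be restored on recovery.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RecoverableApp").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDirectory)

    // Assumption: a text server is listening on localhost:9999.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recreates the context from checkpoint data if present,
    // otherwise builds a fresh one with createContext.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

On the first run the directory is empty, so createContext is invoked; on later runs the context and its DStream operations are rebuilt from the checkpoint instead.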

6. Conclusion

In a Spark streaming application, checkpointing helps to build fault-tolerant and resilient applications. It maintains intermediate state on fault-tolerant file systems such as HDFS, ADLS, and S3 so that the application can recover from failures. To specify the checkpoint in a streaming query, we use the checkpointLocation option.

Note: In the next article, we will look at the folder structure of the checkpoint directory and at how to set up and use checkpointing on Databricks.


Sriram

Data Engineer. I write about Big Data architecture and the tools and techniques used to build Big Data pipelines, along with other general blogs.
