What is the Spark or PySpark Streaming Checkpoint? As a Spark streaming application must operate 24/7, it should be fault-tolerant to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.), and recovery from data loss should be quick and performant. To achieve this, a Spark streaming application needs to checkpoint enough information to a fault-tolerant storage system (e.g., HDFS, Azure Data Lake, Amazon S3, etc.) to recover from failures.
In this article, let us discuss the importance of fault tolerance, what the checkpoint directory is, the types of checkpoints, when to enable checkpointing, and how to enable it.
1. Importance of Fault Tolerance
In Spark streaming, data arrives in the system 24/7; we collect it over a period of time and process these events, for example by running computations or aggregations on top of them. Now, if our application fails due to some error, then to recover we would conceptually need to re-process all the events that were already processed, which wastes resources, is time-consuming, and is unnecessary.
To overcome this, we have to store the metadata of the last message we processed before the failure so that it is easy to restart from the failure point. By default, a Spark streaming application doesn’t store this metadata, and it is the user’s responsibility to maintain it. Maintaining the metadata of the last message processed is essentially a checkpoint, and it enables us to build a fault-tolerant streaming application. Checkpointing, and the fault tolerance it provides, helps us with the following.
- Limits the state that needs to be re-processed after failure.
- Enables application for recovery from failures.
- Helps to store data and metadata in a fault-tolerant file system.
2. What is Checkpoint Directory
A checkpoint is a mechanism where, every so often, the Spark streaming application stores data and metadata in a fault-tolerant file system. The checkpoint stores the Spark application lineage graph as metadata and periodically saves the application state to the file system. The checkpoint mainly stores two things.
- Data Checkpointing
- Metadata Checkpointing
2.1. Data Checkpoint
Data checkpointing stores the generated RDDs in a file system to support stateful transformations. It is used when a computation or aggregation on the current micro-batch depends on state built up from previous micro-batches. In such cases, as the lineage of dependencies across micro-batches keeps growing, Spark periodically checkpoints the intermediate data of stateful transformations to reliable storage, which cuts down the dependency chain and reduces the recovery time.
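For instance, a running word count kept with updateStateByKey is a stateful transformation: the result of each micro-batch depends on the state built from all previous batches, so Spark periodically checkpoints the generated state RDDs once a checkpoint directory is set. Below is a minimal sketch of such an application; the socket source, port 9999, and the /tmp/checkpoint path are placeholder values for illustration.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StatefulWordCount")
val ssc = new StreamingContext(conf, Seconds(10))

// Required: without a checkpoint directory, updateStateByKey below fails at start-up
ssc.checkpoint("/tmp/checkpoint")

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))

// Running count per word: new state = previous state + counts from the current batch
val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}

runningCounts.print()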
2.2. Metadata Checkpointing
Metadata checkpointing stores information about the streaming computation, configuration, and queued batches in fault-tolerant storage. It is used to recover from failures unrelated to the application logic. The metadata mainly stores the following.
- Incomplete Batches: A Spark Streaming application chunks the incoming events into groups for a period of time and processes them as batches. This stores information about the batches that are queued but not yet processed.
- Configuration: It stores the configuration that was set for the Spark streaming application, like maxFilesPerTrigger, the processing time interval, etc.
- DStream Operations: It stores all information about the operations that are performed on the continuous streaming data.
Spark streaming application uses this information to recover from failures and re-start from the failure position instead of starting from the beginning. Metadata checkpointing is primarily needed for recovery from driver failures, whereas data checkpointing is necessary even for basic functioning if stateful transformations are used.
To set the Spark checkpoint directory, we can pass the checkpoint location as an option to writeStream of a streaming DataFrame.
dataFrame
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "checkpoint")
  .format("console")
  .start()
  .awaitTermination()
The parameter "checkpointLocation" enables the checkpoint and specifies the location where the checkpoint information is stored in the file system.
3. Types of checkpointing
There are two types of checkpointing in Spark streaming
- Reliable checkpointing: Checkpointing that stores the actual RDD in a reliable distributed file system like HDFS, ADLS, Amazon S3, etc.
- Local checkpointing: Checkpointing that stores the actual RDD on the executor's local storage.
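At the RDD level, the difference between the two looks roughly as follows; the HDFS path is a placeholder, and this is only a sketch to illustrate the trade-off (local checkpointing is faster, but the data is lost if an executor dies).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("CheckpointTypes").getOrCreate()
val sc = spark.sparkContext

// Reliable checkpointing: the RDD is written to a fault-tolerant file system
sc.setCheckpointDir("hdfs:///tmp/checkpoints")
val rdd1 = sc.parallelize(1 to 1000).map(_ * 2)
rdd1.checkpoint()
rdd1.count()   // the first action materializes the checkpoint

// Local checkpointing: the RDD is persisted only to executor-local storage
val rdd2 = sc.parallelize(1 to 1000).map(_ + 1)
rdd2.localCheckpoint()
rdd2.count()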
4. When to Enable Checkpoint?
In Spark streaming applications, checkpointing is a must and is helpful when you have any of the following requirements.
- Using Stateful Transformations: When updateStateByKey or window transformations such as countByWindow, countByValueAndWindow, incremental reduceByWindow, or incremental reduceByKeyAndWindow (that is, with an inverse reduce function) are used in your application, checkpointing is a must, as these transformations require periodic RDD checkpointing (see the sketch after this list).
- To recover from application (driver) failures, metadata checkpointing is required.
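As an illustration, a sliding-window count that uses an inverse reduce function (the incremental form of reduceByKeyAndWindow) refuses to start unless checkpointing is enabled. The sketch below is only an example; the socket source, the 60-second window sliding every 10 seconds, and the /tmp/checkpoint path are placeholder values.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("WindowedWordCount")
val ssc = new StreamingContext(conf, Seconds(10))

// Required for the incremental (inverse-function) window operation below
ssc.checkpoint("/tmp/checkpoint")

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))

// Count per word over a 60-second window that slides every 10 seconds.
// The inverse function (a - b) lets Spark update the window incrementally.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // values entering the window
  (a: Int, b: Int) => a - b,   // values leaving the window
  Seconds(60),
  Seconds(10)
)

windowedCounts.print()
ssc.start()
ssc.awaitTermination()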
5. How to Enable Checkpoint?
It is not difficult to enable checkpointing on the Spark streaming context: we call the checkpoint method, pass it a directory in a fault-tolerant, reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be persisted, and then start the application to run your computations. Checkpointing is a periodic activity; it happens every so often, and choosing its interval is a key decision that depends on the rate of events generated by your source, such as Kafka or Azure Event Hubs.
Setting up a checkpoint directory
// new context
val ssc = new StreamingContext(...)
//....
//set checkpoint directory
ssc.checkpoint(checkpointDirectory)
// Start the context
ssc.start()
ssc.awaitTermination()
Here in the above example, first we create the streaming context, then we call the checkpoint method on it, passing the checkpoint directory as a parameter, and finally we simply start the streaming context.
During the initial run, if the checkpoint directory does not exist, the Spark streaming context creates it at the specified path. During failure recovery, the Spark streaming context is recreated from the checkpoint data found in the existing checkpoint directory, as shown below.
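This recovery behaviour is typically wired up with StreamingContext.getOrCreate, which builds a fresh context on the first run and rebuilds one from the checkpoint data on restart after a failure. Below is a rough sketch; the checkpoint path is a placeholder and the DStream definitions are elided.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDirectory = "hdfs:///tmp/streaming-checkpoint"

// Invoked only when no checkpoint exists yet: build the context and define the DStreams here
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("RecoverableApp")
  val ssc = new StreamingContext(conf, Seconds(10))
  // ... define input DStreams and transformations ...
  ssc.checkpoint(checkpointDirectory)
  ssc
}

// First run: createContext() is called and the checkpoint directory is created.
// After a failure: the context is recreated from the existing checkpoint data.
val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext _)

ssc.start()
ssc.awaitTermination()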
6. Conclusion
In a Spark streaming application, checkpointing helps to develop fault-tolerant and resilient Spark applications. It maintains intermediate state on fault-tolerant file systems like HDFS, ADLS, and S3 to recover from failures. To specify the checkpoint location in a streaming query, we use the checkpointLocation option.
Note: In the next article, we will look at the folder structure of the checkpoint directory and how to set up and use checkpointing on Databricks.
Related Articles
- Spark Streaming – Different Output modes explained
- Spark from_avro() and to_avro() usage
- Spark Streaming – Reading data from TCP Socket
- Spark Streaming files from a directory
- Apache Spark Interview Questions
- What is Apache Spark and Why It Is Ultimate for Working with Big Data
- Spark – Overwrite the output directory