While Spark is commonly associated with processing large batches of data through massive daily jobs, the reality is that data often enters our systems continuously. To process this data in batch mode, aggregation and processing delays are usually required. However, this can be inefficient — Spark Streaming comes to the rescue.
Spark Streaming, while not a pure streaming solution like Flink, breaks real-time data streams into small batches that Spark’s processing engine can handle in parallel. As an additional benefit, Spark Streaming can read from and write to many different streaming data sources — one of the most popular ones is Kafka.
This article won’t focus extensively on Spark Streaming. Instead, the emphasis will be on testing existing Streaming applications locally. This eliminates the need for a connection to an existing Kafka cluster or setting up one in Docker. For this purpose, we’ll utilize the EmbeddedKafka library, enabling quick and lightweight unit tests.
1. Setting up dependencies
The starting point is to include Spark and SparkStreaming dependencies in the‘build.sbt’
file:
libraryDependencies += "org.apache.spark" % "spark-core_2.12" % "3.3.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "3.3.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.3.0"
And to test our code, as I’ve already mentioned, we’ll be using Scalatest and EmbeddedKafka:
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.15" % Test
libraryDependencies += "io.github.embeddedkafka" %% "embedded-kafka" % "3.4.0" %
Test
2. Utility class for tests
Before diving into tests, setting up a foundation is important. A trait can be created to set up and tear down required infrastructure for tests. This avoids repetition and ensures a
consistent setup:
trait SparkStreamingTester
extends Suite
with BeforeAndAfterEach {
val sparkSession: SparkSession = {
SparkSession
.builder()
.master("local")
.appName("test spark app")
.config("spark.ui.enabled", "false")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
}
val checkpointDir: File = Files.createTempDirectory("stream_checkpoint").toFile FileUtils.forceDeleteOnExit(checkpointDir)
sparkSession.sparkContext.setCheckpointDir(checkpointDir.toString)
implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
kafkaPort = 9092,
zooKeeperPort = 2181,
customBrokerProperties = Map("auto.create.topics.enable" -> "true")
)
override def beforeEach(): Unit = {
EmbeddedKafka.start()(kafkaConfig)
}
override def afterEach(): Unit = {
EmbeddedKafka.stop()
}
}
Breaking down the utility class:
- The trait extends two tags from Scalatest, providing ‘beforeEach’ and ‘afterEach’ methods to set up and tear down EmbeddedKafka before and after each test;
- An implicit ‘kafkaConfig’ is required and set as an implicit parameter to grant access to Kafka-related methods;
- Spark session setup is straightforward, creating a Spark application for testing;
- The ‘checkpointDir’ is necessary for streaming applications to store checkpoint data;
Spark application can be stopped, redeployed, re-started at any point of its life, while stream continues to exist, and still has some messages in it. Kafka doesn’t store information about the state of its consumers – so Spark will have to do that itself. Hence we need checkpointing – in simple words, it’s just data about what topic we’ve read from, and what have we already read, so when reconnected to Kafka, Spark application can pick up where it left.
That’s all the preparation that we need — now we can start with the creation of an actual test.
3. Testing a simple Spark Streaming case
Let’s create a test that is as simple as possible — send one string to Kafka, fetch it using Spark Dataset, collect from Spark, and make sure that we’ve received what we actually expected.
Here is the code through which we’ll go line by line:
class SimpleStreamingTest
extends AnyFlatSpec
with SparkStreamingTester {
"spark" should "receive message from Kafka" in {
val topic = "test_topic"
val tableName = "test_table"
val testMessage = "test_message"
val df = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", s"localhost:${kafkaConfig.kafkaPort}")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
val query = df.writeStream
.format("memory")
.queryName(tableName)
.outputMode(OutputMode.Append())
.trigger(Trigger.Once())
.start()
EmbeddedKafka.publishStringMessageToKafka(topic, testMessage) query.processAllAvailable()
val results = sparkSession.sql(f"SELECT value FROM $tableName").collect()
results.length should be(1)
val messageBytes = results.head.getAs[Array[Byte]]("value")
new String(messageBytes, StandardCharsets.UTF_8) should be(testMessage)
}
}
Our test class is extending the “utility class” we’ve created in a previous section, so when the actual test begins, we already have the Spark session ready, and Kafka started. After follow the nest steps:
4. Create (read) a stream that comes from Kafka
To do so, we need to specify the URL of the Kafka server we want to connect to, and the topic we’ll be reading from. Note here ‘startingOffsets’ parameter — it specifies where we would start reading the data.‘earliest’ means we’ll try reading the data from the very beginning of the stream. So if you’re connecting a production job to an existing topic on a long-running kafka server — you probably don’t want to set that (maybe latest work is better for you). For the test it’s absolutely fine though, it means we definitely won’t miss the message. We’ve now created our source.
5. Creating a ‘df’
As a result of the previous step, you got ‘df’ — a totally usual Spark DataFrame, same as if you’re reading data in a batch from Parquet files: you can apply (mostly) the same transformations and functions, and the fact that data is being streamed is abstracted away from you.
6. Creating a sink — destination for our system
writeStream does exactly that. We’ll just be storing the data into an in-memory table — definitely not a production solution, but a good way to test our stream processing. outputMode just means we’ll be appending new data (if it comes) to the table we’ve specified. trigger is a moment when data would be actually read and processed — probably you’ll set it to Continuous for real job, but we only need to process data once in test. And that’s kinda it! Almost.
7. Publish the message with the use of EmbeddedKafka.publishStringMessageToKafka
After we need to wait till all the data is processed — by calling query.processAllAvailable(). As docstring says, this method can block for a long period of time, and only exists for testing purposes.
8. Collect the results and check what we’ve received from Kafka
Resulting table doesn’t have just the value we’re interested in, but also a key associated with it, the topic where it was read from and other metadata that we’ll ignore in our case. Also note that value is an Array[Byte] — so we can serialize any kind of object there (and we’ll actually do that, but slightly later).
9. Decode the String (Kafka client uses UTF-8 charset by default), and compare it to what we’ve sent
This is the final step — voila! It’s exactly what we expected it to be — test_message that made a full circle.
Related Articles
- Spark Extract Values from a Row Object
- Spark with SQL Server – Read and Write Table
- reduceByKey vs groupByKey vs aggregateByKey vs combineByKey in Spark
- Reduce Key-Value Pair into Key-list Pair
- Spark min() & max() with Examples
- Spark – Extract DataFrame Column as List
- Spark Timestamp – Extract hour, minute and second
- Find Median and Quantiles using Spark
- Filter Spark DataFrame using Values from a List
- Spark map() and mapValues()
- Spark Kryoserializer buffer max