This article describes, with an example, how to continuously read (stream) JSON files from a folder, process them, and write the data to another sink. Spark Streaming uses readStream to monitor the folder and process files as they arrive in the directory in real time, and writeStream to write the resulting DataFrame or Dataset.
Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing system that supports both batch and streaming workloads. It is an extension of the core Spark API that processes real-time data from sources such as Kafka, Flume, and Amazon Kinesis, to name a few. The processed data can be pushed to databases, Kafka, live dashboards, etc.
The JSON files we are going to use are located on GitHub. Download them to your system if you want to run this program locally.
Spark Streaming files from a folder
Use readStream
on a SparkSession to load a streaming Dataset from an external storage system.
val df = spark.readStream
  .schema("provide schema of json file") // the complete example below shows how to define the schema
  .json("c:/tmp/stream_folder")
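The schema placeholder above can be replaced with an explicit StructType. Here is a trimmed sketch, using a few of the field names from the zipcode dataset in the complete example at the end of this article (add the remaining fields to match your JSON files):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// A trimmed version of the schema from the complete example below;
// extend it with the remaining fields of your JSON files
val schema = StructType(List(
  StructField("RecordNumber", IntegerType, true),
  StructField("Zipcode", StringType, true),
  StructField("City", StringType, true),
  StructField("State", StringType, true)
))
```

An explicit schema is required here because, unlike batch reads, the file streaming source does not infer the schema by default.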
Writing Spark Streaming to Console
Use writeStream.format("console")
to write the streaming DataFrame to the console.
df.writeStream
  .format("console")
  .outputMode("append")
  .start()            // Start the computation
  .awaitTermination() // Wait for the computation to terminate
Since we are just reading a file (without any aggregations) and writing it as-is, we use outputMode("append"). OutputMode specifies what data is written to the sink when new data becomes available in the streaming DataFrame/Dataset.
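To build intuition for the difference between the two modes used in this article, here is a plain-Scala sketch of what the sink receives under each mode. No Spark is involved, and the micro-batch contents are made up for illustration:

```scala
// Plain-Scala sketch of output-mode semantics; no Spark required.
// The micro-batch contents below are made up for illustration.
object OutputModeSketch {
  // Two hypothetical micro-batches of Zipcode values arriving over time
  val batch1 = Seq("704", "704", "709")
  val batch2 = Seq("709", "76166")

  // append (no aggregation): each trigger emits only the newly arrived rows
  val appendOutput: Seq[String] = batch2

  // complete (with aggregation): each trigger rewrites the full result
  // computed over all data received so far
  val completeOutput: Map[String, Int] =
    (batch1 ++ batch2).groupBy(identity).map { case (zip, rows) => (zip, rows.size) }

  def main(args: Array[String]): Unit = {
    println(appendOutput)   // only the rows from the latest batch
    println(completeOutput) // counts over everything: 704 -> 2, 709 -> 2, 76166 -> 1
  }
}
```

This is why the as-is file copy below uses append, while the groupBy/count aggregation uses complete.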
Now, let’s see how to apply a groupBy and count on streaming data.
val groupDF = df.select("Zipcode")
  .groupBy("Zipcode").count()
outputMode("complete")
should be used when writing a streaming aggregation. Here, we write the aggregated Dataset to the console.
groupDF.writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()
How to run?
The complete example code can be found on GitHub. Download it and run SparkStreamingFromDirectory.scala
from your favorite editor. While the program is running, copy or move the JSON files into the monitored folder. You should see the output on the console, refreshing each time you move files into the folder.
Conclusion:
You have learned how to stream (read) JSON files from a directory using a Scala example. Spark Structured Streaming uses readStream to read and writeStream to write a DataFrame/Dataset, and you have also learned the difference between the complete and append output modes.
Complete code for reference.
package com.sparkbyexamples.spark.streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object SparkStreamingFromDirectory {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExamples")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val schema = StructType(
      List(
        StructField("RecordNumber", IntegerType, true),
        StructField("Zipcode", StringType, true),
        StructField("ZipCodeType", StringType, true),
        StructField("City", StringType, true),
        StructField("State", StringType, true),
        StructField("LocationType", StringType, true),
        StructField("Lat", StringType, true),
        StructField("Long", StringType, true),
        StructField("Xaxis", StringType, true),
        StructField("Yaxis", StringType, true),
        StructField("Zaxis", StringType, true),
        StructField("WorldRegion", StringType, true),
        StructField("Country", StringType, true),
        StructField("LocationText", StringType, true),
        StructField("Location", StringType, true),
        StructField("Decommisioned", StringType, true)
      )
    )

    val df = spark.readStream
      .schema(schema)
      .json("c:/tmp/stream_folder")

    df.printSchema()

    val groupDF = df.select("Zipcode")
      .groupBy("Zipcode").count()
    groupDF.printSchema()

    groupDF.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}
Can you please provide the same code in python. Any reference will also do
Do we require to set up kafka to work on the above example?
Hi Shanker, no, you do not need Kafka to run this example. It just streams the files from the folder and outputs them to the console.