Spark Streaming files from a directory

This article describes and provides an example of how to continuously read JSON files from a folder as a stream, process them, and write the data to another sink. Spark uses readStream to monitor the folder and process files as they arrive in the directory in real time, and writeStream to write the resulting DataFrame or Dataset out.

Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing system that supports both batch and streaming workloads. It is an extension of the core Spark API that processes real-time data from sources such as Kafka, Flume, and Amazon Kinesis, to name a few. The processed data can be pushed to databases, Kafka, live dashboards, etc.

The JSON files we are going to use are located at GitHub. Download these files to your system if you want to run this program locally.

Spark Streaming files from a folder

Structured Streaming uses readStream on a SparkSession to load a streaming Dataset from an external storage system, such as a file system.


val df = spark.readStream
      .schema(schema) // provide the schema of the JSON files; the full StructType is defined in the complete code below
      .json("c:/tmp/stream_folder")

Writing Spark Streaming to Console

Use writeStream.format("console") to write the streaming DataFrame to console.


df.writeStream
      .format("console")
      .outputMode("append")
      .start()             // Start the computation
      .awaitTermination()  // Wait for the computation to terminate

Since we are just reading files (without any aggregation) and writing them as-is, we use outputMode("append"). The output mode specifies what data is written to a sink when new data is available in the streaming DataFrame/Dataset; Structured Streaming supports the append, update, and complete modes.
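The console sink also accepts a couple of display options. A minimal sketch (numRows and truncate are standard console-sink options; the values here are illustrative):

df.writeStream
      .format("console")
      .outputMode("append")
      .option("numRows", 50)      // print up to 50 rows per micro-batch (default is 20)
      .option("truncate", false)  // show full column values instead of truncating them
      .start()
      .awaitTermination()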

Now, let’s see how to apply a groupBy and count on streaming data.


val groupDF = df.select("Zipcode")
        .groupBy("Zipcode").count()
outputMode("complete") should use when writing streaming aggregation. Here, we are writing aggregated Dataset to console.

groupDF.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
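For anything beyond a quick local test, you would normally also set a checkpoint location so Spark can persist offsets and aggregation state across restarts. A minimal sketch (checkpointLocation is a standard Structured Streaming option; the path here is illustrative):

groupDF.writeStream
      .format("console")
      .outputMode("complete")
      .option("checkpointLocation", "c:/tmp/checkpoint") // illustrative path; stores offsets and state
      .start()
      .awaitTermination()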

How to run?

The complete example code can be found on GitHub. Download it and run SparkStreamingFromDirectory.scala from your favorite editor. While the program is running, copy or move the JSON files into the monitored folder (c:/tmp/stream_folder). You should see the output on the console, and it refreshes as you move more files into the folder.
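By default, Structured Streaming starts a new micro-batch as soon as the previous one finishes. If you would rather have the console refresh on a fixed cadence, you can add a processing-time trigger. A minimal sketch (Trigger.ProcessingTime is part of the standard API; the 5-second interval is illustrative):

import org.apache.spark.sql.streaming.Trigger

groupDF.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime("5 seconds")) // run a micro-batch every 5 seconds
      .start()
      .awaitTermination()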

Conclusion:

You have learned how to stream and read JSON files from a directory using a Scala example. Spark Structured Streaming uses readStream to read and writeStream to write a DataFrame/Dataset, and you have also learned the difference between the complete and append output modes.

Complete code for reference.


package com.sparkbyexamples.spark.streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkStreamingFromDirectory {

  def main(args: Array[String]): Unit = {

    // Create a local SparkSession with 3 threads
    val spark:SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExamples")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Schema matching the fields in the zipcode JSON files
    val schema = StructType(
      List(
        StructField("RecordNumber", IntegerType, true),
        StructField("Zipcode", StringType, true),
        StructField("ZipCodeType", StringType, true),
        StructField("City", StringType, true),
        StructField("State", StringType, true),
        StructField("LocationType", StringType, true),
        StructField("Lat", StringType, true),
        StructField("Long", StringType, true),
        StructField("Xaxis", StringType, true),
        StructField("Yaxis", StringType, true),
        StructField("Zaxis", StringType, true),
        StructField("WorldRegion", StringType, true),
        StructField("Country", StringType, true),
        StructField("LocationText", StringType, true),
        StructField("Location", StringType, true),
        StructField("Decommisioned", StringType, true)
      )
    )

    // Continuously monitor the folder and read new JSON files as a stream
    val df = spark.readStream
      .schema(schema)
      .json("c:/tmp/stream_folder")

    df.printSchema()

    // Streaming aggregation: count records per Zipcode
    val groupDF = df.select("Zipcode")
        .groupBy("Zipcode").count()
    groupDF.printSchema()

    // Write the running counts to the console; complete mode re-emits the full result table on every trigger
    groupDF.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}

