Spark Streaming files from a directory

This article describes, with an example, how to continuously stream (read) JSON files from a folder, process them, and write the data to another sink. Spark Streaming uses readStream to monitor the folder and process files as they arrive in the directory in real time, and uses writeStream to write the resulting DataFrame or Dataset.

Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing system that supports both batch and streaming workloads. It is an extension of the core Spark API that processes real-time data from sources like Kafka, Flume, and Amazon Kinesis, to name a few. The processed data can be pushed to databases, Kafka, live dashboards, etc.

The JSON files we are going to use are located at GitHub. Download these files to your system, as you will need them if you want to run this program yourself.

Spark Streaming files from a folder

Structured Streaming uses readStream on a SparkSession to load a streaming Dataset from an external storage system.


val df = spark.readStream
      .schema(schema) // schema of the JSON file; the StructType is defined in the complete example below
      .json("c:/tmp/stream_folder")
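To build intuition for what readStream does with a file source, here is a toy sketch in plain Scala (this is not Spark's actual implementation): each poll of the monitored directory picks up only the files that have not been seen before, which is conceptually how new files become new micro-batches. The class name DirectoryPoller is hypothetical.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

// Toy model of a file-based streaming source: remembers which files it has
// already processed and returns only the ones that appeared since last poll.
class DirectoryPoller(dir: String) {
  private var seen = Set.empty[String]

  // Returns the files that arrived since the previous call
  def newFiles(): Seq[Path] = {
    val listing = Files.list(Paths.get(dir))
    val all = try listing.iterator().asScala.toList finally listing.close()
    val fresh = all.filterNot(p => seen.contains(p.toString))
    seen ++= fresh.map(_.toString)
    fresh
  }
}
```

In real Spark, this bookkeeping (plus fault-tolerant checkpointing of processed files) is handled for you by the file source.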

Writing Spark Streaming to Console

Use writeStream.format("console") to write the streaming DataFrame to console.


df.writeStream
      .format("console")
      .outputMode("append")
      .start()             // Start the computation
      .awaitTermination()  // Wait for the computation to terminate

Since we are just reading files (without any aggregations) and writing them as-is, we use outputMode("append"). The outputMode specifies what data is written to the sink when new data becomes available in the streaming DataFrame/Dataset.
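The difference between the output modes can be illustrated with a small plain-Scala model (a sketch of the semantics only, not Spark internals): append emits just the rows that arrived in the latest micro-batch, while complete re-emits the full aggregated result each time. The object name OutputModeSketch is hypothetical.

```scala
// Toy model of streaming output modes for a count aggregation over Zipcodes.
object OutputModeSketch {
  // complete mode: the sink receives the running counts over ALL batches so far
  def completeOutput(batches: Seq[Seq[String]]): Map[String, Int] =
    batches.flatten.groupBy(identity).map { case (zip, rows) => (zip, rows.size) }

  // append mode: the sink receives only the rows of the latest batch, as-is
  def appendOutput(latestBatch: Seq[String]): Seq[String] = latestBatch
}
```

For example, if a first batch contains Zipcodes 704, 704, 709 and a second batch contains 704, complete mode would output the counts 704 -> 3 and 709 -> 1 after the second batch, while append mode would output just the newly arrived row.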

Now, let’s see how to apply a groupBy and count on streaming data.


val groupDF = df.select("Zipcode")
        .groupBy("Zipcode").count()
outputMode("complete") should be used when writing a streaming aggregation. Here, we write the aggregated Dataset to the console.

groupDF.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()

How to run?

The complete example code can be found on GitHub. Download it and run SparkStreamingFromDirectory.scala from your favorite editor. Once the program is running and waiting for input, copy/move the JSON files into the monitored folder. You should see the output on the console, and it refreshes as you move files into the folder.
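To simulate new data arriving, you can copy the downloaded JSON files into the monitored folder while the query is running. A small helper sketch using only the JDK (the object name FeedStreamFolder and the file names are hypothetical, not from the article's dataset):

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}
import scala.collection.JavaConverters._

object FeedStreamFolder {
  // Copies the downloaded .json files into the folder the query monitors;
  // each newly arrived file triggers a micro-batch in the running stream.
  def copyInto(sourceDir: String, streamDir: String): Seq[String] = {
    val target = Paths.get(streamDir)
    Files.createDirectories(target)
    val listing = Files.list(Paths.get(sourceDir))
    val jsonFiles =
      try listing.iterator().asScala.toList.filter(_.toString.endsWith(".json"))
      finally listing.close()
    jsonFiles.foreach { p =>
      Files.copy(p, target.resolve(p.getFileName), StandardCopyOption.REPLACE_EXISTING)
    }
    jsonFiles.map(_.getFileName.toString) // names of the files fed to the stream
  }
}
```

Copying the files one at a time (with a short pause in between) makes it easy to watch each micro-batch appear on the console.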

Conclusion:

You have learned how to stream (read) JSON files from a directory using a Scala example. Spark Structured Streaming uses readStream to read and writeStream to write a DataFrame/Dataset, and you have also learned the difference between the complete and append output modes.

Complete code for reference.


package com.sparkbyexamples.spark.streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkStreamingFromDirectory {

  def main(args: Array[String]): Unit = {

    val spark:SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExamples")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val schema = StructType(
      List(
        StructField("RecordNumber", IntegerType, true),
        StructField("Zipcode", StringType, true),
        StructField("ZipCodeType", StringType, true),
        StructField("City", StringType, true),
        StructField("State", StringType, true),
        StructField("LocationType", StringType, true),
        StructField("Lat", StringType, true),
        StructField("Long", StringType, true),
        StructField("Xaxis", StringType, true),
        StructField("Yaxis", StringType, true),
        StructField("Zaxis", StringType, true),
        StructField("WorldRegion", StringType, true),
        StructField("Country", StringType, true),
        StructField("LocationText", StringType, true),
        StructField("Location", StringType, true),
        StructField("Decommisioned", StringType, true)
      )
    )

    val df = spark.readStream
      .schema(schema)
      .json("c:/tmp/stream_folder")

    df.printSchema()

    val groupDF = df.select("Zipcode")
        .groupBy("Zipcode").count()
    groupDF.printSchema()

    groupDF.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}

NNK

