You are currently viewing Spark Streaming – Reading data from TCP Socket

Using Spark streaming we will see a working example of how to read data from TCP Socket, process it and write output to console. Spark uses readStream() to read and writeStream() to write streaming DataFrame or Dataset. The below-explained example does the word count on streaming data and outputs the result to console.

Advertisements
What is Spark Streaming

Spark Streaming is a scalable, high-throughput, fault-tolerant streaming processing system that supports both batch and streaming workloads. It is an extension of the core Spark API to process real-time data from sources like TCP socket, Kafka, Flume, and Amazon Kinesis to name it few. This processed data can be pushed to databases, Kafka, live dashboards e.t.c

spark streaming TCP socket
Spark Streaming TCP socket

Spark Streaming data from TCP Socket

Use readStream.format("socket") from Spark session object to read data from the socket and provide options host and port where you want to stream data from.


val df = spark.readStream
      .format("socket")
      .option("host","localhost")
      .option("port","9090")
      .load()

Spark reads the data from socket and represents it in a “value” column of DataFrame. df.printSchema() outputs


root
 |-- value: string (nullable = true)

Process the data using DataFrame operations

Now, let’s process the data by counting the word; first split the data, use the explode to flatten it and apply groupBy function.


val wordsDF = df.select(explode(split(df("value")," ")).alias("word"))

val count = wordsDF.groupBy("word").count()

Spark Streaming to console

Use writeStream.format("console") to write data to console and outputMode(“complete”) should use when writing streaming aggregation DataFrames.


val query = count.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()

Source code of Spark Streaming TCP Socket example


package com.sparkbyexamples.spark.streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}
object SparkStreamingFromSocket {
  def main(args: Array[String]): Unit = {

    val spark:SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExample")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("socket")
      .option("host","localhost")
      .option("port","9090")
      .load()

    val wordsDF = df.select(explode(split(df("value")," ")).alias("word"))
    val count = wordsDF.groupBy("word").count()
    val query = count.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}

Let’s see how to run this example.

1. Install NetCat

First, let’s write some data to Socket, using NetCat, use this utility we can write data to TCP socket, it is the best utility to write to the socket. after install run below command.

nc -l -p 9090 

2. Run Spark Streaming job.

The complete example code can also be found at GitHub. Download it and run SparkStreamingFromSocket.scala from your favorite editor. When program execution pauses, switch to NetCat console and type a few sentences and press enter for each line as shown below.

streaming tcp socket

Yields below output on your editor.


-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|   for|    1|
|Oracle|    2|
|  Name|    1|
|    is|    2|
|Naveen|    1|
|  work|    1|
|    My|    1|
|     I|    1|
|  good|    1|
+------+-----+

Conclusion

You have learned how to stream or read a data from TCP Socket using Spark Structured Streaming with Scala example and also learned how to use NetCat to write data to TCP socket.

This Post Has One Comment

  1. Anonymous

    cool demo!

Comments are closed.