Spark Streaming with Kafka Example

Using Spark Streaming we can read from a Kafka topic and write to a Kafka topic in TEXT, CSV, AVRO, and JSON formats. In this article, we will learn, with a Scala example, how to stream Kafka messages in JSON format using the from_json() and to_json() SQL functions.

What is Spark Streaming?

Apache Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing system that supports both batch and streaming workloads. It is an extension of the core Spark API that processes real-time data from sources like Kafka, Flume, and Amazon Kinesis, to name a few. The processed data can be pushed to other systems like databases, Kafka, live dashboards, etc.

What is Apache Kafka?

Apache Kafka is a publish-subscribe messaging system originally developed at LinkedIn.
A Kafka cluster is highly scalable and fault-tolerant, and it also offers much higher throughput than other message brokers such as ActiveMQ and RabbitMQ.

Prerequisites

If you don’t have a Kafka cluster set up, follow the articles below to set up a single-broker cluster and get familiar with creating and describing topics.


1. Run Kafka Producer Shell

First, let’s produce some JSON data to the Kafka topic "json_topic". The Kafka distribution comes with a console producer shell; run this producer and input the JSON data from person.json. Just copy one line at a time from the person.json file and paste it on the console where the Kafka producer shell is running.
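
For reference, a record in person.json would look something like the following (values are illustrative and match the schema used later in this article; the actual file ships with the GitHub project linked below):

{"id":1,"firstname":"James","middlename":"","lastname":"Smith","dob_year":1980,"dob_month":1,"gender":"M","salary":3000}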

Note: By default, Kafka automatically creates a topic the first time you write a message to it; however, you can also create the topic manually and specify the number of partitions and the replication factor yourself.
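
For example, a minimal sketch using the kafka-topics.sh tool that ships with Kafka (a single-broker cluster supports a replication factor of at most 1):

bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 1 \
--topic json_topic

On older Kafka releases, use --zookeeper localhost:2181 in place of --bootstrap-server. With the topic in place, start the console producer: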


bin/kafka-console-producer.sh \
--broker-list localhost:9092 --topic json_topic

2. Streaming With Kafka

2.1. Kafka Maven dependency

In order to stream data from a Kafka topic, we need to use the below Kafka client Maven dependency. Use the version that matches your Kafka and Scala versions.

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
     <version>2.4.0</version>
</dependency>
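
If your build uses sbt instead of Maven, the equivalent dependency would be the following (assuming your scalaVersion is set to a matching 2.11.x):

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"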

2.2 Spark Streaming Scala example

Spark Streaming uses readStream() on SparkSession to load a streaming Dataset from Kafka. Setting the option startingOffsets to earliest reads all the data already available in Kafka at the start of the query; we may not use this option that often, and the default value latest reads only new data that has not yet been processed.
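
The snippets below assume a SparkSession named spark is already in scope; a minimal sketch of creating one (the app name and master are placeholders):

import org.apache.spark.sql.SparkSession

// Local SparkSession for experimenting; use your cluster's master in production
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SparkStreamingKafkaExample")
  .getOrCreate()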


val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "192.168.1.100:9092")
        .option("subscribe", "json_topic")
        .option("startingOffsets", "earliest") // From starting
        .load()

df.printSchema()

Since there are multiple sources to stream from, we need to state explicitly that we are streaming from Kafka with format("kafka"), provide the Kafka bootstrap servers, and subscribe to the topic we are streaming from using the corresponding options.
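
The Kafka source can also subscribe to more than one topic; either of the following could replace the subscribe option above (topic names here are hypothetical):

// a comma-separated list of topics
.option("subscribe", "topic1,topic2")

// or a regular expression matching topic names
.option("subscribePattern", "topic.*")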

df.printSchema() returns the schema of streaming data from Kafka. The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata.
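
For the Kafka source this schema is fixed, so df.printSchema() prints:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)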


3. Spark Streaming Write to Console

Since the value column is binary, first we need to convert it to a String using selectExpr().


val personStringDF = df.selectExpr("CAST(value AS STRING)")

Now, parse the JSON string in the value column into individual DataFrame columns using a custom schema.


import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val schema = new StructType()
      .add("id", IntegerType)
      .add("firstname", StringType)
      .add("middlename", StringType)
      .add("lastname", StringType)
      .add("dob_year", IntegerType)
      .add("dob_month", IntegerType)
      .add("gender", StringType)
      .add("salary", IntegerType)

val personDF = personStringDF.select(from_json(col("value"), schema).as("data"))
      .select("data.*")

personDF.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()

The complete streaming Kafka example code can be downloaded from GitHub. After downloading, import the project into your favorite IDE and change the Kafka broker IP address to your server IP in the SparkStreamingConsumerKafkaJson.scala program. When you run this program, you should see Batch: 0 with data. As you input new data (from step 1), the results get updated with Batch: 1, Batch: 2, and so on.


Batch: 0
+---+---------+----------+--------+--------+---------+------+------+
| id|firstname|middlename|lastname|dob_year|dob_month|gender|salary|
+---+---------+----------+--------+--------+---------+------+------+
|  1|   James |          |   Smith|    null|     null|     M|  3000|
|  2| Michael |      Rose|        |    null|     null|     M|  4000|
|  3|  Robert |          |Williams|    null|     null|     M|  4000|
|  4|   Maria |      Anne|   Jones|    null|     null|     F|  4000|
+---+---------+----------+--------+--------+---------+------+------+

Batch: 1
+---+---------+----------+--------+--------+---------+------+------+
| id|firstname|middlename|lastname|dob_year|dob_month|gender|salary|
+---+---------+----------+--------+--------+---------+------+------+
+---+---------+----------+--------+--------+---------+------+------+

Batch: 2
+---+---------+----------+--------+--------+---------+------+------+
| id|firstname|middlename|lastname|dob_year|dob_month|gender|salary|
+---+---------+----------+--------+--------+---------+------+------+
|  1|   James |          |   Smith|    null|     null|     M|  3000|
+---+---------+----------+--------+--------+---------+------+------+

4. Spark Streaming Write to Kafka Topic

Note that in order to write Spark Streaming data to Kafka, the value column is required and all other fields are optional.
The key and value columns are binary in Kafka; hence, they should first be converted to String before processing. If a key column is not specified, then a null-valued key column will be automatically added.

Let’s produce the data to the Kafka topic "json_data_topic". Since we are processing JSON, let’s convert the data to JSON using the to_json() function and store it in a value column.


df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
   .writeStream
   .format("kafka")
   .outputMode("append")
   .option("kafka.bootstrap.servers", "192.168.1.100:9092")
   .option("topic", "json_data_topic")
   .option("checkpointLocation", "/tmp/checkpoint") // required by the Kafka sink; path is illustrative
   .start()
   .awaitTermination()

Use writeStream.format("kafka") to write the streaming DataFrame to a Kafka topic. Since we are just reading from one topic and writing as-is (without any aggregations), we use outputMode("append"). OutputMode specifies what data gets written to the sink whenever new data becomes available in the DataFrame/Dataset.
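
For contrast, a query that aggregates would typically use complete or update mode. A sketch counting records per gender on the personDF parsed in step 3 (console sink, for illustration only):

personDF.groupBy("gender").count()
  .writeStream
  .format("console")
  .outputMode("complete") // rewrite the full aggregated result on every trigger
  .start()
  .awaitTermination()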

5. Run Kafka Consumer Shell

Now run the Kafka consumer shell program that comes with the Kafka distribution.


bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic json_data_topic

As you feed more data (from step 1), you should see JSON output on the consumer shell console.
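
Each record arrives as a single JSON document per line. Based on the sample data above, the output would look something like this (illustrative; to_json() omits null fields by default):

{"id":1,"firstname":"James","middlename":"","lastname":"Smith","gender":"M","salary":3000}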

Next Steps

You can also read the articles Streaming JSON files from a folder and Streaming from a TCP socket to learn different ways of streaming.

I would also recommend reading Spark Streaming + Kafka Integration and Structured Streaming with Kafka to learn more about Structured Streaming.

Happy Learning !!

