Spark Streaming from Kafka Example

Using Spark Streaming, we can read from a Kafka topic and write to a Kafka topic in TEXT, CSV, AVRO, and JSON formats. In this article, we will learn, with a Scala example, how to stream Kafka messages in JSON format using the from_json() and to_json() SQL functions.

What is Spark Streaming?

Apache Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing system that supports both batch and streaming workloads. It is an extension of the core Spark API that processes real-time data from sources like Kafka, Flume, and Amazon Kinesis, to name a few. The processed data can be pushed to other systems like databases, Kafka, live dashboards, etc.

What is Apache Kafka?

Apache Kafka is a publish-subscribe messaging system originally developed at LinkedIn. A Kafka cluster is highly scalable and fault-tolerant, and it also offers much higher throughput than other message brokers such as ActiveMQ and RabbitMQ.

Prerequisites

If you don’t have a Kafka cluster set up, follow the below articles to set up a single-broker cluster and get familiar with creating and describing topics.

1. Run Kafka Producer Shell

First, let’s produce some JSON data to the Kafka topic "json_topic". The Kafka distribution comes with a console producer shell; run it and feed it the JSON data from person.json, copying one line at a time from the file and pasting it on the console where the producer shell is running.
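For illustration, a line from person.json matching the schema used later in this article could look like this (a hypothetical record, not necessarily the file's exact contents):

{"id":1,"firstname":"James","middlename":"","lastname":"Smith","gender":"M","salary":3000}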

Note: By default, Kafka automatically creates the topic when you first write a message to it; however, you can also create the topic manually and specify your own partition count and replication factor.
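For example, creating the topic manually might look like the following (a sketch; on Kafka versions older than 2.2, replace --bootstrap-server with --zookeeper localhost:2181):

bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 1 \
--topic json_topic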


bin/kafka-console-producer.sh \
--broker-list localhost:9092 --topic json_topic

2. Streaming from Kafka

2.1. Kafka Maven dependency

In order to stream data from a Kafka topic, we need the Kafka client Maven dependency below. Use the version that matches your Kafka and Scala versions.

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
     <version>2.4.0</version>
</dependency>
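If your build uses sbt instead of Maven, the equivalent dependency line would presumably be:

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"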

2.2. Spark Streaming Scala example

Spark Streaming uses readStream() on SparkSession to load a streaming Dataset from Kafka. Setting the option startingOffsets to earliest reads all data already available in Kafka at the start of the query; we may not use this option that often, and its default value is latest, which reads only new data that has not been processed yet.
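The snippets below assume a SparkSession named spark is already in scope; a minimal sketch of creating one (the master and app name here are just examples):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")                 // example: run locally using all cores
  .appName("SparkStreamingFromKafka") // example app name
  .getOrCreate()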


val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "192.168.1.100:9092")
        .option("subscribe", "json_topic")
        .option("startingOffsets", "earliest") // From starting
        .load()

df.printSchema()

Since there are multiple sources to stream from, we need to state explicitly that we are streaming from Kafka with format("kafka"), provide the Kafka servers, and subscribe to the topic we are streaming from using the corresponding options.
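For reference, the Kafka source also supports subscribing to a list of topics or to a topic pattern; a sketch (the pattern below is a placeholder):

// Alternative: subscribe to every topic whose name matches a regex
val dfPattern = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "192.168.1.100:9092")
        .option("subscribePattern", "json_.*")
        .load()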

df.printSchema() returns the schema of streaming data from Kafka. The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata.
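For the Kafka source, the printed schema should look like the following; note that key and value arrive as binary:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)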

3. Spark Streaming Write to Console

Since the value column is binary, first we need to convert it to a String using selectExpr().


val personStringDF = df.selectExpr("CAST(value AS STRING)")

Now, parse the JSON String in the value column and expand it into DataFrame columns using a custom schema.


import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Schema matching the JSON messages produced in step 1
val schema = new StructType()
      .add("id", IntegerType)
      .add("firstname", StringType)
      .add("middlename", StringType)
      .add("lastname", StringType)
      .add("dob_year", IntegerType)
      .add("dob_month", IntegerType)
      .add("gender", StringType)
      .add("salary", IntegerType)

// Parse the JSON string into a struct column, then flatten it into top-level columns
val personDF = personStringDF.select(from_json(col("value"), schema).as("data"))
      .select("data.*")

personDF.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()

The complete streaming Kafka example code can be downloaded from GitHub. After downloading, import the project into your favorite IDE and change the Kafka broker IP address to your server IP in the SparkStreamingConsumerKafkaJson.scala program. When you run this program, you should see Batch: 0 with data. As you input new data (from step 1), the results get updated with Batch: 1, Batch: 2, and so on.
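Alternatively, if you build a jar, you could run it with spark-submit, pulling in the Kafka integration via --packages (the class and jar names below are placeholders for your own build):

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
  --class <your.package.SparkStreamingConsumerKafkaJson> \
  target/<your-project>.jar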


Batch: 0
+---+---------+----------+--------+--------+---------+------+------+
| id|firstname|middlename|lastname|dob_year|dob_month|gender|salary|
+---+---------+----------+--------+--------+---------+------+------+
|  1|   James |          |   Smith|    null|     null|     M|  3000|
|  2| Michael |      Rose|        |    null|     null|     M|  4000|
|  3|  Robert |          |Williams|    null|     null|     M|  4000|
|  4|   Maria |      Anne|   Jones|    null|     null|     F|  4000|
+---+---------+----------+--------+--------+---------+------+------+

Batch: 1
+---+---------+----------+--------+--------+---------+------+------+
| id|firstname|middlename|lastname|dob_year|dob_month|gender|salary|
+---+---------+----------+--------+--------+---------+------+------+
+---+---------+----------+--------+--------+---------+------+------+

Batch: 2
+---+---------+----------+--------+--------+---------+------+------+
| id|firstname|middlename|lastname|dob_year|dob_month|gender|salary|
+---+---------+----------+--------+--------+---------+------+------+
|  1|   James |          |   Smith|    null|     null|     M|  3000|
+---+---------+----------+--------+--------+---------+------+------+

4. Spark Streaming Write to Kafka Topic

Note that in order to write Spark Streaming data to Kafka, the value column is required and all other fields are optional. Kafka stores the key and value columns as binary; hence, when reading, they should first be converted to String before processing. If a key column is not specified, a null-valued key column is automatically added.

Let’s produce the data to the Kafka topic "json_data_topic". Since we are processing JSON, let’s convert the data to JSON using the to_json() function and store it in a value column.


df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
   .writeStream
   .format("kafka")
   .outputMode("append")
   .option("kafka.bootstrap.servers", "192.168.1.100:9092")
   .option("topic", "josn_data_topic")
   .start()
   .awaitTermination()

Use writeStream.format("kafka") to write the streaming DataFrame to a Kafka topic. Since we are just reading records (without any aggregations) and writing them as-is, we use outputMode("append"). OutputMode specifies what data is written to the sink when new data is available in the DataFrame/Dataset.
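For comparison, a query with an aggregation cannot use append mode without watermarking; a minimal sketch that counts records by gender and writes to the console in complete mode (illustrative only, not part of the original example):

// A streaming aggregation rewrites its result table on every trigger,
// so "complete" (or "update") output mode is required instead of "append"
val countsDF = personDF.groupBy("gender").count()

countsDF.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()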

5. Run Kafka Consumer Shell

Now run the Kafka consumer shell program that comes with the Kafka distribution.


bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic json_data_topic

As you feed more data (from step 1), you should see JSON output on the consumer shell console.

Next Steps

You can also read the articles on streaming JSON files from a folder and streaming from a TCP socket to learn about different ways of streaming.

I would also recommend reading Spark Streaming + Kafka Integration and Structured Streaming with Kafka for more knowledge on structured streaming.

Happy Learning !!
