Using Spark Streaming we can read from and write to Kafka topics in TEXT, CSV, AVRO, and JSON formats. In this article, we will learn, with a Scala example, how to stream Kafka messages in JSON format using the from_json() and to_json() SQL functions.
Apache Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing system that supports both batch and streaming workloads. It is an extension of the core Spark API that processes real-time data from sources like Kafka, Flume, and Amazon Kinesis, to name a few. The processed data can be pushed to other systems like databases, Kafka, live dashboards, etc.
Prerequisites
If you don’t have a Kafka cluster set up, follow the articles below to set up a single-broker cluster and get familiar with creating and describing topics.
Table of contents
- Write JSON to Kafka using producer shell
- Streaming from Kafka topic
- Write Streamed JSON to Console
- Write Streamed JSON to Kafka topic
- Read JSON from Kafka using consumer shell
1. Run Kafka Producer Shell
First, let’s produce some JSON data to the Kafka topic "json_topic". The Kafka distribution comes with a Kafka Producer shell; run this producer and input the JSON data from person.json. Just copy one line at a time from the person.json file and paste it on the console where the Kafka Producer shell is running.
Note: By default, Kafka automatically creates a topic when you write a message to it. However, you can also create a topic manually and specify your own partition count and replication factor.
bin/kafka-console-producer.sh \
--broker-list localhost:9092 --topic json_topic
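If you prefer to create the topic manually, you could use the kafka-topics.sh tool that ships with Kafka. A minimal sketch (the partition and replication-factor values are illustrative; older Kafka releases use --zookeeper localhost:2181 instead of --bootstrap-server):

bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic json_topic \
--partitions 3 --replication-factor 1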
2. Streaming With Kafka
2.1. Kafka Maven dependency
In order to stream data from a Kafka topic, we need the Kafka client Maven dependency below. Use the version that matches your Kafka and Scala versions.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
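If your build uses sbt instead of Maven, the equivalent dependency (assuming the same Spark version) would be:

// sbt appends the Scala version (e.g. _2.11) to the artifact name via %%
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"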
2.2 Spark Streaming Scala example
Spark Streaming uses readStream() on SparkSession to load a streaming Dataset from Kafka. The option startingOffsets set to earliest reads all data already available in Kafka at the start of the query; we may not use this option that often. The default value for startingOffsets is latest, which reads only new data that has not yet been processed.
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.1.100:9092")
.option("subscribe", "json_topic")
.option("startingOffsets", "earliest") // From starting
.load()
df.printSchema()
Since there are multiple sources to stream from, we need to explicitly state that we are streaming from Kafka with format("kafka"), provide the Kafka bootstrap servers, and subscribe to the topic we are streaming from using option().
df.printSchema() returns the schema of the streaming data from Kafka. The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata.
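For reference, the printed schema should look similar to the following, which is the standard column layout Spark exposes for a Kafka source:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)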
3. Spark Streaming Write to Console
Since the value column is in binary, we first need to convert the binary value to a String using selectExpr().
val personStringDF = df.selectExpr("CAST(value AS STRING)")
Now, extract the JSON String from the value column and convert it to DataFrame columns using a custom schema.
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
val schema = new StructType()
.add("id", IntegerType)
.add("firstname", StringType)
.add("middlename", StringType)
.add("lastname", StringType)
.add("dob_year", IntegerType)
.add("dob_month", IntegerType)
.add("gender", StringType)
.add("salary", IntegerType)
val personDF = personStringDF.select(from_json(col("value"), schema).as("data"))
.select("data.*")
personDF.writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination()
The complete Streaming Kafka Example code can be downloaded from GitHub. After downloading, import the project into your favorite IDE and change the Kafka broker IP address to your server IP in the SparkStreamingConsumerKafkaJson.scala program. When you run this program, you should see Batch: 0 with data. As you input new data (from step 1), the results get updated with Batch: 1, Batch: 2, and so on.
Batch: 0
+---+---------+----------+--------+----+------+------+
| id|firstname|middlename|lastname| dob|gender|salary|
+---+---------+----------+--------+----+------+------+
| 1| James | | Smith|null| M| 3000|
| 2| Michael | Rose| |null| M| 4000|
| 3| Robert | |Williams|null| M| 4000|
| 4| Maria | Anne| Jones|null| F| 4000|
+---+---------+----------+--------+----+------+------+
Batch: 1
+---+---------+----------+--------+---+------+------+
| id|firstname|middlename|lastname|dob|gender|salary|
+---+---------+----------+--------+---+------+------+
+---+---------+----------+--------+---+------+------+
Batch: 2
+---+---------+----------+--------+----+------+------+
| id|firstname|middlename|lastname| dob|gender|salary|
+---+---------+----------+--------+----+------+------+
| 1| James | | Smith|null| M| 3000|
+---+---------+----------+--------+----+------+------+
4. Spark Streaming Write to Kafka Topic
Note that in order to write Spark Streaming data to Kafka, the value column is required; all other fields are optional. The key and value columns are binary in Kafka; hence, they should first be converted to String before processing. If a key column is not specified, a null-valued key column will be automatically added.
Let’s produce the data to the Kafka topic "json_data_topic". Since we are processing JSON, let’s convert the data to JSON using the to_json() function and store it in a value column.
df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.outputMode("append")
.option("kafka.bootstrap.servers", "192.168.1.100:9092")
.option("topic", "json_data_topic")
.start()
.awaitTermination()
Use writeStream.format("kafka") to write the streaming DataFrame to a Kafka topic. Since we are just reading a file (without any aggregations) and writing it as-is, we use outputMode("append"). OutputMode specifies what data will be written to the sink when new data is available in a DataFrame/Dataset.
5. Run Kafka Consumer Shell
Now run the Kafka consumer shell program that comes with Kafka distribution.
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic json_data_topic
As you feed more data (from step 1), you should see JSON output on the consumer shell console.
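With the first person record from step 1, the consumer output might look like the line below (this is illustrative; the exact fields and their order depend on your person.json columns, since to_json(struct(*)) serializes every DataFrame column in order):

{"id":1,"firstname":"James ","middlename":"","lastname":"Smith","gender":"M","salary":3000}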
Next Steps
You can also read the articles Streaming JSON files from a folder and from TCP socket to learn different ways of streaming.
I would also recommend reading Spark Streaming + Kafka Integration and Structured Streaming with Kafka for more knowledge on structured streaming.
Happy Learning !!
Related Articles
- Kafka consumer and producer example with a custom serializer
- Spark Streaming – Kafka messages in Avro format
- Spark SQL Batch Processing – Produce and Consume Apache Kafka Topic
- How to Create and Describe a Kafka topic
- Spark Create DataFrame with Examples
- Kafka Delete Topic and its messages
- How to Setup a Kafka Cluster (step-by-step)
- Apache Kafka Producer and Consumer in Scala