Spark Streaming – Kafka messages in Avro format

This article describes Spark Structured Streaming from Kafka with messages in Avro format and the usage of the from_avro() and to_avro() SQL functions, using the Scala programming language.


Before diving deeper, let's understand a few points about Spark Streaming, Kafka, and Avro.

Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing system that supports both batch and streaming workloads. It is an extension of the core Spark API that processes real-time data from sources like Kafka, Flume, and Amazon Kinesis, to name a few. The processed data can be pushed to databases, Kafka, live dashboards, etc.

Apache Kafka is a publish-subscribe messaging system originally written at LinkedIn. Accessing Kafka from Spark is enabled by the following Spark SQL Kafka connector Maven dependency.

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
     <version>2.4.0</version>
</dependency>

Apache Avro is a data serialization system that is widely used with Apache Spark, especially for Kafka-based data pipelines. When Avro data is stored in a file, its schema is stored with it, so files can be processed later by any program.

Accessing Avro from Spark is enabled by the following Spark-Avro Maven dependency. The spark-avro module is external and is not included in spark-submit or spark-shell by default.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
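
Because the module is external, when running through spark-shell or spark-submit both connectors can be supplied with the --packages option (a sketch; adjust the Scala and Spark versions to match your environment):

bin/spark-shell \
  --packages org.apache.spark:spark-avro_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0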

Prerequisites

If you don't have a Kafka cluster set up, follow the articles below to set up a single-broker cluster and get familiar with creating and describing topics.
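
For reference, the topics used in this article can be created with the kafka-topics.sh script that ships with Kafka (a minimal sketch assuming a single local broker managed by ZooKeeper on localhost:2181; newer Kafka versions take --bootstrap-server localhost:9092 instead):

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic json_topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic avro_topic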

Reading Avro data from Kafka Topic

Spark Structured Streaming uses readStream() on a SparkSession to load a streaming Dataset. The option("startingOffsets", "earliest") setting reads all data already available in the topic at the start of the query. We may not use this option that often; the default value for startingOffsets is latest, which reads only new data that has yet to be processed.


val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "192.168.1.100:9092")
        .option("subscribe", "avro_topic")
        .option("startingOffsets", "earliest") // From starting
        .load()

df.printSchema()

df.printSchema() prints the schema of the Kafka streaming DataFrame. The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata.
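
For the Kafka source, the printed schema should look similar to the following (these columns are defined by the Spark Kafka source; newer Spark versions may add a headers column):

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)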


When writing to Kafka, the value column is required and all other fields are optional.
Kafka stores key and value as binary; these first need to be converted (for example, cast to String or decoded from Avro) before we process them. If a key column is not specified, a null-valued key column is added automatically.
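
As a quick illustration (a minimal sketch; the complete programs later in this article do the same cast before parsing), the binary key and value columns can be converted to strings like this:

val stringDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")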

To decode Avro data, we use the from_avro() function, which takes an Avro schema string as a parameter. For this example, I am going to load the schema from a person.avsc file. For reference, below is the Avro schema we are going to use.


{
  "type": "record",
  "name": "Person",
  "namespace": "com.sparkbyexamples",
  "fields": [
    {"name": "id","type": ["int", "null"]},
    {"name": "firstname","type": ["string", "null"]},
    {"name": "middlename","type": ["string", "null"]},
    {"name": "lastname","type": ["string", "null"]},
    {"name": "dob_year","type": ["int", "null"]},
    {"name": "dob_month","type": ["int", "null"]},
    {"name": "gender","type": ["string", "null"]},
    {"name": "salary","type": ["int", "null"]}
  ]
}

The schema defines the field names and data types. The receiver of the Avro data needs to know this schema before it starts processing.


val jsonFormatSchema = new String(
  Files.readAllBytes(Paths.get("./src/main/resources/person.avsc")))

val personDF = df.select(from_avro(col("value"), jsonFormatSchema).as("person"))
  .select("person.*")

The first argument of from_avro() is the column to decode; here we are decoding the Kafka value column.
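
After decoding, personDF.printSchema() should show the person fields with types derived from the Avro schema, roughly like this (every field is nullable because each Avro type is a union with null):

root
 |-- id: integer (nullable = true)
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob_year: integer (nullable = true)
 |-- dob_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)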

Writing Avro data to Kafka Topic

Let's produce the data to the Kafka topic "avro_data_topic". Since we are processing Avro, let's encode the data using the to_avro() function and store it in a "value" column, as Kafka expects the data to be present in this column.


personDF.select(to_avro(struct("*")) as "value") // encode all person columns as Avro into the "value" column
      .writeStream
      .format("kafka")
      .outputMode("append")
      .option("kafka.bootstrap.servers", "192.168.1.100:9092")
      .option("topic", "avro_data_topic")
      .option("checkpointLocation","c:/tmp")
      .start()
      .awaitTermination()

We use writeStream.format("kafka") to write the streaming DataFrame to a Kafka topic. Since we are just reading from the topic (without any aggregations) and writing as-is, we use outputMode("append"). OutputMode specifies what data is written to the sink when new data becomes available in a streaming DataFrame/Dataset.
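
If the streaming query contained an aggregation, "append" would not be allowed without a watermark; such queries typically use "complete" or "update" mode instead. A rough, hypothetical sketch (not part of this example):

// Hypothetical aggregation to illustrate outputMode("complete"); not used in this article
personDF.groupBy("gender").count()
  .writeStream
  .format("console")
  .outputMode("complete")
  .start()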

How to Run?

First, we will start a Kafka shell producer that comes with the Kafka distribution and produce JSON messages. Then, I will write a Spark Streaming program that consumes these messages, converts them to Avro, and sends them to another Kafka topic. Finally, we will create another Spark Streaming program that consumes the Avro messages from Kafka, decodes the data, and writes it to the console.

1. Run Kafka Producer Shell

Run the Kafka producer shell that comes with the Kafka distribution and feed it the JSON data from person.json. To feed data, copy one line at a time from the person.json file and paste it into the console where the Kafka producer shell is running.


bin/kafka-console-producer.sh \
--broker-list localhost:9092 --topic json_topic
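
A line fed to this producer might look like the following (a hypothetical record that matches the person.avsc schema above; the actual person.json in the GitHub project may differ):

{"id":1,"firstname":"James","middlename":"","lastname":"Smith","dob_year":2018,"dob_month":1,"gender":"M","salary":3000}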

2. Run Kafka Producer

The complete Spark Streaming Avro Kafka example code can be downloaded from GitHub. In this program, change the Kafka broker IP address to your server IP and run KafkaProduceAvro.scala from your favorite editor. This program reads JSON messages from the Kafka topic "json_topic", encodes the data to Avro, and sends it to another Kafka topic, "avro_topic".


package com.sparkbyexamples.spark.streaming.kafka.avro
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, struct}
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
object KafkaProduceAvro {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExample.com")
      .getOrCreate()
    /*
    Disable logging as it writes too much log
     */
    spark.sparkContext.setLogLevel("ERROR")
    /*
    This consumes JSON data from Kafka
     */
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.100:9092")
      .option("subscribe", "json_topic")
      .option("startingOffsets", "earliest") // From starting
      .load()
    /*
     Prints Kafka schema with columns (topic, offset, partition, etc.)
      */
    df.printSchema()
    val schema = new StructType()
      .add("id",IntegerType)
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType)
      .add("dob_year",IntegerType)
      .add("dob_month",IntegerType)
      .add("gender",StringType)
      .add("salary",IntegerType)
    /*
    Converts JSON string to DataFrame
     */
    val personDF = df.selectExpr("CAST(value AS STRING)") // First convert binary to string
      .select(from_json(col("value"), schema).as("data"))
    personDF.printSchema()

    /*
      * Convert DataFrame columns to Avro format and name it as "value"
      * And send this Avro data to Kafka topic
      */
    personDF.select(to_avro(struct("data.*")) as "value")
      .writeStream
      .format("kafka")
      .outputMode("append")
      .option("kafka.bootstrap.servers", "192.168.1.100:9092")
      .option("topic", "avro_topic")
      .option("checkpointLocation","c:/tmp")
      .start()
      .awaitTermination()
  }
}

3. Run Kafka Consumer

This program reads Avro messages from the Kafka topic "avro_topic", decodes them, and finally streams the result to the console.


package com.sparkbyexamples.spark.streaming.kafka.avro
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

object KafkaConsumerAvro {
    def main(args: Array[String]): Unit = {

      val spark: SparkSession = SparkSession.builder()
        .master("local")
        .appName("SparkByExample.com")
        .getOrCreate()
      spark.sparkContext.setLogLevel("ERROR")
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "192.168.1.100:9092")
        .option("subscribe", "avro_topic")
        .option("startingOffsets", "earliest") // From starting
        .load()
      /*
       Prints Kafka schema with columns (topic, offset, partition, etc.)
        */
      df.printSchema()
      /*
      Read schema to convert Avro data to DataFrame
       */
      val jsonFormatSchema = new String(
        Files.readAllBytes(Paths.get("./src/main/resources/person.avsc")))

      val personDF = df.select(from_avro(col("value"), jsonFormatSchema).as("person"))
        .select("person.*")
      personDF.printSchema()
      /*
      Stream data to Console for testing
       */
      personDF.writeStream
        .format("console")
        .outputMode("append")
        .start()
        .awaitTermination()
  }
}

Execute this program from the console (make sure you are also running steps 1 and 2). Go to the console where you are running step 1 and feed data by copying one line at a time from the person.json file and pasting it into the console where the Kafka producer shell is running. You should see the messages displayed on the step 3 console.

Conclusion:

In this post, we have seen how to consume Kafka messages encoded in Avro, decode them using the from_avro() function for processing, and finally encode data to Avro using the to_avro() function and send it to Kafka.

Next Steps

You can also read the articles on streaming JSON files from a folder and streaming from a TCP socket to learn about different ways of streaming.

I would also recommend reading Spark Streaming + Kafka Integration and Structured Streaming with Kafka for more knowledge on structured streaming.

Happy Learning !!

