In Spark, Avro support comes from the external spark-avro module, which you need to add when processing Avro data. This module provides the to_avro() function to encode a DataFrame column value into Avro binary format, and from_avro() to decode Avro binary data back into a Spark SQL (struct) column value.

In this article, you will learn how to use from_avro() and to_avro() with Spark examples. These functions are mostly used in conjunction with Kafka, when we need to read or write Kafka messages in Avro format, so I will explain them in a Kafka context.

Let’s assume we have a Kafka topic avro_data_topic, and the data sent to this topic (in the value field) is in Avro format.

1. from_avro() – Reading Avro data from Kafka Topic

The from_avro() function from the Spark module spark-avro is used to decode Avro binary data into a Spark SQL (struct) column value.

Syntax


from_avro(data : org.apache.spark.sql.Column, jsonFormatSchema : scala.Predef.String) : org.apache.spark.sql.Column

Spark uses readStream() on a SparkSession to load a streaming Dataset from a Kafka topic. The option("startingOffsets", "earliest") is used to read all data already available in the topic when the query starts; we may not use this option that often, and the default value for startingOffsets is latest, which reads only new data that has not yet been processed.


val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "192.168.1.100:9092")
        .option("subscribe", "avro_topic")
        .option("startingOffsets", "earliest") // From starting
        .load()
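
The Kafka source returns a fixed set of columns, and the Avro payload we care about sits in the binary value column. As a quick sanity check (a minimal sketch, not part of the original pipeline), you can print the schema of the streaming DataFrame:


// Inspect the columns returned by the Kafka source; the Avro payload is in "value"
df.printSchema()
// root
//  |-- key: binary
//  |-- value: binary
//  |-- topic: string
//  |-- partition: integer
//  |-- offset: long
//  |-- timestamp: timestamp
//  |-- timestampType: integer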

To decode the Avro data, we use the from_avro() function, which takes the Avro schema as a JSON string parameter. For this example, I am going to load the schema from a person.avsc file. For reference, below is the Avro schema we are going to use.


{
  "type": "record",
  "name": "Person",
  "namespace": "com.sparkbyexamples",
  "fields": [
    {"name": "id","type": ["int", "null"]},
    {"name": "firstname","type": ["string", "null"]},
    {"name": "middlename","type": ["string", "null"]},
    {"name": "lastname","type": ["string", "null"]},
    {"name": "dob_year","type": ["int", "null"]},
    {"name": "dob_month","type": ["int", "null"]},
    {"name": "gender","type": ["string", "null"]},
    {"name": "salary","type": ["int", "null"]}
  ]
}

The schema defines the field names and data types. The receiver of the Avro data needs to know this schema before it starts processing.


import java.nio.file.{Files, Paths}
import org.apache.spark.sql.avro.functions.from_avro // Spark 3.x; on Spark 2.4 use org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

// Load the Avro schema (person.avsc) as a JSON string
val jsonFormatSchema = new String(
  Files.readAllBytes(Paths.get("/src/main/resources/person.avsc")))

// Decode the binary "value" column and flatten the resulting struct
val personDF = df.select(from_avro(col("value"), jsonFormatSchema).as("person"))
  .select("person.*")

2. to_avro() – Writing Avro data to Kafka Topic

The to_avro() function from the spark-avro module is used to encode a DataFrame column value into Avro binary format.

Syntax


to_avro(data : org.apache.spark.sql.Column) : org.apache.spark.sql.Column

Let’s produce the data to the Kafka topic "avro_data_topic2". Since we are writing Avro messages from Spark, we need to encode the data using the to_avro() function and store it in the "value" column, as Kafka expects the data to be present in this field/column.


import org.apache.spark.sql.avro.functions.to_avro // Spark 3.x; on Spark 2.4 use org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.struct

// Encode all person columns into a single Avro binary "value" column, as Kafka expects
personDF.select(to_avro(struct("*")) as "value")
      .writeStream
      .format("kafka")
      .outputMode("append")
      .option("kafka.bootstrap.servers", "192.168.1.100:9092")
      .option("topic", "avro_data_topic2")
      .option("checkpointLocation", "c:/tmp")
      .start()
      .awaitTermination()

We use writeStream.format("kafka") to write the streaming DataFrame to the Kafka topic. Since we are just reading messages (without any aggregations) and writing them as-is, we use outputMode("append"). OutputMode specifies what data is written to the sink when new data is available in the DataFrame/Dataset.
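
To confirm the round trip, you could read the messages back from the output topic and decode them with from_avro() using the same schema (a quick verification sketch, assuming the avro_data_topic2 topic name used above):


// Read the Avro messages back from the output topic and decode them
val checkDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.100:9092")
  .option("subscribe", "avro_data_topic2")
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro(col("value"), jsonFormatSchema).as("person"))
  .select("person.*")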

If you want a complete example and would like to execute it and see the output, please see How to Stream Kafka messages in Avro format.

Conclusion

In this Spark article, you have learned the syntax of the to_avro() and from_avro() functions from the spark-avro module and how to use them with a Kafka example in Scala.

Happy Learning !!
