In Spark, spark-avro is an external module, so you need to add it when processing Avro files. It provides the function to_avro() to encode a DataFrame column into Avro binary format, and from_avro() to decode Avro binary data back into Spark SQL values.
In this article, you will learn how to use from_avro() and to_avro() with Spark examples. These functions are mostly used in conjunction with Kafka, when we need to read or write Kafka messages in Avro format, hence I will explain them in a Kafka context.
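Because spark-avro (and, for these examples, the Kafka source) is not bundled with Spark, it has to be supplied at submit time. One common way is the --packages flag of spark-submit; the versions, class name, and jar path below are illustrative placeholders, not from the article, so adjust them to your own build:

```shell
# spark-avro and the Kafka connector are external modules; pull them in at submit time.
# Match the Scala (2.12) and Spark (3.3.0) versions to your cluster.
spark-submit \
  --packages org.apache.spark:spark-avro_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
  --class com.sparkbyexamples.AvroExample \
  target/your-app.jar
```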
Let’s assume we have a Kafka topic avro_data_topic, and that messages are being sent to it (in the value field) in Avro format.
1. from_avro() – Reading Avro data from Kafka Topic
The from_avro() function from the Spark module spark-avro is used to convert a column in Avro binary format into its corresponding Spark SQL type, based on a supplied schema.
Syntax
from_avro(data : org.apache.spark.sql.Column, jsonFormatSchema : scala.Predef.String) : org.apache.spark.sql.Column
Spark uses readStream() on SparkSession to load a streaming Dataset from a Kafka topic. option("startingOffsets", "earliest") tells Spark to read all data already available in the topic at the start of the query. We may not use this option that often; the default value for startingOffsets is latest, which reads only new data that is yet to be processed.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.100:9092")
  .option("subscribe", "avro_data_topic")
  .option("startingOffsets", "earliest") // read from the beginning of the topic
  .load()
To decode the Avro data, we use the from_avro() function, which takes the Avro schema, as a JSON string, as a parameter. For this example, I am going to load the schema from a person.avsc file. For reference, below is the Avro schema we are going to use.
{
  "type": "record",
  "name": "Person",
  "namespace": "com.sparkbyexamples",
  "fields": [
    {"name": "id", "type": ["int", "null"]},
    {"name": "firstname", "type": ["string", "null"]},
    {"name": "middlename", "type": ["string", "null"]},
    {"name": "lastname", "type": ["string", "null"]},
    {"name": "dob_year", "type": ["int", "null"]},
    {"name": "dob_month", "type": ["int", "null"]},
    {"name": "gender", "type": ["string", "null"]},
    {"name": "salary", "type": ["int", "null"]}
  ]
}
The schema defines the field names and data types. The receiver of Avro data needs to know this schema before it starts processing.
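Since from_avro() only reports a malformed schema at runtime, it can be worth parsing the .avsc contents up front with the Avro library, which spark-avro already depends on. A small sketch, with the article’s schema inlined rather than read from person.avsc so it is self-contained:

```scala
import org.apache.avro.Schema

// The person.avsc schema from above, inlined for this sketch
val jsonFormatSchema =
  """{
    |  "type": "record",
    |  "name": "Person",
    |  "namespace": "com.sparkbyexamples",
    |  "fields": [
    |    {"name": "id", "type": ["int", "null"]},
    |    {"name": "firstname", "type": ["string", "null"]},
    |    {"name": "middlename", "type": ["string", "null"]},
    |    {"name": "lastname", "type": ["string", "null"]},
    |    {"name": "dob_year", "type": ["int", "null"]},
    |    {"name": "dob_month", "type": ["int", "null"]},
    |    {"name": "gender", "type": ["string", "null"]},
    |    {"name": "salary", "type": ["int", "null"]}
    |  ]
    |}""".stripMargin

// Schema.Parser throws a SchemaParseException on malformed input,
// so a failure here surfaces before any streaming query starts
val schema = new Schema.Parser().parse(jsonFormatSchema)
println(schema.getFullName) // com.sparkbyexamples.Person
```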
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.avro.functions.from_avro // Spark 3.x; in Spark 2.4 use org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

val jsonFormatSchema = new String(
  Files.readAllBytes(Paths.get("/src/main/resources/person.avsc")))

val personDF = df.select(from_avro(col("value"), jsonFormatSchema).as("person"))
  .select("person.*")
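During development it helps to inspect the decoded rows before producing them anywhere, and a common approach is the console sink. The sketch below is self-contained: it uses Spark’s built-in rate source as a stand-in for the Kafka-backed personDF above, so it runs without a broker.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("ConsoleSinkSketch")
  .getOrCreate()

// Stand-in streaming source: "rate" emits a timestamp and a counter.
// In the article, this would be personDF from the from_avro() step.
val streamDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()

// Print each micro-batch to stdout instead of writing to Kafka
val query = streamDF.writeStream
  .format("console")
  .outputMode("append")
  .option("truncate", "false")
  .start()

query.awaitTermination(5000) // run briefly for inspection, then stop
query.stop()
```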
2. to_avro() – Writing Avro data to Kafka Topic
The to_avro() function from the spark-avro module is used to encode a column into Avro binary format.
Syntax
to_avro(data : org.apache.spark.sql.Column) : org.apache.spark.sql.Column
Let’s produce the data to Kafka topic "avro_data_topic2". Since we are writing Avro messages, we need to encode the data using the to_avro() function and store it in a column named "value", as the Kafka sink expects the message payload in this field/column.
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.functions.struct

personDF.select(to_avro(struct("*")).as("value")) // pack all person fields into one Avro binary column
  .writeStream
  .format("kafka")
  .outputMode("append")
  .option("kafka.bootstrap.servers", "192.168.1.100:9092")
  .option("topic", "avro_data_topic2")
  .option("checkpointLocation", "c:/tmp")
  .start()
  .awaitTermination()
We use writeStream.format("kafka") to write the streaming DataFrame to a Kafka topic. Since we are just reading records and writing them as-is (without any aggregations), we use outputMode("append"). OutputMode specifies what data is written to the sink when new data becomes available in the DataFrame/Dataset.
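To see to_avro() and from_avro() working together without a Kafka broker, here is a minimal batch round trip. It assumes Spark 3.x with the spark-avro package on the classpath, and uses a trimmed two-field version of the Person schema; the schema is also passed to to_avro() so that writer and reader schemas match.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
import org.apache.spark.sql.functions.{col, struct}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("AvroRoundTrip")
  .getOrCreate()
import spark.implicits._

// Trimmed two-field version of the article's Person schema
val jsonFormatSchema =
  """{"type": "record", "name": "Person", "namespace": "com.sparkbyexamples",
    | "fields": [
    |   {"name": "id", "type": ["int", "null"]},
    |   {"name": "firstname", "type": ["string", "null"]}
    | ]}""".stripMargin

val input = Seq((1, "James"), (2, "Michael")).toDF("id", "firstname")

// Encode all columns into one Avro binary column, as the Kafka sink expects
val encoded = input.select(to_avro(struct("*"), jsonFormatSchema).as("value"))

// Decode the binary column back into individual fields
val decoded = encoded
  .select(from_avro(col("value"), jsonFormatSchema).as("person"))
  .select("person.*")

decoded.show(false)
```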
If you want a complete example that you can execute and see the output of, please see How to Stream Kafka messages in Avro format.
Conclusion
In this Spark article, you have learned the syntax of the to_avro() and from_avro() functions from the spark-avro module, and how to use them with a Kafka example in Scala.
Related Articles
- What is Spark Streaming Checkpoint?
- Spark Write DataFrame to CSV File
- Spark Read Files from HDFS (TXT, CSV, AVRO, PARQUET, JSON)
- Spark – Read & Write Avro files from Amazon S3
- Spark Convert Avro file to CSV
- Spark Convert Parquet file to JSON
- Spark Convert Parquet file to Avro
- Spark Convert Avro file to JSON
- Spark Convert Avro file to Parquet
Happy Learning !!