Spark provides built-in support for reading and writing DataFrames in the Avro file format through the “spark-avro” library. In this tutorial, you will learn how to read and write Avro files, supply an Avro schema, and partition data for better read performance, with Scala examples.
If you are using Spark 2.3 or older, then please use this URL.
Table of contents:
- Apache Avro Introduction
- Apache Avro Advantages
- Spark Avro dependency
- Writing Avro Data File from DataFrame
- Reading Avro Data File to DataFrame
- Writing DataFrame to Avro Partition
- Reading Avro Partition data to DataFrame
- Using Avro Schema
- Using Spark SQL
Related: Spark from_avro() and to_avro() usage
1. What is Apache Avro?
Apache Avro is an open-source, row-based data serialization and data exchange framework originally developed within the Hadoop project. Spark’s Avro support comes through the spark-avro library, originally developed by Databricks as an open-source library for reading and writing data in the Avro file format. Avro is widely used with Apache Spark, especially in Kafka-based data pipelines. When Avro data is stored in a file, its schema is stored with it, so that files may be processed later by any program.
It was built to serialize and exchange big data between different Hadoop-based projects. It serializes data in a compact binary format, while the schema that defines the field names and data types is in JSON format.
It is similar to Thrift and Protocol Buffers but does not require code generation, since the data is always accompanied by a schema that permits full processing of that data. This is one of its great advantages compared with other serialization systems.
2. Apache Avro Advantages
- Supports complex data structures such as arrays, maps, arrays of maps, and maps of arrays.
- A compact, binary serialization format that provides fast data transfer.
- A row-based data serialization system.
- Supports multiple languages, meaning data written in one language can be read by another.
- Code generation is not required to read or write data files.
- Simple integration with dynamic languages.
3. Spark Avro dependencies
Since Spark 2.4, Spark SQL provides built-in support for reading and writing Apache Avro data files. However, the spark-avro module is external and, by default, not included in spark-submit or spark-shell; hence, access to the Avro file format in Spark is enabled by providing the package explicitly.
3.1 Maven dependency
// Maven dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.12</artifactId>
    <version>2.4.4</version>
</dependency>
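If you build with sbt instead of Maven, the equivalent dependency (matching the Scala 2.12 / Spark 2.4.4 coordinates used in the spark-submit examples below) would be:
// sbt dependency
libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.4"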
3.2 spark-submit
While using spark-submit, provide spark-avro_2.12 and its dependencies directly using --packages, such as:
// Spark-submit
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:2.4.4
3.3 spark-shell
While working with spark-shell, you can also use --packages to add spark-avro_2.12 and its dependencies directly:
// Spark-shell
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.4
4. Write Spark DataFrame to Avro Data File
Since the Avro library is external to Spark, it doesn’t provide an avro() function on DataFrameWriter; hence, we should use the DataSource format “avro” or “org.apache.spark.sql.avro” to write a Spark DataFrame to an Avro file.
// Write Spark DataFrame to Avro Data File
df.write.format("avro").save("person.avro")
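The Avro data source also accepts a compression option when writing; per the Spark Avro documentation, supported codecs are uncompressed, snappy (the default), deflate, bzip2, and xz. A minimal sketch (the output path here is just for illustration):
// Write Avro with an explicit compression codec
df.write.format("avro")
  .option("compression", "deflate")
  .save("person_deflate.avro")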
5. Read Avro Data File to Spark DataFrame
Similarly, an avro() function is not provided on Spark DataFrameReader; hence, we should use the DataSource format “avro” or “org.apache.spark.sql.avro”, and load() is used to read the Avro file.
// Read Avro Data File to Spark DataFrame
val personDF = spark.read.format("avro").load("person.avro")
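Once loaded, the result is an ordinary DataFrame, so you can inspect the schema Spark derived from the Avro file and preview the rows:
// Inspect the DataFrame read from Avro
personDF.printSchema()
personDF.show()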
6. Writing Avro Partition Data
Spark DataFrameWriter provides a partitionBy() function to partition the data while writing Avro files. Partitioning improves read performance by reducing disk I/O.
// Writing Avro Partition Data
val data = Seq(("James ","","Smith",2018,1,"M",3000),
  ("Michael ","Rose","",2010,3,"M",4000),
  ("Robert ","","Williams",2010,3,"M",4000),
  ("Maria ","Anne","Jones",2005,5,"F",4000),
  ("Jen","Mary","Brown",2010,7,"",-1)
)
val columns = Seq("firstname", "middlename", "lastname", "dob_year",
  "dob_month", "gender", "salary")
import spark.sqlContext.implicits._
val df = data.toDF(columns:_*)
df.write.partitionBy("dob_year","dob_month")
  .format("avro").save("person_partition.avro")
This example partitions the person data by year and month of birth. Spark creates a folder for each partition value, as sketched below.
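Based on the sample data above, the output directory should look roughly like this (the part file names are generated by Spark and shown here only for illustration):
// Partition folder layout (illustrative)
person_partition.avro/
├── dob_year=2005/dob_month=5/part-00000-xxxx.avro
├── dob_year=2010/dob_month=3/part-00000-xxxx.avro
├── dob_year=2010/dob_month=7/part-00000-xxxx.avro
└── dob_year=2018/dob_month=1/part-00000-xxxx.avro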
7. Reading Avro Partition Data
When we query the data with a filter on a partition column, Spark reads only the matching partition folders instead of scanning all the Avro files.
// Reading Avro Partition Data
spark.read
.format("avro")
.load("person_partition.avro")
.where(col("dob_year") === 2010)
.show()
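Because each partition is just a folder on disk, you can also point load() at a single partition path to read only that subset; note that the partition column encoded in the path (here dob_year) is then no longer present as a column in the result:
// Read one partition folder directly (dob_year is dropped from the result)
spark.read
  .format("avro")
  .load("person_partition.avro/dob_year=2010")
  .show()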
8. Using Avro Schema
Avro schemas are usually defined in files with the .avsc extension, and the file format is JSON. We will store the schema below in a person.avsc file and provide this file using option() while reading the Avro file. This schema describes the structure of the Avro file with field names and their data types.
// Using Avro Schema
{
  "type": "record",
  "name": "Person",
  "namespace": "com.sparkbyexamples",
  "fields": [
    {"name": "firstname", "type": "string"},
    {"name": "middlename", "type": "string"},
    {"name": "lastname", "type": "string"},
    {"name": "dob_year", "type": "int"},
    {"name": "dob_month", "type": "int"},
    {"name": "gender", "type": "string"},
    {"name": "salary", "type": "int"}
  ]
}
You can download this Avro schema example from GitHub.
// Read Avro file with an explicit Avro schema
import java.io.File
import org.apache.avro.Schema

val schemaAvro = new Schema.Parser()
  .parse(new File("src/main/resources/person.avsc"))
val df = spark.read
  .format("avro")
  .option("avroSchema", schemaAvro.toString)
  .load("person.avro")
Alternatively, we can specify the schema as a Spark StructType using the schema() method, as sketched below.
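A minimal sketch of the StructType alternative; the field names and types mirror the person.avsc schema above:
// Equivalent read using a Spark StructType instead of an Avro schema
import org.apache.spark.sql.types._

val personSchema = StructType(Seq(
  StructField("firstname", StringType),
  StructField("middlename", StringType),
  StructField("lastname", StringType),
  StructField("dob_year", IntegerType),
  StructField("dob_month", IntegerType),
  StructField("gender", StringType),
  StructField("salary", IntegerType)
))

val df2 = spark.read
  .format("avro")
  .schema(personSchema)
  .load("person.avro")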
9. Using Avro with Spark SQL
We can also read Avro data files using SQL. To do this, first create a temporary view pointing to the Avro data file, then run SQL commands on it.
// Using Avro with Spark SQL
spark.sqlContext.sql("CREATE TEMPORARY VIEW PERSON USING avro
OPTIONS (path \"person.avro\")")
spark.sqlContext.sql("SELECT * FROM PERSON").show()
Conclusion:
We have seen examples of how to write Avro data files and how to read them back into a Spark DataFrame. I’ve also explained how to work with Avro partitions and how partitioning improves read performance: using partitions, Spark scans only the folders that match the query instead of the entire dataset.
Complete Scala example for reference
Below is a complete Scala example of how to read and write a Spark DataFrame to Avro files, which I’ve tested in my development environment.
package com.sparkbyexamples.spark.dataframe

import java.io.File

import org.apache.avro.Schema
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

/**
 * Spark Avro library example
 * Avro schema example
 * Avro file format
 */
object AvroExample {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder().master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    val data = Seq(("James ", "", "Smith", 2018, 1, "M", 3000),
      ("Michael ", "Rose", "", 2010, 3, "M", 4000),
      ("Robert ", "", "Williams", 2010, 3, "M", 4000),
      ("Maria ", "Anne", "Jones", 2005, 5, "F", 4000),
      ("Jen", "Mary", "Brown", 2010, 7, "", -1)
    )

    val columns = Seq("firstname", "middlename", "lastname", "dob_year",
      "dob_month", "gender", "salary")

    import spark.sqlContext.implicits._
    val df = data.toDF(columns: _*)

    /**
     * Spark DataFrame Write Avro File
     */
    df.write.format("avro")
      .mode(SaveMode.Overwrite)
      .save("/tmp/spark_out/avro/person.avro")

    /**
     * Spark DataFrame Read Avro File
     */
    spark.read.format("avro").load("/tmp/spark_out/avro/person.avro").show()

    /**
     * Write Avro Partition
     */
    df.write.partitionBy("dob_year", "dob_month")
      .format("avro")
      .mode(SaveMode.Overwrite)
      .save("/tmp/spark_out/avro/person_partition.avro")

    /**
     * Reading Avro Partition
     */
    spark.read
      .format("avro")
      .load("/tmp/spark_out/avro/person_partition.avro")
      .where(col("dob_year") === 2010)
      .show()

    /**
     * Explicit Avro schema
     */
    val schemaAvro = new Schema.Parser()
      .parse(new File("src/main/resources/person.avsc"))

    spark.read
      .format("avro")
      .option("avroSchema", schemaAvro.toString)
      .load("/tmp/spark_out/avro/person.avro")
      .show()

    /**
     * Avro Spark SQL
     */
    spark.sqlContext.sql(
      "CREATE TEMPORARY VIEW PERSON USING avro OPTIONS (path \"/tmp/spark_out/avro/person.avro\")")
    spark.sqlContext.sql("SELECT * FROM PERSON").show()
  }
}