In this Spark tutorial, you will learn what the Avro format is, its advantages, and how to read an Avro file from an Amazon S3 bucket into a DataFrame and write a DataFrame as an Avro file to an Amazon S3 bucket, with Scala examples.
Spark provides built-in support for reading and writing DataFrames in Avro format through the “spark-avro” library; however, to read from or write to Amazon S3 you additionally need the S3 (hadoop-aws) libraries.
If you are using Spark 2.3 or older, please use this URL.
Table of contents:
- Apache Avro Introduction
- Apache Avro Advantages
- Spark Avro dependency
- Writing Avro Data File to Amazon S3
- Reading Avro Data File from Amazon S3
- Writing DataFrame to Avro Partition
- Reading Avro Partition data to DataFrame
- Using Avro Schema
- Using Spark SQL with Amazon S3
Related: Spark from_avro() and to_avro() usage
What is Apache Avro?
Apache Avro is an open-source, row-based data serialization and data exchange framework for Hadoop projects. In Spark, Avro support comes from the spark-avro library, originally developed by Databricks as an open-source library for reading and writing data in the Avro file format; it is widely used in Apache Spark, especially for Kafka-based data pipelines. When Avro data is stored in a file, its schema is stored with it, so the file can be processed later by any program.
Avro was built to serialize and exchange big data between different Hadoop-based projects. It serializes data in a compact binary format, while the schema, which defines the field names and data types, is stored in JSON format.
It is similar to Thrift and Protocol Buffers, but it does not require code generation: the data is always accompanied by a schema that permits full processing of that data without generated code. This is one of its great advantages compared with other serialization systems.
Apache Avro Advantages
- Supports complex data structures such as arrays, maps, arrays of maps, and maps of arrays.
- A compact, binary serialization format that makes data transfer fast.
- Row-based data serialization system.
- Supports multiple languages, meaning data written in one language can be read by programs written in other languages.
- Code generation is not required to read or write data files.
- Simple integration with dynamic languages.
Spark Avro dependencies
Since Spark 2.4, Spark SQL provides built-in support for reading and writing Apache Avro data files. However, the spark-avro module is external and, by default, it is not included in spark-submit or spark-shell; hence, access to the Avro file format in Spark is enabled by providing the package.
Maven dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
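If you build with sbt instead of Maven, a roughly equivalent dependency would be the following sketch; pick the version that matches your Spark and Scala build.
// build.sbt (sketch, adjust the version to your Spark release)
libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.4"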
spark-submit
While using spark-submit, provide spark-avro_2.12 and its dependencies directly using --packages, such as:
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:2.4.4
spark-shell
While working with spark-shell, you can also use --packages to add spark-avro_2.12 and its dependencies directly:
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.4
Amazon S3 bucket and dependencies
In order to interact with Amazon S3 from Spark, we need to use a third-party library, and this library comes in three different generations.
| Generation | Usage | Description |
| --- | --- | --- |
| First | s3:// | s3, also called classic, is the original filesystem for reading from and storing objects in Amazon S3. It has been deprecated; use either the second- or third-generation library instead. |
| Second | s3n:// | s3n uses native S3 objects and makes it easy to use S3 with Hadoop and other file systems. |
| Third | s3a:// | s3a is a replacement for s3n that supports larger files and improves performance. |
In this example, we will use the latest generation, option 3, with the s3a:// scheme. Below are the Hadoop and AWS dependencies you need in order for Spark to read/write files to Amazon AWS S3 storage.
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.0.0</version>
</dependency>
You can find more details about these dependencies by referring to the URLs in the reference section and using the one that suits you. Regardless of which one you use, the steps to read/write to Amazon S3 are exactly the same except for the URI scheme (s3://, s3n://, or s3a://).
// Create a SparkSession
val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
// Provide AWS credentials to the s3a connector (replace the placeholder values)
spark.sparkContext
    .hadoopConfiguration.set("fs.s3a.access.key", "awsaccesskey value")
spark.sparkContext
    .hadoopConfiguration.set("fs.s3a.secret.key", "aws secretkey value")
Write Spark DataFrame in Avro Data File to S3
Let’s now see how to write an Avro file to an Amazon S3 bucket. Since the Avro library is external to Spark, it doesn’t provide an avro() function on DataFrameWriter; hence, we should use the DataSource format “avro” or “org.apache.spark.sql.avro” to write a Spark DataFrame to an Avro file.
df.write.format("avro").save("s3a://sparkbyexamples/person.avro")
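If you re-run the write, the target path will already exist, so you may want to set a save mode; you can also pick the Avro compression codec through the compression write option of the built-in Avro source. A minimal sketch, assuming the same illustrative bucket as above:
// Overwrite any existing output and compress the Avro records with Snappy
// (requires: import org.apache.spark.sql.SaveMode)
df.write.format("avro")
    .mode(SaveMode.Overwrite)
    .option("compression", "snappy")
    .save("s3a://sparkbyexamples/person.avro")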
Read Avro Data File from S3 into Spark DataFrame
Now, let’s read an Avro file from an Amazon AWS S3 bucket into a Spark DataFrame. As mentioned earlier, an avro() function is not provided in Spark DataFrameReader; hence, we should use the DataSource format “avro” or “org.apache.spark.sql.avro”, and load() is used to read the Avro file.
val personDF = spark.read.format("avro").load("s3a://sparkbyexamples/person.avro")
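To verify the load, print the schema Spark inferred from the Avro file and preview a few rows:
// Inspect the inferred schema and the data
personDF.printSchema()
personDF.show(false)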
Writing Avro Partition Data into S3
Spark DataFrameWriter provides a partitionBy() function to partition the Avro data at the time of writing. Partitioning improves read performance by reducing disk I/O.
val data = Seq(("James ","","Smith",2018,1,"M",3000),
("Michael ","Rose","",2010,3,"M",4000),
("Robert ","","Williams",2010,3,"M",4000),
("Maria ","Anne","Jones",2005,5,"F",4000),
("Jen","Mary","Brown",2010,7,"",-1)
)
val columns = Seq("firstname", "middlename", "lastname", "dob_year",
"dob_month", "gender", "salary")
import spark.sqlContext.implicits._
val df = data.toDF(columns:_*)
df.write.partitionBy("dob_year","dob_month")
.format("avro").save("person_partition.avro")
This example partitions the person data by date-of-birth year and month. Spark creates a folder for each partition value (for example, dob_year=2010/dob_month=3/) and stores that partition’s data in it.
Reading Avro Partition Data from S3
When we retrieve the data with a filter on the partition columns, Spark reads only the matching partition folders instead of scanning all of the Avro files.
spark.read
.format("avro")
.load("s3a://sparkbyexamples/person_partition.avro")
.where(col("dob_year") === 2010)
.show()
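To confirm that only the matching partition folders are scanned, you can inspect the physical plan; with a filter on the partition column, the plan shows the dob_year predicate as a partition filter (the exact output varies by Spark version):
// Print the physical plan; the dob_year filter limits which partition folders are read
spark.read
    .format("avro")
    .load("s3a://sparkbyexamples/person_partition.avro")
    .where(col("dob_year") === 2010)
    .explain()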
Using Avro Schema
Avro schemas are usually defined in files with the .avsc extension, and their format is JSON. We will store the schema below in a person.avsc file and provide it using option() while reading the Avro file. This schema describes the structure of the Avro file with field names and their data types.
{
  "type": "record",
  "name": "Person",
  "namespace": "com.sparkbyexamples",
  "fields": [
    {"name": "firstname", "type": "string"},
    {"name": "middlename", "type": "string"},
    {"name": "lastname", "type": "string"},
    {"name": "dob_year", "type": "int"},
    {"name": "dob_month", "type": "int"},
    {"name": "gender", "type": "string"},
    {"name": "salary", "type": "int"}
  ]
}
You can download this Avro schema example from GitHub.
val schemaAvro = new Schema.Parser()
.parse(new File("src/main/resources/person.avsc"))
val df = spark.read
.format("avro")
.option("avroSchema", schemaAvro.toString)
.load("s3a:\\sparkbyexamples\\person.avro")
Alternatively, we can also specify the schema as a StructType using the schema() method.
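For example, here is a minimal sketch of an equivalent StructType for the person schema above:
// Define the schema explicitly instead of parsing the .avsc file
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val personSchema = StructType(Seq(
    StructField("firstname", StringType),
    StructField("middlename", StringType),
    StructField("lastname", StringType),
    StructField("dob_year", IntegerType),
    StructField("dob_month", IntegerType),
    StructField("gender", StringType),
    StructField("salary", IntegerType)
))
val dfWithSchema = spark.read
    .format("avro")
    .schema(personSchema)
    .load("s3a://sparkbyexamples/person.avro")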
Using Avro with Spark SQL
We can also read Avro data files using SQL. To do this, first create a temporary view that points to the Avro data file, then run SQL commands against that view.
spark.sqlContext.sql("CREATE TEMPORARY VIEW PERSON USING avro
OPTIONS (path \"s3a:\\sparkbyexamples\\person.avro\")")
spark.sqlContext.sql("SELECT * FROM PERSON").show()
Complete example: Spark read/write Avro on S3
package com.sparkbyexamples.spark
import java.io.File
import org.apache.avro.Schema
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col
object AvroAWSExample extends App{
val spark: SparkSession = SparkSession.builder().master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.access.key", "replace access key")
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.secret.key", "replace secret key")
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
val data = Seq(("James ", "", "Smith", 2018, 1, "M", 3000),
("Michael ", "Rose", "", 2010, 3, "M", 4000),
("Robert ", "", "Williams", 2010, 3, "M", 4000),
("Maria ", "Anne", "Jones", 2005, 5, "F", 4000),
("Jen", "Mary", "Brown", 2010, 7, "", -1)
)
val columns = Seq("firstname", "middlename", "lastname", "dob_year",
"dob_month", "gender", "salary")
import spark.sqlContext.implicits._
val df = data.toDF(columns: _*)
/**
* Write Avro File
*/
df.write.format("avro")
.mode(SaveMode.Overwrite)
.save("s3a://sparkbyexamples/avro/person.avro")
/**
* Read Avro File
*/
spark.read.format("avro").load("s3a://sparkbyexamples/avro/person.avro").show()
/**
* Write Avro Partition
*/
df.write.partitionBy("dob_year","dob_month")
.format("avro")
.mode(SaveMode.Overwrite)
.save("s3a://sparkbyexamples/avro/person_partition.avro")
/**
* Reading Avro Partition
*/
spark.read
.format("avro")
.load("s3a://sparkbyexamples/avro/person_partition.avro")
.where(col("dob_year") === 2010)
.show()
/**
* Explicit Avro schema
*/
val schemaAvro = new Schema.Parser()
.parse(new File("src/main/resources/person.avsc"))
spark.read
.format("avro")
.option("avroSchema", schemaAvro.toString)
.load("s3a://sparkbyexamples/avro/person.avro")
.show()
/**
* Avro Spark SQL
*/
spark.sqlContext.sql("CREATE TEMPORARY VIEW PERSON USING avro OPTIONS (path \"s3a://sparkbyexamples/avro/person.avro\")")
spark.sqlContext.sql("SELECT * FROM PERSON").show()
}
Conclusion:
We have seen examples of how to write Avro data files to Amazon S3 from a DataFrame and how to read Avro files from Amazon S3 into a Spark DataFrame. I have also explained how to work with Avro partitions and how partitioning improves reads: by filtering on partition columns, Spark scans only the matching folders, which can yield a significant read-performance gain.