Spark – Read & Write Avro files from Amazon S3

In this Spark tutorial, you will learn what the Avro format is, its advantages, and how to read an Avro file from an Amazon S3 bucket into a DataFrame and write a DataFrame as an Avro file to an Amazon S3 bucket, with Scala examples.

Spark provides built-in support to read from and write a DataFrame to Avro files using the “spark-avro” library; however, to read or write Avro files on Amazon S3 you also need an S3 connector library.

If you are using Spark 2.3 or older, please use this URL.


Related: Spark from_avro() and to_avro() usage

1. What is Apache Avro?

Apache Avro is an open-source, row-based data serialization and data exchange framework that originated in the Apache Hadoop ecosystem. Spark’s Avro support comes from the spark-avro library, originally developed by Databricks as an open-source library for reading and writing data in the Avro file format. Avro is widely used with Apache Spark, especially in Kafka-based data pipelines. When Avro data is stored in a file, its schema is stored with it, so that files may be processed later by any program.

It was built to serialize and exchange big data between different Hadoop-based projects. It serializes data in a compact binary format, while the schema is in JSON format and defines the field names and data types.

It is similar to Thrift and Protocol Buffers but does not require code generation, as its data is always accompanied by a schema that permits full processing of the data without generated code. This is one of its great advantages compared with other serialization systems.

2. Apache Avro Advantages

  • Supports complex data structures such as arrays, maps, arrays of maps, and maps of arrays.
  • A compact, binary serialization format that makes data transfer fast.
  • A row-based data serialization system.
  • Supports multiple languages, meaning data written in one language can be read by a different language.
  • Code generation is not required to read or write data files.
  • Simple integration with dynamic languages.

3. Spark Avro dependencies

Since Spark 2.4, Spark SQL provides built-in support for reading and writing Apache Avro data files; however, the spark-avro module is external and is not included in spark-submit or spark-shell by default. Hence, Avro support in Spark is enabled by providing the package.

Maven dependencies: if you build with Maven, add spark-avro to your pom.xml. The coordinates below match the --packages examples that follow; adjust the Scala and Spark versions to your environment.
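
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-avro_2.12</artifactId>
   <version>2.4.4</version>
</dependency>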

spark-submit

While using spark-submit, provide spark-avro_2.12 and its dependencies directly using --packages, as shown below:


./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:2.4.4

spark-shell

While working with spark-shell, you can also use --packages to add spark-avro_2.12 and its dependencies directly:


./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.4

4. Amazon S3 bucket and dependency

In order to interact with Amazon S3 from Spark, we need to use a third-party library, and this library comes in three generations.

  • First generation (s3://): the classic s3 filesystem for reading from and storing objects in Amazon S3. It has been deprecated; use the second or third generation instead.
  • Second generation (s3n://): uses native S3 objects and makes it easy to use with Hadoop and other file systems.
  • Third generation (s3a://): a replacement for s3n that supports larger files and improves performance.

In this example, we will use the latest generation, option three, with s3a://. Below are the Hadoop and AWS dependencies you need for Spark to read/write files to Amazon S3 storage.


<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-common</artifactId>
   <version>3.0.0</version>
</dependency>

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>3.0.0</version>
</dependency>
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-aws</artifactId>
   <version>3.0.0</version>
</dependency>

You can find more details about these dependencies in the Hadoop documentation and use the versions that suit your setup. Regardless of which generation you use, the steps to read/write to Amazon S3 are exactly the same except for the URI scheme (s3://, s3n://, or s3a://).


val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()
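// Provide AWS credentials to the s3a connector via the Hadoop configuration (replace with your own values)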
spark.sparkContext
     .hadoopConfiguration.set("fs.s3a.access.key", "awsaccesskey value")
spark.sparkContext
     .hadoopConfiguration.set("fs.s3a.secret.key", "aws secretkey value")

5. Write Spark DataFrame in Avro Data File to S3

Let’s now see how to write an Avro file to an Amazon S3 bucket. Since the Avro library is external to Spark, it doesn’t provide an avro() function on DataFrameWriter; hence, we use the DataSource format “avro” or “org.apache.spark.sql.avro” to write a Spark DataFrame to an Avro file.


df.write.format("avro").save("s3a://sparkbyexamples/person.avro")

6. Read Avro Data File from S3 into Spark DataFrame

Now, let’s read an Avro file from an Amazon AWS S3 bucket into a Spark DataFrame. As mentioned earlier, the avro() function is not provided in Spark DataFrameReader; hence, we use the DataSource format “avro” or “org.apache.spark.sql.avro”, and load() is used to read the Avro file.


val personDF = spark.read.format("avro").load("s3a://sparkbyexamples/person.avro")

7. Writing Avro Partition Data into S3

Spark DataFrameWriter provides the partitionBy() function to partition the Avro data at the time of writing. Partitioning improves read performance by reducing disk I/O.


val data = Seq(("James ","","Smith",2018,1,"M",3000),
      ("Michael ","Rose","",2010,3,"M",4000),
      ("Robert ","","Williams",2010,3,"M",4000),
      ("Maria ","Anne","Jones",2005,5,"F",4000),
      ("Jen","Mary","Brown",2010,7,"",-1)
    )

val columns = Seq("firstname", "middlename", "lastname", "dob_year",
 "dob_month", "gender", "salary")
import spark.sqlContext.implicits._
val df = data.toDF(columns:_*)

df.write.partitionBy("dob_year","dob_month")
        .format("avro").save("s3a://sparkbyexamples/person_partition.avro")

This example partitions the person data by date-of-birth year and month. Avro creates a folder for each partition and stores that specific partition’s data in its folder.
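
For example, with the partition columns above, the output folder contains one sub-folder per partition value, along the lines of the illustrative layout below (the part file names will differ in an actual run):

person_partition.avro/
  dob_year=2005/dob_month=5/part-00000-....avro
  dob_year=2010/dob_month=3/part-00000-....avro
  dob_year=2010/dob_month=7/part-00000-....avro
  dob_year=2018/dob_month=1/part-00000-....avro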

8. Reading Avro Partition Data from S3

When we retrieve data for a partition, Spark reads only the data from that partition folder without scanning all of the Avro files.


spark.read
      .format("avro")
      .load("s3a://sparkbyexamples/person_partition.avro")
      .where(col("dob_year") === 2010)
      .show()

9. Using Avro Schema

Avro schemas are usually defined in files with the .avsc extension, and the file content is JSON. We will store the schema below in a person.avsc file and provide it using option() while reading the Avro file. This schema describes the structure of the Avro file with field names and their data types.


{
  "type": "record",
  "name": "Person",
  "namespace": "com.sparkbyexamples",
  "fields": [
    {"name": "firstname","type": "string"},
    {"name": "middlename","type": "string"},
    {"name": "lastname","type": "string"},
    {"name": "dob_year","type": "int"},
    {"name": "dob_month","type": "int"},
    {"name": "gender","type": "string"},
    {"name": "salary","type": "int"}
  ]
}

You can download this Avro schema example from GitHub.


import java.io.File
import org.apache.avro.Schema

// Parse the Avro schema from the local person.avsc file
val schemaAvro = new Schema.Parser()
      .parse(new File("src/main/resources/person.avsc"))

val df = spark.read
              .format("avro")
              .option("avroSchema", schemaAvro.toString)
              .load("s3a://sparkbyexamples/person.avro")

Alternatively, we can also specify the schema as a StructType using the schema() method.
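
For example, a minimal sketch of this approach, assuming a StructType that mirrors the person schema above:

import org.apache.spark.sql.types._

// StructType equivalent of the person.avsc schema shown above
val personSchema = StructType(Seq(
  StructField("firstname", StringType),
  StructField("middlename", StringType),
  StructField("lastname", StringType),
  StructField("dob_year", IntegerType),
  StructField("dob_month", IntegerType),
  StructField("gender", StringType),
  StructField("salary", IntegerType)
))

val personDF = spark.read
  .format("avro")
  .schema(personSchema)
  .load("s3a://sparkbyexamples/person.avro")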

10. Using Avro with Spark SQL

We can also read Avro data files using SQL. To do this, first create a temporary view pointing to the Avro data file, then run the SQL command on the view.


spark.sqlContext.sql("CREATE TEMPORARY VIEW PERSON USING avro 
OPTIONS (path \"s3a:\\sparkbyexamples\\person.avro\")")
spark.sqlContext.sql("SELECT * FROM PERSON").show()

11. Spark read from/write Avro into S3 Example


package com.sparkbyexamples.spark

import java.io.File
import org.apache.avro.Schema
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object AvroAWSExample extends App{

  val spark: SparkSession = SparkSession.builder().master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext
    .hadoopConfiguration.set("fs.s3a.access.key", "replace access key")
  spark.sparkContext
    .hadoopConfiguration.set("fs.s3a.secret.key", "replace secret key")
  spark.sparkContext
    .hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

  val data = Seq(("James ", "", "Smith", 2018, 1, "M", 3000),
    ("Michael ", "Rose", "", 2010, 3, "M", 4000),
    ("Robert ", "", "Williams", 2010, 3, "M", 4000),
    ("Maria ", "Anne", "Jones", 2005, 5, "F", 4000),
    ("Jen", "Mary", "Brown", 2010, 7, "", -1)
  )

  val columns = Seq("firstname", "middlename", "lastname", "dob_year",
    "dob_month", "gender", "salary")
  import spark.sqlContext.implicits._
  val df = data.toDF(columns: _*)

  /**
    * Write Avro File
    */
  df.write.format("avro")
    .mode(SaveMode.Overwrite)
    .save("s3a://sparkbyexamples/avro/person.avro")

  /**
    * Read Avro File
    */
  spark.read.format("avro").load("s3a://sparkbyexamples/avro/person.avro").show()

  /**
    * Write Avro Partition
    */
  df.write.partitionBy("dob_year","dob_month")
    .format("avro")
    .mode(SaveMode.Overwrite)
    .save("s3a://sparkbyexamples/avro/person_partition.avro")

  /**
    * Reading Avro Partition
    */
  spark.read
    .format("avro")
    .load("s3a://sparkbyexamples/avro/person_partition.avro")
    .where(col("dob_year") === 2010)
    .show()

  /**
    * Explicit Avro schema
    */
  val schemaAvro = new Schema.Parser()
    .parse(new File("src/main/resources/person.avsc"))

  spark.read
    .format("avro")
    .option("avroSchema", schemaAvro.toString)
    .load("s3a://sparkbyexamples/avro/person.avro")
    .show()

  /**
    * Avro Spark SQL
    */
  spark.sqlContext.sql("CREATE TEMPORARY VIEW PERSON USING avro OPTIONS (path \"s3a://sparkbyexamples/avro/person.avro\")")
  spark.sqlContext.sql("SELECT * FROM PERSON").show()
}

Conclusion:

We have seen examples of how to write Avro data files to Amazon S3 from a DataFrame and how to read Avro files from Amazon S3 into a Spark DataFrame. I have also explained how to work with Avro partitions and how partitioning improves read performance by scanning only the required partition folders.

