Spark read from & write to parquet file | Amazon S3 bucket

In this Spark tutorial, you will learn what Apache Parquet is, its advantages, and how to read a Parquet file from an Amazon S3 bucket into a DataFrame and write a DataFrame to a Parquet file on Amazon S3, with Scala examples.

1. Apache Parquet Introduction

Apache Parquet is a columnar file format that provides optimizations to speed up queries. It is far more efficient than CSV or JSON and is supported by many data processing systems.

It is compatible with most of the data processing frameworks in the Hadoop ecosystem and provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.

Spark SQL provides support for both reading and writing Parquet files and automatically preserves the schema of the original data. Storing data as Parquet can also reduce storage size by around 75% on average. Spark supports Parquet in its library by default, so we don't need to add any dependency libraries for the format itself.

1.1 Apache Parquet Advantages:

Below are some of the advantages of using Apache Parquet; combining these benefits with Spark improves performance and gives us the ability to work with structured files.

  • Reduces IO operations.
  • Fetches only the specific columns that you need to access (see the short sketch after this list).
  • It consumes less space.
  • Supports type-specific encoding.
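
For example, because Parquet is columnar, selecting only the columns you need means the remaining columns are never read from storage. A minimal sketch of this column pruning, assuming the SparkSession and the people.parquet file created later in this tutorial:


// Column pruning: only firstname and salary are read from storage
val namesDF = spark.read
  .parquet("s3a://sparkbyexamples/parquet/people.parquet")
  .select("firstname", "salary")
namesDF.explain()  // the ReadSchema in the plan should list only these two columns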

2. Amazon S3 bucket and dependency

In order to interact with Amazon S3 from Spark, we need to use a third-party library, and this library comes in three different generations:

Generation | Usage  | Description
First      | s3://  | The original "classic" s3: filesystem for reading from or storing objects in Amazon S3. It has been deprecated; using either the second or third generation library is recommended.
Second     | s3n:// | s3n uses native S3 objects and makes it easy to use with Hadoop and other file systems. This is also not the recommended option.
Third      | s3a:// | s3a is a replacement for s3n that supports larger files and improves performance.

In this example, we will use the latest and greatest third generation, which is s3a://. Below are the Hadoop and AWS dependencies you would need in order for Spark to read/write files to Amazon AWS S3 storage.


<!-- Hadoop and AWS dependencies for Amazon S3 -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.0.0</version>
</dependency>
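
If your project is built with sbt instead of Maven, the equivalent dependency declarations would look roughly like the sketch below (same artifacts and versions as the Maven snippet above):


// build.sbt - sbt equivalent of the Maven dependencies above
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-common" % "3.0.0",
  "org.apache.hadoop" % "hadoop-client" % "3.0.0",
  "org.apache.hadoop" % "hadoop-aws"    % "3.0.0"
)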

You can find more details about these dependencies and use the one that is suitable for you. Regardless of which one you use, the steps for reading from and writing to Amazon S3 are exactly the same; only the URI scheme (here s3a://) changes.


val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

// Replace the value with your AWS access key (you can find this in the IAM service)
spark.sparkContext
     .hadoopConfiguration.set("fs.s3a.access.key", "awsaccesskey value")
// Replace the value with your AWS secret key (you can find this in the IAM service)
spark.sparkContext
     .hadoopConfiguration.set("fs.s3a.secret.key", "aws secretkey value")
spark.sparkContext
      .hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

In case you are using the s3n:// file system:


spark.sparkContext
     .hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "awsAccessKeyId value")
spark.sparkContext
     .hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "awsSecretAccessKey value")
spark.sparkContext
      .hadoopConfiguration.set("fs.s3n.endpoint", "s3.amazonaws.com")

3. Spark | Parquet | Amazon S3 – Example

Before we read from and write Apache Parquet files in Amazon S3 using Spark, let's first create a Spark DataFrame from a Seq object. Note that the toDF() function on a sequence object is available only after you import the implicits using spark.sqlContext.implicits._.


// Spark | Parquet | Amazon S3 - Example 
val data = Seq(("James ","","Smith","36636","M",3000),
      ("Michael ","Rose","","40288","M",4000),
      ("Robert ","","Williams","42114","M",4000),
      ("Maria ","Anne","Jones","39192","F",4000),
      ("Jen","Mary","Brown","","F",-1)
    )

val columns = Seq("firstname","middlename","lastname","dob","gender","salary")
import spark.sqlContext.implicits._
val df = data.toDF(columns:_*)

The above example creates a DataFrame with the columns "firstname", "middlename", "lastname", "dob", "gender", and "salary".

3.1 Spark Write DataFrame in Parquet file to Amazon S3

Using the parquet() function of the DataFrameWriter class (df.write.parquet()), we can write a Spark DataFrame to a Parquet file on Amazon S3. As mentioned earlier, Spark doesn't need any additional packages or libraries to use Parquet because it is provided with Spark by default. Easy, isn't it? We don't have to worry about version and compatibility issues. In this example, we write the DataFrame to the people.parquet file in an S3 bucket.


// Spark Write DataFrame in Parquet file to Amazon S3
df.write.parquet("s3a://sparkbyexamples/parquet/people.parquet")

Writing a Spark DataFrame to Parquet format preserves the column names and data types, and all columns are automatically converted to be nullable for compatibility reasons. Notice that all the part files Spark creates have a .parquet extension.

Figure: Parquet file on Amazon S3
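
If you want to confirm this from code, one option is to list the objects written under the target prefix using the Hadoop FileSystem API; a quick sketch, assuming the same SparkSession and bucket as above:


// List the files written under the target path; the data files should all
// end with .parquet (plus a _SUCCESS marker, depending on the committer)
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

val outPath = new Path("s3a://sparkbyexamples/parquet/people.parquet")
val fs = FileSystem.get(URI.create("s3a://sparkbyexamples"), spark.sparkContext.hadoopConfiguration)
fs.listStatus(outPath).foreach(status => println(status.getPath.getName))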

3.2 Spark Read Parquet file from Amazon S3 into DataFrame

Similar to write, DataFrameReader provides the parquet() function (spark.read.parquet()) to read Parquet files from an Amazon S3 bucket and create a Spark DataFrame. In this example snippet, we read the data from the Apache Parquet file we wrote earlier.


// Spark Read Parquet file from Amazon S3 into DataFrame
val parqDF = spark.read.parquet("s3a://sparkbyexamples/parquet/people.parquet")

Printing the schema of the DataFrame shows the columns with the same names and data types.
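
A quick way to verify this (the schema shown in the comments is approximate):


// Verify the schema of the DataFrame read back from S3; after a Parquet
// round trip all columns are nullable
parqDF.printSchema()
// root
//  |-- firstname: string (nullable = true)
//  |-- middlename: string (nullable = true)
//  |-- lastname: string (nullable = true)
//  |-- dob: string (nullable = true)
//  |-- gender: string (nullable = true)
//  |-- salary: integer (nullable = true)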

3.3 Append to existing Parquet file on S3

Spark provides the capability to append a DataFrame to existing Parquet files using the "append" save mode. In case you want to overwrite instead, use the "overwrite" save mode.


// Append to existing Parquet file on S3
df.write.mode("append").parquet("s3a://sparkbyexamples/parquet/people.parquet")
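
If you want to replace the data instead, overwriting the same location looks like the sketch below; this replaces whatever is already stored at that path:


// Overwrite replaces any existing data at the target path
import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Overwrite)
  .parquet("s3a://sparkbyexamples/parquet/people.parquet")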

3.4 Using SQL queries on Parquet

We can also create a temporary view on Parquet files and then use it in Spark SQL statements. This temporary view is available as long as the SparkSession that created it is active.


// Using SQL queries on Parquet
parqDF.createOrReplaceTempView("ParquetTable")
val parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")

The above predicate on the Parquet file results in a full file scan, which is a performance bottleneck, much like a table scan on a traditional database. We should use partitioning in order to improve performance.

4. Spark parquet partition – Improving performance

Partitioning is a feature of many databases and data processing frameworks, and it is key to making jobs work at scale. We can partition Parquet output using the partitionBy() function of DataFrameWriter.


// Spark parquet partition - Improving performance
df.write.partitionBy("gender","salary")
        .parquet("s3a://sparkbyexamples/parquet/people2.parquet")

Parquet partitioning creates a folder hierarchy for each partition column; since we specified gender first, followed by salary, it creates salary folders inside each gender folder.

Figure: Parquet partition files on Amazon S3

The following example reads the partitioned Parquet data back and queries it, taking advantage of the partitioning on the gender and salary columns.


val parqDF = spark.read.parquet("s3a://sparkbyexamples/parquet/people2.parquet")
parqDF.createOrReplaceTempView("Table2")
val df = spark.sql("select * from Table2  where gender='M' and salary >= 4000")

The execution of this query is significantly faster than the same query on unpartitioned data: Spark first narrows the scan to the gender partition and then applies the filter on salary.
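
You can verify this by calling explain() on the query result from the previous snippet; for a partitioned Parquet source, the physical plan should show the gender and salary predicates as partition filters rather than ordinary data filters (the exact plan text varies across Spark versions):


// The plan should list the gender/salary predicates under PartitionFilters,
// which means only the matching folders are scanned
df.explain()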

5. Spark Read a specific Parquet partition


// Spark Read a specific Parquet partition 
val parqDF = spark.read.parquet("s3a://sparkbyexamples/parquet/people2.parquet/gender=M")

This code snippet retrieves the data from the gender partition value “M”.
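
Note that when you point the reader directly at a single partition folder like this, Spark treats that folder as the root of the dataset, so the gender column itself does not appear in the resulting DataFrame. If you want to keep the partition columns while reading only that folder, one option is the basePath option; a small sketch:


// basePath tells Spark where partition discovery starts, so gender (and salary)
// stay in the schema even though only the gender=M folder is read
val menDF = spark.read
  .option("basePath", "s3a://sparkbyexamples/parquet/people2.parquet")
  .parquet("s3a://sparkbyexamples/parquet/people2.parquet/gender=M")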

6. Spark read/write Parquet file from S3 Complete Example


// Spark read/write Parquet file from S3 Complete Example 
package com.sparkbyexamples.spark

import org.apache.spark.sql.SparkSession

object ParquetAWSExample extends App{
    
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()
    spark.sparkContext
      .hadoopConfiguration.set("fs.s3a.access.key", "replace access key")
    spark.sparkContext
      .hadoopConfiguration.set("fs.s3a.secret.key", "replace secret key")
    spark.sparkContext
      .hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

    val data = Seq(("James ","Rose","Smith","36636","M",3000),
      ("Michael","Rose","","40288","M",4000),
      ("Robert","Mary","Williams","42114","M",4000),
      ("Maria","Anne","Jones","39192","F",4000),
      ("Jen","Mary","Brown","1234","F",-1)
    )

    val columns = Seq("firstname","middlename","lastname","dob","gender","salary")
    import spark.sqlContext.implicits._
    val df = data.toDF(columns:_*)

    df.show()
    df.printSchema()

    df.write
      .parquet("s3a://sparkbyexamples/parquet/people.parquet")


    val parqDF = spark.read.parquet("s3a://sparkbyexamples/parquet/people.parquet")
    parqDF.createOrReplaceTempView("ParquetTable")

    spark.sql("select * from ParquetTable where salary >= 4000").explain()
    val parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")

    parkSQL.show()
    parkSQL.printSchema()

    df.write
      .partitionBy("gender","salary")
      .parquet("s3a://sparkbyexamples/parquet/people2.parquet")
}

This complete example is also available at GitHub for reference.

Conclusion:

You have learned how to read and write Apache Parquet files from/to an Amazon S3 bucket using Spark, how to improve performance by partitioning and filtering data with a partition key, and how to append to and overwrite existing Parquet files in an S3 bucket. Though PySpark examples are not covered here, you should now have a good idea of how to use Parquet and its advantages.


Happy Learning !! 🙂

Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, he has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive, and Machine Learning. Naveen's journey in the field of data engineering has been one of continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with data as he comes across them. Follow Naveen on LinkedIn and Medium.
