Write & Read CSV file from S3 into DataFrame

Spark SQL provides spark.read.csv("path") to read a CSV file from Amazon S3, the local file system, HDFS, and many other data sources into a Spark DataFrame, and dataframe.write.csv("path") to save or write a DataFrame in CSV format to Amazon S3, the local file system, HDFS, and many other data sources.

In this tutorial you will learn how to read a single file, multiple files, and all files from an Amazon AWS S3 bucket into a DataFrame, apply some transformations, and finally write the DataFrame back to S3 in CSV format, using Scala and Python (PySpark) examples.

Note: Out of the box, Spark supports reading CSV, JSON, and many other file formats into a Spark DataFrame.


The example explained in this tutorial uses a CSV file from the following GitHub location.

Amazon S3 bucket and dependency

In order to interact with Amazon S3 from Spark, we need to use a third-party library, and this library comes in three different generations.

Generation – URI scheme – Description
First – s3 (s3://) – The original filesystem, also called "classic", for reading from or storing objects in Amazon S3. It has been deprecated, and using either the second- or third-generation library is recommended instead.
Second – s3n (s3n://) – The native S3 filesystem, which makes it easy to use S3 objects with Hadoop and other file systems. This is also not the recommended option.
Third – s3a (s3a://) – The replacement for s3n, which supports larger files and improves performance.

In this example, we will use the latest and greatest third generation, s3a://. Below are the Hadoop and AWS dependencies you need for Spark to read and write files in Amazon AWS S3 storage.

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>3.0.0</version>
        </dependency>

You can find more details about these dependencies and choose the versions that suit your setup. Regardless of which generation you use, the steps for reading from and writing to Amazon S3 are exactly the same except for the URI scheme (s3a://, s3n://, or s3://).
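If your project builds with sbt instead of Maven, the equivalent dependencies would look roughly like this (a sketch that reuses the 3.0.0 Hadoop version from above; in practice, match the hadoop-aws version to the Hadoop version your Spark distribution was built against):

// build.sbt (sketch) -- keep the versions aligned with your Spark/Hadoop build
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-common" % "3.0.0",
  "org.apache.hadoop" % "hadoop-client" % "3.0.0",
  "org.apache.hadoop" % "hadoop-aws"    % "3.0.0"
)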


Once the dependencies are in place, create a SparkSession and set your AWS credentials on the Hadoop configuration:

val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

// Replace these values with your AWS access key and secret key
// (you can find them in the IAM service)
spark.sparkContext
     .hadoopConfiguration.set("fs.s3a.access.key", "awsaccesskey value")
spark.sparkContext
     .hadoopConfiguration.set("fs.s3a.secret.key", "aws secretkey value")
spark.sparkContext
     .hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

If you are using the s3n: filesystem instead, set the equivalent s3n properties:


spark.sparkContext
     .hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "awsAccessKeyId value")
spark.sparkContext
     .hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "awsSecretAccessKey value")
spark.sparkContext
      .hadoopConfiguration.set("fs.s3n.endpoint", "s3.amazonaws.com")

Spark Read CSV file from S3 into DataFrame

Using spark.read.csv("path") or spark.read.format("csv").load("path") you can read a CSV file from Amazon S3 into a Spark DataFrame. These methods take a file path as an argument. By default, the read treats the header row as a data record, so it reads the column names in the file as data; to avoid this, explicitly set the header option to "true". It also reads all columns as strings (StringType) by default.

In later sections I will explain how to use inferSchema so that the column names come from the header and the column types are inferred from the data.

 
val df = spark.read.csv("s3a://sparkbyexamples/csv/zipcodes.csv")
    df.show(false)
    df.printSchema()
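For example, a quick sketch of the same read with the header and inferSchema options set (using the same sample path), so the first row becomes the column names and the column types are inferred from the data:

// Sketch: read with header and schema inference enabled
val df2 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("s3a://sparkbyexamples/csv/zipcodes.csv")
df2.printSchema()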

When you use the format("csv") method, you can also specify the data source by its fully qualified name (i.e., org.apache.spark.sql.csv), but for built-in sources you can also use their short names (csv, json, parquet, jdbc, text, etc.).

Example in Python (PySpark)

Here is a similar example in Python (PySpark) using the format() and load() methods:

 
df = spark.read.format('csv') \
    .options(header='true', inferSchema='true') \
    .load('s3a://sparkbyexamples/csv/zipcodes.csv')

When the header option is not set (as in the first Scala example above), Spark reads the data into DataFrame columns named "_c0" for the first column, "_c1" for the second, and so on, and by default all of these columns are of type String.
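As an illustrative sketch in Scala (the _c0/_c1 names are what Spark assigns when no header is set; the number of columns depends on the file):

// Read without the header option: Spark assigns generic column names
val raw = spark.read.csv("s3a://sparkbyexamples/csv/zipcodes.csv")
raw.printSchema()
// root
//  |-- _c0: string (nullable = true)
//  |-- _c1: string (nullable = true)
//  ... one string column per field in the file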

Read multiple CSV files

Using the spark.read.csv() method you can also read multiple CSV files at once; just pass all the qualifying Amazon S3 paths as arguments, for example:

 
val df = spark.read.csv("s3 path1,s3 path2,s3 path3")

Read all CSV files in a directory

We can read all CSV files from a directory into a DataFrame just by passing the directory as a path to the csv() method.

 
val df = spark.read.csv("Folder path")

Reading CSV files with a user-specified custom schema

If you know the schema of the file ahead of time and do not want to use the inferSchema option for column names and types, you can supply user-defined column names and types via the schema option.

 
    import org.apache.spark.sql.types.{StructType, IntegerType, StringType, DoubleType, BooleanType}

    val schema = new StructType()
      .add("RecordNumber",IntegerType,true)
      .add("Zipcode",IntegerType,true)
      .add("ZipCodeType",StringType,true)
      .add("City",StringType,true)
      .add("State",StringType,true)
      .add("LocationType",StringType,true)
      .add("Lat",DoubleType,true)
      .add("Long",DoubleType,true)
      .add("Xaxis",IntegerType,true)
      .add("Yaxis",DoubleType,true)
      .add("Zaxis",DoubleType,true)
      .add("WorldRegion",StringType,true)
      .add("Country",StringType,true)
      .add("LocationText",StringType,true)
      .add("Location",StringType,true)
      .add("Decommisioned",BooleanType,true)
      .add("TaxReturnsFiled",StringType,true)
      .add("EstimatedPopulation",IntegerType,true)
      .add("TotalWages",IntegerType,true)
      .add("Notes",StringType,true)
    val df_with_schema = spark.read.format("csv")
      .option("header", "true")
      .schema(schema)
      .load("s3a://sparkbyexamples/csv/zipcodes.csv")
    df_with_schema.printSchema()
    df_with_schema.show(false)

Write Spark DataFrame to S3 in CSV file format

Use the write() method on the DataFrame to get a Spark DataFrameWriter, and use it to write the DataFrame to an Amazon S3 bucket in CSV file format.

 
 df2.write
 .option("header","true")
 .csv("s3a://sparkbyexamples/csv/zipcodes")

Options

While writing a CSV file you can use several options: for example, whether to output the column names as a header row using the header option, what delimiter to use in the CSV file using the delimiter option, and many more.

 
 df2.write.option("header","true")
 .option("delimiter",",")
 .csv("s3a://sparkbyexamples/csv/zipcodes")
 

Other available options include quote, escape, nullValue, dateFormat, and quoteMode.
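As a sketch of how a few of these look together (quote, escape, nullValue, and dateFormat are standard Spark CSV write options; the values below are just illustrative):

// Sketch: a write with several CSV options set (illustrative values)
df2.write
  .option("header", "true")
  .option("quote", "\"")              // character used to quote values
  .option("escape", "\\")             // character used to escape quotes inside values
  .option("nullValue", "NA")          // how null values are written
  .option("dateFormat", "yyyy-MM-dd") // format for date columns
  .csv("s3a://sparkbyexamples/csv/zipcodes")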

Saving modes

Spark DataFrameWriter also has a mode() method to specify the SaveMode; this method takes either one of the strings below or a constant from the SaveMode class.

overwrite – overwrites the existing file; alternatively, you can use SaveMode.Overwrite.

append – adds the data to the existing file; alternatively, you can use SaveMode.Append.

ignore – ignores the write operation when the file already exists; alternatively, you can use SaveMode.Ignore.

errorifexists or error – this is the default option; it returns an error when the file already exists. Alternatively, you can use SaveMode.ErrorIfExists.

 
import org.apache.spark.sql.SaveMode

df2.write.mode(SaveMode.Overwrite).csv("s3a://sparkbyexamples/csv/zipcodes")
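The string form works the same way; for example, a sketch using "overwrite" (you could equally pass "append", "ignore", or "error"):

// Equivalent write using the string form of the save mode
df2.write.mode("overwrite").csv("s3a://sparkbyexamples/csv/zipcodes")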

Conclusion:

In this tutorial, you have learned how to read a single CSV file, multiple CSV files, and all files in an Amazon S3 bucket into a Spark DataFrame, how to use options to change the default behavior, and how to write CSV files back to Amazon S3 using different save modes.


Happy Learning !!
