
Using Spark SQL spark.read.json("path"), you can read a JSON file from an Amazon S3 bucket, HDFS, the local file system, and many other file systems supported by Spark. Similarly, using the write.json("path") method of DataFrame, you can save or write a DataFrame in JSON format to an Amazon S3 bucket.


In this tutorial, you will learn how to read a JSON file (single or multiple files) from an Amazon AWS S3 bucket into a DataFrame and write the DataFrame back to S3, using Scala examples.

Note: Out of the box, Spark supports reading files in CSV, JSON, AVRO, PARQUET, TEXT, and many more file formats.


Amazon S3 bucket and dependency

In order to interact with Amazon S3 from Spark, we need the third-party library hadoop-aws, which provides three different generations of S3 filesystem connectors.

Generation – Scheme – Description
First – s3:// – The original "classic" filesystem for reading from and storing objects in Amazon S3. It has been deprecated; use the second or third generation instead.
Second – s3n:// – Uses native S3 objects and makes it easy to work with Hadoop and other file systems. This is also not the recommended option.
Third – s3a:// – A replacement for s3n that supports larger files and improves performance.

In this tutorial, I will use the third generation, s3a://. Below are the Hadoop and AWS dependencies you need in order for Spark to read/write files to and from Amazon AWS S3 storage.


<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>3.0.0</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-client</artifactId>
	<version>3.0.0</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-aws</artifactId>
	<version>3.0.0</version>
</dependency>

You can find the latest version of the hadoop-aws library in the Maven repository.
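
If you use sbt instead of Maven, the equivalent dependency entries would look roughly like the sketch below (same artifacts and version as above; adjust the version to match your Hadoop distribution).


// build.sbt equivalents of the Maven dependencies above (illustrative sketch)
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-common" % "3.0.0",
  "org.apache.hadoop" % "hadoop-client" % "3.0.0",
  "org.apache.hadoop" % "hadoop-aws"    % "3.0.0"
)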

Before you proceed with the rest of the article, make sure you have an AWS account, an S3 bucket, and an AWS access key and secret key. You can find the access key and secret key values in the AWS IAM service.

Once you have the details, let’s create a SparkSession and set AWS keys to SparkContext.


// Create a SparkSession
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

// Replace the values below with your AWS access key and secret key
// (you can find both in the AWS IAM service)
spark.sparkContext
     .hadoopConfiguration
     .set("fs.s3a.access.key", "aws accesskey value")
spark.sparkContext
     .hadoopConfiguration
     .set("fs.s3a.secret.key", "aws secretkey value")
spark.sparkContext
     .hadoopConfiguration
     .set("fs.s3a.endpoint", "s3.amazonaws.com")

If you are using the second-generation s3n: file system, use the code below with the same Maven dependencies as above.


spark.sparkContext
     .hadoopConfiguration
     .set("fs.s3n.awsAccessKeyId", "awsAccessKeyId value")
spark.sparkContext
     .hadoopConfiguration
     .set("fs.s3n.awsSecretAccessKey", "awsSecretAccessKey value")
spark.sparkContext
     .hadoopConfiguration
     .set("fs.s3n.endpoint", "s3.amazonaws.com")

Spark Read JSON file from Amazon S3

To read a JSON file from Amazon S3 and create a DataFrame, you can use either spark.read.json("path") or spark.read.format("json").load("path"); both take the file path to read from as an argument. Download the simple_zipcodes.json file to practice.

Note: These methods are generic, hence they can also be used to read JSON files from HDFS, the local file system, and other file systems that Spark supports (a short sketch with hypothetical paths follows the example below).


//read json file into dataframe
val df = spark.read.json("s3a://sparkbyexamples/json/simple_zipcodes.json")
df.printSchema()
df.show(false)
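
As noted above, the same method works for other file systems that Spark supports; the file:// and hdfs:// paths below are hypothetical and only illustrate the idea.


// The same reader call works for local and HDFS paths
// (both paths here are hypothetical examples)
val localDf = spark.read.json("file:///tmp/simple_zipcodes.json")
val hdfsDf  = spark.read.json("hdfs://namenode:9000/data/simple_zipcodes.json")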

When you use the spark.read.format("json") method, you can also specify the data source by its fully qualified name (i.e., org.apache.spark.sql.json). For built-in sources, you can also use the short name json.
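
For example, the reads below are equivalent to spark.read.json(); per the Spark documentation, the short name and the fully qualified name resolve to the same built-in JSON source.


// Read using the short name "json"
val dfShort = spark.read.format("json")
  .load("s3a://sparkbyexamples/json/simple_zipcodes.json")

// Read using the fully qualified data source name
val dfFull = spark.read.format("org.apache.spark.sql.json")
  .load("s3a://sparkbyexamples/json/simple_zipcodes.json")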

Unlike reading a CSV file, Spark infers the schema from a JSON file by default.

Read JSON file with multiline records

Sometimes you may want to read records from a JSON file where a single record spans multiple lines. To read such files, set the multiline option to true; by default, the multiline option is set to false.

Below is the input file we are going to read; the same file is also available on GitHub.


[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]

Using spark.read.option("multiline","true")


    //read multiline json file
    val multiline_df = spark.read.option("multiline","true")
      .json("s3a://sparkbyexamples/json/multiline-zipcode.json")
    multiline_df.show(false)    

Reading multiple files at a time

Using the spark.read.json() method you can also read multiple JSON files from different paths; just pass all the file names with their fully qualified paths, separated by commas. For example:


    //read multiple files
    val df2 = spark.read.json(
      "s3a://sparkbyexamples/json/zipcode1.json",
      "s3a://sparkbyexamples/json/zipcode2.json")
    df2.show(false)

Reading files with a user-specified custom schema

Spark Schema defines the structure of the data; in other words, it is the structure of the DataFrame. Spark SQL provides the StructType & StructField classes to programmatically specify the structure of the DataFrame.

If you know the schema of the file ahead of time and do not want to rely on the default inferSchema behavior for column names and types, specify user-defined column names and types using the schema option.

Use the StructType class to create a custom schema; below we instantiate this class and use the add method to add columns to it by providing the column name, data type, and nullable option.


    //Define custom schema
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
    val schema = new StructType()
      .add("Zipcode",IntegerType,true)
      .add("ZipCodeType",StringType,true)
      .add("City",StringType,true)
      .add("State",StringType,true)
    val df_with_schema = spark.read.schema(schema)
        .json("s3a://sparkbyexamples/json/simple_zipcodes.json")
    df_with_schema.printSchema()
    df_with_schema.show(false)

Read JSON file from S3 using Spark SQL

Spark SQL also provides a way to read a JSON file by creating a temporary view directly on top of the file, using spark.sqlContext.sql() with a CREATE TEMPORARY VIEW ... USING json statement.


spark.sqlContext.sql("CREATE TEMPORARY VIEW zipcode USING json OPTIONS" + 
      " (path 's3a://sparkbyexamples/json/simple_zipcodes.json')")
spark.sqlContext.sql("select * fro zipcodes").show(false)

Options while reading JSON file

nullValues

Using the nullValues option, you can specify a string in the JSON that should be treated as null. For example, you can use it if you want a date column with the value "1900-01-01" to be set to null on the DataFrame.

dateFormat

The dateFormat option is used to set the format of the input DateType and TimestampType columns. It supports all java.text.SimpleDateFormat formats.
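
As a minimal sketch of passing these options together (the date pattern is an assumption for illustration, the nullValue string follows the description above, and dateFormat only takes effect for columns declared as DateType in a schema):


// Illustrative sketch: treat "1900-01-01" as null and parse dates with a yyyy-MM-dd pattern
// (option values here are assumptions for the example)
val dfOptions = spark.read
  .option("nullValue", "1900-01-01")
  .option("dateFormat", "yyyy-MM-dd")
  .json("s3a://sparkbyexamples/json/simple_zipcodes.json")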

Note: Besides the above options, the Spark JSON data source also supports many other options; please refer to the Spark documentation for the latest details.

Write Spark DataFrame to JSON file on Amazon S3 Bucket

Use the write() method of the Spark DataFrameWriter object on a DataFrame to write a JSON file to an Amazon S3 bucket.


df2.write
  .json("s3a://sparkbyexamples/json/ziprecords-new.json")

Spark Options while writing JSON files

While writing a JSON file you can use several options, such as nullValue, dateFormat, etc.
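
As an illustrative sketch, options are passed on the writer the same way as on the reader (the gzip codec and date pattern here are just example choices):


// Illustrative write options: gzip-compressed output files and a date pattern for DateType columns
df2.write
  .option("compression", "gzip")
  .option("dateFormat", "yyyy-MM-dd")
  .json("s3a://sparkbyexamples/json/ziprecords-new.json")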

Append or Overwrite Files on Amazon S3 Bucket using Saving modes

Spark DataFrameWriter also has a mode() method to specify the SaveMode; this method takes either one of the strings below or a constant from the SaveMode class. You can use these to append to or overwrite files on the Amazon S3 bucket.

overwrite – overwrites the existing file; alternatively, you can use SaveMode.Overwrite.

append – adds the data to the existing file; alternatively, you can use SaveMode.Append.

ignore – ignores the write operation when the file already exists; alternatively, you can use SaveMode.Ignore.

errorifexists or error – this is the default option; it returns an error when the file already exists. Alternatively, you can use SaveMode.ErrorIfExists.


df2.write.mode(SaveMode.Overwrite).json("s3a://sparkbyexamples/json/ziprecords-new.json")

Conclusion:

In this tutorial, you have learned about the Amazon S3 dependencies that are used to read and write JSON to and from an S3 bucket. You also learned how to read JSON files with single-line and multiline records into a Spark DataFrame.


Happy Learning !!
