Using Spark SQL spark.read.json("path") you can read a JSON file from an Amazon S3 bucket, HDFS, the local file system, and many other file systems supported by Spark. Similarly, using the write.json("path") method of DataFrame, you can save or write a DataFrame in JSON format to an Amazon S3 bucket.
In this tutorial, you will learn how to read a JSON file (single or multiple) from an Amazon AWS S3 bucket into a DataFrame and write the DataFrame back to S3, using Scala examples.
Note: Spark out of the box supports reading files in CSV, JSON, AVRO, PARQUET, TEXT, and many more file formats.
Table of contents:
- Amazon S3 dependencies
- Spark Read JSON file from Amazon S3 into DataFrame
- Read JSON file from multiline
- Reading multiple files at a time
- Reading file with a user-specified schema
- Reading file from Amazon S3 using Spark SQL
- Options while reading JSON file
- Spark Write JSON file to Amazon S3 bucket
Amazon S3 bucket and dependencies
In order to interact with Amazon S3 from Spark, we need to use the third-party library hadoop-aws, and this library supports three different generations.
| Generation | Usage | Description |
| --- | --- | --- |
| First – s3 | s3:// | s3, also called classic (the s3: filesystem for reading from or storing objects in Amazon S3). This has been deprecated; using either the second or third generation library is recommended. |
| Second – s3n | s3n:// | s3n uses native S3 objects and makes it easy to use them with Hadoop and other file systems. This is also not the recommended option. |
| Third – s3a | s3a:// | s3a is a replacement for s3n that supports larger files and improves performance. |
In this tutorial, I will use the third generation, which is s3a://. Below are the Hadoop and AWS dependencies you need in order for Spark to read/write files to Amazon AWS S3 storage.
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.0.0</version>
</dependency>
You can find the latest version of the hadoop-aws library in the Maven repository.
Before you proceed with the rest of the article, please have an AWS account, an S3 bucket, and an AWS access key and secret key. You can find the access and secret key values in your AWS IAM service.
Once you have the details, let’s create a SparkSession and set the AWS keys on the SparkContext.
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()

// Replace with your AWS account access key (you can find this on the IAM service)
spark.sparkContext
  .hadoopConfiguration
  .set("fs.s3a.access.key", "aws accesskey value")

// Replace with your AWS secret key (you can find this on the IAM service)
spark.sparkContext
  .hadoopConfiguration
  .set("fs.s3a.secret.key", "aws secretkey value")

spark.sparkContext
  .hadoopConfiguration
  .set("fs.s3a.endpoint", "s3.amazonaws.com")
In case you are using the second generation s3n:// file system, use the below code with the same Maven dependencies as above.
spark.sparkContext
.hadoopConfiguration
.set("fs.s3n.awsAccessKeyId", "awsAccessKeyId value")
spark.sparkContext
.hadoopConfiguration
.set("fs.s3n.awsSecretAccessKey", "awsSecretAccessKey value")
spark.sparkContext
.hadoopConfiguration
.set("fs.s3n.endpoint", "s3.amazonaws.com")
Spark Read JSON file from Amazon S3
To read a JSON file from Amazon S3 and create a DataFrame, you can use either spark.read.json("path") or spark.read.format("json").load("path"); these take a file path to read from as an argument. Download the simple_zipcodes.json file to practice.
Note: These methods are generic, hence they can also be used to read JSON files from HDFS, the local file system, and other file systems that Spark supports.
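As a quick sketch, the same call works against a local path (the file path below is hypothetical):
//read a json file from the local file system (hypothetical path)
val localDf = spark.read.json("file:///tmp/simple_zipcodes.json")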
//read json file into dataframe
val df = spark.read.json("s3a://sparkbyexamples/json/simple_zipcodes.json")
df.printSchema()
df.show(false)
When you use the spark.read.format("json") method, you can also specify the data source by its fully qualified name (i.e., org.apache.spark.sql.json). For built-in sources, you can also use the short name json.
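As a minimal sketch (reusing the bucket path from the example above), the two forms below read the same file:
//read using the short name
val df_short = spark.read.format("json")
  .load("s3a://sparkbyexamples/json/simple_zipcodes.json")
//read using the fully qualified data source name
val df_qualified = spark.read.format("org.apache.spark.sql.json")
  .load("s3a://sparkbyexamples/json/simple_zipcodes.json")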
Unlike reading a CSV file, by default Spark infers the schema from a JSON file.
Read JSON file from multiline
Sometimes you may want to read records from a JSON file that are scattered across multiple lines. In order to read such files, set the multiline option to true; by default, the multiline option is set to false.
Below is the input file we are going to read; this same file is also available on GitHub.
[{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
},
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}]
Using spark.read.option("multiline","true"), you can read the multiline JSON file as shown below.
//read multiline json file
val multiline_df = spark.read.option("multiline","true")
.json("s3a://sparkbyexamples/json/multiline-zipcode.json")
multiline_df.show(false)
Reading multiple files at a time
Using the spark.read.json() method you can also read multiple JSON files from different paths; just pass all file names with their fully qualified paths separated by commas, for example:
//read multiple files
val df2 = spark.read.json(
"s3a://sparkbyexamples/json/zipcode1.json",
"s3a://sparkbyexamples/json/zipcode2.json")
df2.show(false)
Reading files with a user-specified custom schema
Spark Schema defines the structure of the data; in other words, it is the structure of the DataFrame. Spark SQL provides the StructType & StructField classes to programmatically specify the structure of the DataFrame.
If you know the schema of the file ahead of time and do not want to use the default inferSchema option for column names and types, specify user-defined custom column names and types using the schema option.
Use the StructType class to create a custom schema; below we instantiate this class and use the add method to add columns to it by providing the column name, data type, and nullable option.
//Define custom schema
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val schema = new StructType()
  .add("Zipcode", IntegerType, true)
  .add("ZipCodeType", StringType, true)
  .add("City", StringType, true)
  .add("State", StringType, true)

val df_with_schema = spark.read.schema(schema)
  .json("s3a://sparkbyexamples/json/simple_zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show(false)
Read JSON file from S3 using Spark SQL
Spark SQL also provides a way to read a JSON file by creating a temporary view directly on the file using spark.sqlContext.sql(), as shown below.
spark.sqlContext.sql("CREATE TEMPORARY VIEW zipcode USING json OPTIONS" +
" (path 's3a://sparkbyexamples/json/simple_zipcodes.json')")
spark.sqlContext.sql("select * fro zipcodes").show(false)
Options while reading JSON file
nullValues
Using the nullValues option, you can specify a string in the JSON to consider as null. For example, if you want to consider a date column with the value "1900-01-01" as null, set this option to "1900-01-01" so that value is treated as null on the DataFrame.
dateFormat
The dateFormat option is used to set the format of the input DateType and TimestampType columns. It supports all java.text.SimpleDateFormat formats.
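A minimal sketch combining the two options above (the option names follow this article's descriptions and are not verified against the Spark JSON reader; the path reuses the earlier example):
//read json applying the nullValues and dateFormat options described above
val df_options = spark.read
  .option("nullValues", "1900-01-01")
  .option("dateFormat", "yyyy-MM-dd")
  .json("s3a://sparkbyexamples/json/simple_zipcodes.json")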
Note: Besides the above options, the Spark JSON data source also supports many other options; please refer to the Spark documentation for the latest details.
Write Spark DataFrame to JSON file on Amazon S3 Bucket
Use the Spark DataFrameWriter object, obtained from the DataFrame's write method, to write a JSON file to an Amazon S3 bucket.
df2.write
.json("s3a://sparkbyexamples/json/ziprecords-new.json")
Spark Options while writing JSON files
While writing a JSON file you can use several options.
Other available options include nullValue, dateFormat, etc.
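As a minimal sketch (the output path is hypothetical), options are chained on the writer before calling json():
//write json with a custom date format (hypothetical output path)
df2.write
  .option("dateFormat", "yyyy-MM-dd")
  .json("s3a://sparkbyexamples/json/ziprecords-options.json")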
Append or Overwrite Files on Amazon S3 Bucket using Saving modes
Spark DataFrameWriter also has a method mode() to specify the SaveMode; the argument to this method takes either one of the below strings or a constant from the SaveMode class. You can use these to append to or overwrite files on the Amazon S3 bucket.
- overwrite – This mode is used to overwrite the existing file; alternatively, you can use SaveMode.Overwrite.
- append – To add the data to the existing file; alternatively, you can use SaveMode.Append.
- ignore – Ignores the write operation when the file already exists; alternatively, you can use SaveMode.Ignore.
- errorifexists or error – This is the default option; when the file already exists, it returns an error. Alternatively, you can use SaveMode.ErrorIfExists.
import org.apache.spark.sql.SaveMode

df2.write.mode(SaveMode.Overwrite).json("s3a://sparkbyexamples/json/ziprecords-new.json")
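You can also pass the string form of the save mode; for example, to add new records to the existing files:
//append to the existing files using the string form of the save mode
df2.write.mode("append").json("s3a://sparkbyexamples/json/ziprecords-new.json")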
Conclusion:
In this tutorial, you have learned about the Amazon S3 dependencies that are used to read and write JSON to and from an S3 bucket. You also learned how to read JSON files with single-line and multiline records into a Spark DataFrame.
Related Articles
- Spark Read JSON from a CSV file
- Spark Read JSON from multiline
- Spark Read Text File | RDD | DataFrame
- Spark Read Text File from AWS S3 bucket
- Spark Read Files from HDFS (TXT, CSV, AVRO, PARQUET, JSON)
- Spark Read ORC file into DataFrame
- Spark Read multiline (multiple line) CSV File
- Spark Read Multiple CSV Files
Happy Learning !!