Spark Read and Write JSON file into DataFrame

Spark Read JSON file
Working with JSON files in Spark

Spark SQL provides spark.read.json("path") to read a single line and multiline (multiple lines) JSON file into Spark DataFrame and dataframe.write.json("path") to save or write to JSON file, In this tutorial, you will learn how to read a single file, multiple files, all files from a directory into DataFrame and writing DataFrame back to JSON file using Scala example.

The same approach could be used with Java and Python (PySpark) when time permits I will explain these additional languages.

Note: Spark out of the box supports to read JSON files and many more file formats into Spark DataFrame and spark uses Jackson library natively to work with JSON files.

The complete example explained here is available at GitHub project to download.

Table of contents:

Spark Read JSON file into DataFrame

Using spark.read.json("path") or spark.read.format("json").load("path") you can read a JSON file into a Spark DataFrame, these methods take a file path as an argument. 

Unlike reading a CSV, By default JSON data source inferschema from an input file.


//read json file into dataframe
val df = spark.read.json("src/main/resources/zipcodes.json")
df.printSchema()
df.show(false)

When you use format("json") method, you can also specify the Data sources by their fully qualified name (i.e., org.apache.spark.sql.json), for built-in sources, you can also use short name “json”. 

Example in Python (PySpark)

Here is a an example in python (PySpark) using format with short name and load methods


spark.read.format('json')
    .load('zipcodes.json')

Read JSON file from multiline

Sometimes you may want to read records from JSON file that scattered multiple lines, In order to read such files, use-value true to multiline option, by default multiline option, is set to false.

Below is the input file we going to read, this same file is also available at Github


[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]

Using spark.read.option("multiline","true")


    //read multiline json file
    val multiline_df = spark.read.option("multiline","true")
      .json("src/main/resources/multiline-zipcode.json")
    multiline_df.show(false)    

Reading multiple files at a time

Using the spark.read.json() method you can also read multiple JSON files from different paths, just pass all file names with fully qualified paths by separating comma, for example


    //read multiple files
    val df2 = spark.read.json(
      "src/main/resources/zipcodes_streaming/zipcode1.json",
      "src/main/resources/zipcodes_streaming/zipcode2.json")
    df2.show(false)

Reading all files in a directory

We can read all JSON files from a directory into DataFrame just by passing directory as a path to the json() method. Below snippet, “zipcodes_streaming” is a folder that contains multiple JSON files.


    //read all files from a folder
    val df3 = spark.read.json("src/main/resources/zipcodes_streaming")
    df3.show(false)

Reading files with a user-specified custom schema

Spark Schema defines the structure of the data, in other words, it is the structure of the DataFrame. Spark SQL provides StructType & StructField classes to programmatically specify the structure to the DataFrame.

If you know the schema of the file ahead and do not want to use the default inferSchema option for column names and types, use user-defined custom column names and type using schema option.

Use the StructType class to create a custom schema, below we initiate this class and use add a method to add columns to it by providing the column name, data type and nullable option.


    //Define custom schema
    val schema = new StructType()
      .add("RecordNumber",IntegerType,true)
      .add("Zipcode",IntegerType,true)
      .add("ZipCodeType",StringType,true)
      .add("City",StringType,true)
      .add("State",StringType,true)
      .add("LocationType",StringType,true)
      .add("Lat",DoubleType,true)
      .add("Long",DoubleType,true)
      .add("Xaxis",IntegerType,true)
      .add("Yaxis",DoubleType,true)
      .add("Zaxis",DoubleType,true)
      .add("WorldRegion",StringType,true)
      .add("Country",StringType,true)
      .add("LocationText",StringType,true)
      .add("Location",StringType,true)
      .add("Decommisioned",BooleanType,true)
      .add("TaxReturnsFiled",StringType,true)
      .add("EstimatedPopulation",IntegerType,true)
      .add("TotalWages",IntegerType,true)
      .add("Notes",StringType,true)
    val df_with_schema = spark.read.schema(schema)
        .json("src/main/resources/zipcodes.json")
    df_with_schema.printSchema()
    df_with_schema.show(false)

Read JSON file using Spark SQL

Spark SQL also provides a way to read a JSON file by creating a temporary view directly from reading file using spark.sqlContext.sql(“load json to temporary view”)


spark.sqlContext.sql("CREATE TEMPORARY VIEW zipcode USING json OPTIONS" + 
      " (path 'src/main/resources/zipcodes.json')")
spark.sqlContext.sql("select * fro zipcodes").show(false)

Options while reading JSON file

nullValues

Using nullValues option you can specify the string in a JSON to consider as null. For example, if you want to consider a date column with a value “1900-01-01” set null on DataFrame.

dateFormat

dateFormat option to used to set the format of the input DateType and TimestampType columns. Supports all java.text.SimpleDateFormat formats.

Note: Besides the above options, Spark JSON dataset also supports many other options.

Applying DataFrame transformations

Once you have created DataFrame from the JSON file, you can apply all transformation and actions DataFrame support. Please refer to the link for more details. 

Write Spark DataFrame to JSON file

Use the Spark DataFrameWriter object “write” method on DataFrame to write a JSON file. 


 df2.write
 .json("/tmp/spark_output/zipcodes.json")

Spark Options while writing JSON files

While writing a JSON file you can use several options.  

Other options available nullValue,dateFormat

Saving modes

Spark DataFrameWriter also has a method mode() to specify SaveMode; the argument to this method either takes below string or a constant from SaveMode class.

overwrite – mode is used to overwrite the existing file, alternatively, you can use SaveMode.Overwrite.

append – To add the data to the existing file, alternatively, you can use SaveMode.Append.

ignore – Ignores write operation when the file already exists, alternatively you can use SaveMode.Ignore.

errorifexists or error – This is a default option when the file already exists, it returns an error, alternatively, you can use SaveMode.ErrorIfExists.


 df2.write.mode(SaveMode.Overwrite).json("/tmp/spark_output/zipcodes.json")

Source code for reference


package com.sparkbyexamples.spark.dataframe
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
object FromJsonFile {
  def main(args:Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExample")
      .getOrCreate()
    val sc = spark.sparkContext
    //read json file into dataframe
    val df = spark.read.json("src/main/resources/zipcodes.json")
    df.printSchema()
    df.show(false)
    //read multiline json file
    val multiline_df = spark.read.option("multiline", "true")
      .json("src/main/resources/multiline-zipcode.json")
    multiline_df.printSchema()
    multiline_df.show(false)
    //read multiple files
    val df2 = spark.read.json(
      "src/main/resources/zipcodes_streaming/zipcode1.json",
      "src/main/resources/zipcodes_streaming/zipcode2.json")
    df2.show(false)
    //read all files from a folder
    val df3 = spark.read.json("src/main/resources/zipcodes_streaming/*")
    df3.show(false)
    //Define custom schema
    val schema = new StructType()
      .add("City", StringType, true)
      .add("Country", StringType, true)
      .add("Decommisioned", BooleanType, true)
      .add("EstimatedPopulation", LongType, true)
      .add("Lat", DoubleType, true)
      .add("Location", StringType, true)
      .add("LocationText", StringType, true)
      .add("LocationType", StringType, true)
      .add("Long", DoubleType, true)
      .add("Notes", StringType, true)
      .add("RecordNumber", LongType, true)
      .add("State", StringType, true)
      .add("TaxReturnsFiled", LongType, true)
      .add("TotalWages", LongType, true)
      .add("WorldRegion", StringType, true)
      .add("Xaxis", DoubleType, true)
      .add("Yaxis", DoubleType, true)
      .add("Zaxis", DoubleType, true)
      .add("Zipcode", StringType, true)
      .add("ZipCodeType", StringType, true)
    val df_with_schema = spark.read.schema(schema)
        .json("src/main/resources/zipcodes.json")
    df_with_schema.printSchema()
    df_with_schema.show(false)
    spark.sqlContext.sql("CREATE TEMPORARY VIEW zipcode USING json OPTIONS" +
      " (path 'src/main/resources/zipcodes.json')")
    spark.sqlContext.sql("SELECT *FROM zipcode").show()
    //Write json file
    df2.write
      .json("/tmp/spark_output/zipcodes.json")
  }
}

Conclusion:

In this tutorial, you have learned how to read a JSON file with single line record and multiline record into Spark DataFrame, and also learned reading single and multiple files at a time and writing JSON file back to DataFrame using different save options.

References:

Happy Learning !!

NNK

SparkByExamples.com is a BigData and Spark examples community page, all examples are simple and easy to understand and well tested in our development environment using Scala and Maven.

Leave a Reply

Close Menu