PySpark SQL provides read.json("path")
to read a single-line or multiline (multiple lines) JSON file into a PySpark DataFrame, and write.json("path")
to save or write a DataFrame to a JSON file. In this tutorial, you will learn how to read a single file, multiple files, and all files from a directory into a DataFrame, and how to write a DataFrame back to a JSON file, using Python examples.
Related:
- PySpark Parse JSON from String Column | TEXT File
- Convert JSON Column to Struct, Map or Multiple Columns in PySpark
- Most used PySpark JSON Functions with Examples
Note: The PySpark API supports reading JSON files, and many other file formats, into a PySpark DataFrame out of the box.
Table of contents:
- PySpark Read JSON file into DataFrame
- Read multiline JSON file
- Read multiple files at a time
- Read all files in a directory
- Read file with a user-specified schema
- Read file using PySpark SQL
- Options while reading JSON file
- PySpark Write DataFrame to JSON file
PySpark Read JSON file into DataFrame
Using read.json("path")
or read.format("json").load("path")
you can read a JSON file into a PySpark DataFrame; these methods take a file path as an argument.
Unlike the CSV reader, the JSON data source infers the schema from the input file by default.
The zipcodes.json file used here can be downloaded from the GitHub project.
# Read JSON file into dataframe
df = spark.read.json("resources/zipcodes.json")
df.printSchema()
df.show()
When you use the format("json")
method, you can also specify the data source by its fully qualified name, as shown below.
# Read JSON file into dataframe
df = spark.read.format('org.apache.spark.sql.json') \
.load("resources/zipcodes.json")
Read multiline JSON file
The PySpark JSON data source provides multiple options for reading files; use the multiline
option to read JSON records that span multiple lines. By default, the multiline option is set to false.
Below is the input file we are going to read; this same file is also available on GitHub.
[{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
},
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}]
Use read.option("multiline","true") to read it:
# Read multiline json file
multiline_df = spark.read.option("multiline","true") \
.json("resources/multiline-zipcode.json")
multiline_df.show()
Reading multiple files at a time
Using the read.json()
method you can also read multiple JSON files from different paths; just pass all file names with their fully qualified paths as a comma-separated list, for example:
# Read multiple files
df2 = spark.read.json(
['resources/zipcode1.json','resources/zipcode2.json'])
df2.show()
Reading all files in a directory
We can read all JSON files from a directory into a DataFrame just by passing the directory (or a wildcard pattern, as shown below) as the path to the json()
method.
# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")
df3.show()
Reading files with a user-specified custom schema
A PySpark schema defines the structure of the data; in other words, it is the structure of the DataFrame. PySpark SQL provides the StructType and StructField classes to programmatically specify the structure of the DataFrame.
If you know the schema of the file ahead of time and do not want to rely on the default inferSchema
behavior, use the schema option to specify user-defined column names and data types.
Use the PySpark StructType class to create a custom schema; below we instantiate this class with a list of StructField objects, each providing the column name, data type, and nullable flag.
# Import the type classes used to define the schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType

# Define custom schema
schema = StructType([
StructField("RecordNumber",IntegerType(),True),
StructField("Zipcode",IntegerType(),True),
StructField("ZipCodeType",StringType(),True),
StructField("City",StringType(),True),
StructField("State",StringType(),True),
StructField("LocationType",StringType(),True),
StructField("Lat",DoubleType(),True),
StructField("Long",DoubleType(),True),
StructField("Xaxis",IntegerType(),True),
StructField("Yaxis",DoubleType(),True),
StructField("Zaxis",DoubleType(),True),
StructField("WorldRegion",StringType(),True),
StructField("Country",StringType(),True),
StructField("LocationText",StringType(),True),
StructField("Location",StringType(),True),
StructField("Decommisioned",BooleanType(),True),
StructField("TaxReturnsFiled",StringType(),True),
StructField("EstimatedPopulation",IntegerType(),True),
StructField("TotalWages",IntegerType(),True),
StructField("Notes",StringType(),True)
])
df_with_schema = spark.read.schema(schema) \
.json("resources/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()
Read JSON file using PySpark SQL
PySpark SQL also provides a way to read a JSON file by creating a temporary view directly from the file, using spark.sql() with a CREATE TEMPORARY VIEW ... USING json statement.
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" +
" (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode").show()
Options while reading JSON file
nullValues
Using the nullValues
option, you can specify a string in the JSON that should be treated as null. For example, you may want a date column with the value "1900-01-01" to be set to null on the DataFrame.
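As a minimal sketch, the same effect can also be achieved after reading with when/otherwise; the date column name below is hypothetical, and the option itself should be verified against your Spark version:
from pyspark.sql.functions import col, when

# Hypothetical sketch: treat the sentinel string "1900-01-01" as null
# after reading (an alternative if the nullValues option is unavailable)
df_nulls = spark.read.json("resources/zipcodes.json")
df_nulls = df_nulls.withColumn(
    "date", when(col("date") == "1900-01-01", None).otherwise(col("date")))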
dateFormat
The dateFormat
option is used to set the format of the input DateType and TimestampType columns. It supports all java.text.SimpleDateFormat patterns (java.time datetime patterns in Spark 3.x).
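A minimal sketch, assuming a hypothetical input file with a date field; dateFormat takes effect when the user-specified schema declares the column as DateType:
from pyspark.sql.types import StructType, StructField, StringType, DateType

# Hypothetical file and column names for illustration
date_schema = StructType([
    StructField("City", StringType(), True),
    StructField("EffectiveDate", DateType(), True)
])

# Parse EffectiveDate with the supplied pattern instead of the default
df_dates = spark.read.schema(date_schema) \
    .option("dateFormat", "MM/dd/yyyy") \
    .json("resources/zipcodes-with-dates.json")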
Note: Besides the above options, PySpark JSON dataset also supports many other options.
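For example, a few other options the JSON reader accepts (a non-exhaustive sketch):
# Relax the JSON parser: allow Java/C++-style comments, single-quoted
# strings, and unquoted field names; read all primitives as strings
df_relaxed = spark.read \
    .option("allowComments", "true") \
    .option("allowSingleQuotes", "true") \
    .option("allowUnquotedFieldNames", "true") \
    .option("primitivesAsString", "true") \
    .json("resources/zipcodes.json")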
Applying DataFrame transformations
Once you have created a PySpark DataFrame from the JSON file, you can apply all the transformations and actions that DataFrames support. Please refer to the link for more details.
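For example, using columns present in the zipcodes sample shown earlier:
from pyspark.sql.functions import col

# Select a few columns, filter by state, and count zip codes per state
df.select("City", "State", "Zipcode") \
  .filter(col("State") == "PR") \
  .show()
df.groupBy("State").count().show()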
Write PySpark DataFrame to JSON file
Use the write method of the PySpark DataFrameWriter object to write a DataFrame to a JSON file.
df2.write.json("/tmp/spark_output/zipcodes.json")
PySpark Options while writing JSON files
While writing a JSON file, you can use several options; other available options include nullValue and dateFormat, as sketched below.
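A minimal sketch (compression is another commonly used write option; dateFormat applies when DateType or TimestampType columns are present):
# Gzip-compress the output part files and set the date pattern
df2.write \
    .option("compression", "gzip") \
    .option("dateFormat", "yyyy-MM-dd") \
    .mode("overwrite") \
    .json("/tmp/spark_output/zipcodes_options.json")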
PySpark Saving modes
PySpark DataFrameWriter also has a mode() method to specify the SaveMode; it accepts one of overwrite, append, ignore, or errorifexists.
overwrite – mode is used to overwrite the existing file
append – To add the data to the existing file
ignore – Ignores write operation when the file already exists
errorifexists or error – This is the default option; it returns an error when the file already exists
df2.write.mode('overwrite').json("/tmp/spark_output/zipcodes.json")
Source code for reference
This example is also available at GitHub PySpark Example Project for reference.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType
spark = SparkSession.builder \
.master("local[1]") \
.appName("SparkByExamples.com") \
.getOrCreate()
# Read JSON file into dataframe
df = spark.read.json("resources/zipcodes.json")
df.printSchema()
df.show()
# Read multiline json file
multiline_df = spark.read.option("multiline","true") \
.json("resources/multiline-zipcode.json")
multiline_df.show()
# Read multiple files
df2 = spark.read.json(
['resources/zipcode2.json','resources/zipcode1.json'])
df2.show()
# Read all JSON files from a directory
df3 = spark.read.json("resources/*.json")
df3.show()
# Define custom schema
schema = StructType([
StructField("RecordNumber",IntegerType(),True),
StructField("Zipcode",IntegerType(),True),
StructField("ZipCodeType",StringType(),True),
StructField("City",StringType(),True),
StructField("State",StringType(),True),
StructField("LocationType",StringType(),True),
StructField("Lat",DoubleType(),True),
StructField("Long",DoubleType(),True),
StructField("Xaxis",IntegerType(),True),
StructField("Yaxis",DoubleType(),True),
StructField("Zaxis",DoubleType(),True),
StructField("WorldRegion",StringType(),True),
StructField("Country",StringType(),True),
StructField("LocationText",StringType(),True),
StructField("Location",StringType(),True),
StructField("Decommisioned",BooleanType(),True),
StructField("TaxReturnsFiled",StringType(),True),
StructField("EstimatedPopulation",IntegerType(),True),
StructField("TotalWages",IntegerType(),True),
StructField("Notes",StringType(),True)
])
df_with_schema = spark.read.schema(schema) \
.json("resources/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()
# Create a temporary view from the JSON file
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode3 USING json OPTIONS" +
" (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode3").show()
# Write DataFrame to JSON file
df2.write.mode('overwrite').json("/tmp/spark_output/zipcodes.json")
Conclusion:
In this tutorial, you have learned how to read JSON files with single-line and multiline records into a PySpark DataFrame, how to read single and multiple files at a time, and how to write a DataFrame back to a JSON file using different save options.
Related Articles
- PySpark Read and Write Parquet File
- PySpark Read CSV file into DataFrame
- PySpark Read Multiple Lines (multiline) JSON File
- Dynamic way of doing ETL through Pyspark
- PySpark cache() Explained.
- PySpark Write to CSV File
- PySpark repartition() – Explained with Examples
- PySpark Create RDD with Examples
- PySpark SparkContext Explained
Happy Learning !!