In this PySpark article, I will explain how to parse or read a JSON string from a TEXT/CSV file and convert it into DataFrame columns using Python examples. To do this, I will be using the PySpark SQL function from_json().
1. Read JSON String from a TEXT file
In this section, we will see how to parse a JSON string from a text file and convert it to PySpark DataFrame columns using the from_json() SQL built-in function.
Below is the JSON data present in a text file.
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"}
{"Zipcode":709,"ZipCodeType":"STANDARD","City":"BDA SAN LUIS","State":"PR"}
{"Zipcode":76166,"ZipCodeType":"UNIQUE","City":"CINGULAR WIRELESS","State":"TX"}
{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FORT WORTH","State":"TX"}
{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FT WORTH","State":"TX"}
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"URB EUGENE RICE","State":"PR"}
{"Zipcode":85209,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}
{"Zipcode":85210,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}
{"Zipcode":32046,"ZipCodeType":"STANDARD","City":"HILLIARD","State":"FL"}
We could easily read this file with the read.json() method; however, we intentionally read it as a text file here in order to demonstrate the from_json() function.
from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
#read json from text file
dfFromTxt=spark.read.text("resources/simple_zipcodes_json.txt")
dfFromTxt.printSchema()
This reads the JSON strings from the text file into a single DataFrame column named value. Below is the schema of the DataFrame.
root
|-- value: string (nullable = true)
2. Parse JSON String Column & Convert it to Multiple Columns
Now, let’s parse the JSON string from the DataFrame column value and convert it into multiple columns using from_json(). This function takes the DataFrame column containing the JSON string and the JSON schema as arguments. So, let’s first create a schema for the JSON string.
# Create Schema of the JSON column
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
StructField("Zipcode",StringType(),True),
StructField("ZipCodeType",StringType(),True),
StructField("City",StringType(),True),
StructField("State", StringType(), True)
])
Finally, use the from_json() function, which returns a struct Column containing all the JSON fields, and select the struct's fields to flatten it into multiple columns.
#Convert json column to multiple columns
from pyspark.sql.functions import col,from_json
dfJSON = dfFromTxt.withColumn("jsonData",from_json(col("value"),schema)) \
.select("jsonData.*")
dfJSON.printSchema()
dfJSON.show(truncate=False)
Yields below output
root
|-- Zipcode: string (nullable = true)
|-- ZipCodeType: string (nullable = true)
|-- City: string (nullable = true)
|-- State: string (nullable = true)
+-------+-----------+-------------------+-----+
|Zipcode|ZipCodeType|City |State|
+-------+-----------+-------------------+-----+
|704 |STANDARD |PARC PARQUE |PR |
|704 |STANDARD |PASEO COSTA DEL SUR|PR |
|709 |STANDARD |BDA SAN LUIS |PR |
|76166 |UNIQUE |CINGULAR WIRELESS |TX |
|76177 |STANDARD |FORT WORTH |TX |
|76177 |STANDARD |FT WORTH |TX |
|704 |STANDARD |URB EUGENE RICE |PR |
|85209 |STANDARD |MESA |AZ |
|85210 |STANDARD |MESA |AZ |
|32046 |STANDARD |HILLIARD |FL |
+-------+-----------+-------------------+-----+
Alternatively, you can write the same statement using select().
# Alternatively using select
dfFromTxt.select(from_json(col("value"), schema).alias("data")) \
.select("data.*") \
.show()
3. Parse JSON String from CSV
Similarly, we can also parse JSON from a CSV file and create a DataFrame with multiple columns.
#read json from csv file
dfFromCSV=spark.read.option("header",True) \
.csv("resources/simple_zipcodes_json.csv")
dfFromCSV.printSchema()
This loads the entire JSON string into the JsonValue column and yields the below schema.
root
|-- Id: string (nullable = true)
|-- JsonValue: string (nullable = true)
Now, let’s parse the JsonValue column and convert it to multiple columns using the from_json() function.
dfFromCSVJSON = dfFromCSV.select(col("Id"), \
from_json(col("JsonValue"),schema).alias("jsonData")) \
.select("Id","jsonData.*")
dfFromCSVJSON.printSchema()
dfFromCSVJSON.show(truncate=False)
Yields below output.
root
|-- Id: string (nullable = true)
|-- Zipcode: string (nullable = true)
|-- ZipCodeType: string (nullable = true)
|-- City: string (nullable = true)
|-- State: string (nullable = true)
+---+-------+-----------+-------------------+-----+
|Id |Zipcode|ZipCodeType|City |State|
+---+-------+-----------+-------------------+-----+
|1 |704 |STANDARD |PARC PARQUE |PR |
|2 |704 |STANDARD |PASEO COSTA DEL SUR|PR |
|3 |709 |STANDARD |BDA SAN LUIS |PR |
|4 |76166 |UNIQUE |CINGULAR WIRELESS |TX |
|5 |76177 |STANDARD |FORT WORTH |TX |
|6 |76177 |STANDARD |FT WORTH |TX |
|7 |704 |STANDARD |URB EUGENE RICE |PR |
|8 |85209 |STANDARD |MESA |AZ |
|9 |85210 |STANDARD |MESA |AZ |
|10 |32046 |STANDARD |HILLIARD |FL |
+---+-------+-----------+-------------------+-----+
4. PySpark Convert RDD[String] to JSON
Passing an RDD[String] that contains JSON strings to spark.read.json() is a deprecated usage, but it still converts the RDD into a PySpark DataFrame.
#Read json from string
data= [(""" {"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"} """)]
rdd = spark.sparkContext.parallelize(data)
df2 = spark.read.json(rdd)
df2.show()
This yields the below output.
+-----------+-----+-----------+-------+
|City |State|ZipCodeType|Zipcode|
+-----------+-----+-----------+-------+
|PARC PARQUE|PR |STANDARD |704 |
+-----------+-----+-----------+-------+
5. Complete example of Parsing JSON from String into DataFrame
from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
#read json from text file
dfFromTxt=spark.read.text("resources/simple_zipcodes_json.txt")
dfFromTxt.show(truncate=False)
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
StructField("Zipcode",StringType(),True),
StructField("ZipCodeType",StringType(),True),
StructField("City",StringType(),True),
StructField("State", StringType(), True)
])
from pyspark.sql.functions import col,from_json
dfJSON = dfFromTxt.withColumn("jsonData",from_json(col("value"),schema)) \
.select("jsonData.*")
dfJSON.printSchema()
dfJSON.show(truncate=False)
dfFromTxt.select(from_json(col("value"), schema).alias("data")) \
.select("data.*") \
.show()
#read json from csv file
dfFromCSV=spark.read.option("header",True) \
.csv("resources/simple_zipcodes_json.csv")
dfFromCSV.printSchema()
dfFromCSVJSON = dfFromCSV.select(col("Id"), \
from_json(col("JsonValue"),schema).alias("jsonData")) \
.select("Id","jsonData.*")
dfFromCSVJSON.printSchema()
dfFromCSVJSON.show(truncate=False)
# Read json from string
data= [(""" {"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"} """)]
rdd = spark.sparkContext.parallelize(data)
df2 = spark.read.json(rdd)
df2.show()
The complete example is also available at the GitHub project for reference.
Conclusion
In this PySpark article, you have learned how to read a JSON string from TEXT and CSV files and also learned how to parse a JSON string from a DataFrame column and convert it into multiple columns using Python examples.
Related Articles
- PySpark JSON Functions with Examples
- PySpark printSchema() to String or JSON
- PySpark Read JSON file into DataFrame
- PySpark Read Multiple Lines (multiline) JSON File
- PySpark createOrReplaceTempView() Explained
- PySpark startsWith() and endsWith() Functions
- PySpark printSchema() Example