  • Post category: PySpark
  • Post last modified: March 27, 2024

In this PySpark article, I will explain how to parse or read a JSON string from a TEXT/CSV file and convert it into DataFrame columns using Python examples. To do this, I will use the PySpark SQL function from_json().


1. Read JSON String from a TEXT file

In this section, we will see how to parse a JSON string from a text file and convert it into PySpark DataFrame columns using the from_json() SQL built-in function.

Below is the JSON data present in a text file:


{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"}
{"Zipcode":709,"ZipCodeType":"STANDARD","City":"BDA SAN LUIS","State":"PR"}
{"Zipcode":76166,"ZipCodeType":"UNIQUE","City":"CINGULAR WIRELESS","State":"TX"}
{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FORT WORTH","State":"TX"}
{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FT WORTH","State":"TX"}
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"URB EUGENE RICE","State":"PR"}
{"Zipcode":85209,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}
{"Zipcode":85210,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}
{"Zipcode":32046,"ZipCodeType":"STANDARD","City":"HILLIARD","State":"FL"}

We can easily read this file with the read.json() method; however, we intentionally read it as a text file here in order to demonstrate the from_json() function.


from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Read JSON string from a text file
dfFromTxt = spark.read.text("resources/simple_zipcodes_json.txt")
dfFromTxt.printSchema()

This reads the JSON string from the text file into a single DataFrame column named value. Below is the schema of the DataFrame.


root
 |-- value: string (nullable = true)

2. Parse JSON String Column & Convert it to Multiple Columns

Now, let's parse the JSON string from the DataFrame column value and convert it into multiple columns using from_json(). This function takes the DataFrame column containing the JSON string and the JSON schema as arguments. So, let's first create a schema for the JSON string.


# Create schema of the JSON column
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("Zipcode", StringType(), True),
    StructField("ZipCodeType", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True)
])

Finally, use the from_json() function, which returns a struct Column containing all the JSON fields, and select the struct's fields to flatten it into multiple columns.


# Convert JSON column to multiple columns
from pyspark.sql.functions import col, from_json
dfJSON = dfFromTxt.withColumn("jsonData", from_json(col("value"), schema)) \
                  .select("jsonData.*")
dfJSON.printSchema()
dfJSON.show(truncate=False)

This yields the below output.


root
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)

+-------+-----------+-------------------+-----+
|Zipcode|ZipCodeType|City               |State|
+-------+-----------+-------------------+-----+
|704    |STANDARD   |PARC PARQUE        |PR   |
|704    |STANDARD   |PASEO COSTA DEL SUR|PR   |
|709    |STANDARD   |BDA SAN LUIS       |PR   |
|76166  |UNIQUE     |CINGULAR WIRELESS  |TX   |
|76177  |STANDARD   |FORT WORTH         |TX   |
|76177  |STANDARD   |FT WORTH           |TX   |
|704    |STANDARD   |URB EUGENE RICE    |PR   |
|85209  |STANDARD   |MESA               |AZ   |
|85210  |STANDARD   |MESA               |AZ   |
|32046  |STANDARD   |HILLIARD           |FL   |
+-------+-----------+-------------------+-----+

Alternatively, you can also write the above statement using select().


# Alternatively using select
dfFromTxt.select(from_json(col("value"), schema).alias("data")) \
         .select("data.*") \
         .show()

3. Parse JSON String from CSV

Similarly, we can also parse JSON from a CSV file and create a DataFrame with multiple columns.


# Read JSON from a CSV file
dfFromCSV = spark.read.option("header", True) \
               .csv("resources/simple_zipcodes_json.csv")
dfFromCSV.printSchema()

This loads the entire JSON string into the column JsonValue and yields the below schema.


root
 |-- Id: string (nullable = true)
 |-- JsonValue: string (nullable = true)

Now, let's parse the column JsonValue and convert it into multiple columns using the from_json() function.


dfFromCSVJSON = dfFromCSV.select(col("Id"), \
    from_json(col("JsonValue"), schema).alias("jsonData")) \
      .select("Id", "jsonData.*")
dfFromCSVJSON.printSchema()
dfFromCSVJSON.show(truncate=False)

Yields below output.


root
 |-- Id: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)

+---+-------+-----------+-------------------+-----+
|Id |Zipcode|ZipCodeType|City               |State|
+---+-------+-----------+-------------------+-----+
|1  |704    |STANDARD   |PARC PARQUE        |PR   |
|2  |704    |STANDARD   |PASEO COSTA DEL SUR|PR   |
|3  |709    |STANDARD   |BDA SAN LUIS       |PR   |
|4  |76166  |UNIQUE     |CINGULAR WIRELESS  |TX   |
|5  |76177  |STANDARD   |FORT WORTH         |TX   |
|6  |76177  |STANDARD   |FT WORTH           |TX   |
|7  |704    |STANDARD   |URB EUGENE RICE    |PR   |
|8  |85209  |STANDARD   |MESA               |AZ   |
|9  |85210  |STANDARD   |MESA               |AZ   |
|10 |32046  |STANDARD   |HILLIARD           |FL   |
+---+-------+-----------+-------------------+-----+

4. PySpark Convert RDD[String] to JSON

spark.read.json() can also convert an RDD[String] containing JSON strings into a PySpark DataFrame; note that this usage is deprecated.


# Read JSON from string
data = ['{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}']
rdd = spark.sparkContext.parallelize(data)
df2 = spark.read.json(rdd)
df2.show()

This yields the below output.


+-----------+-----+-----------+-------+
|City       |State|ZipCodeType|Zipcode|
+-----------+-----+-----------+-------+
|PARC PARQUE|PR   |STANDARD   |704    |
+-----------+-----+-----------+-------+

5. Complete example of Parsing JSON from String into DataFrame


from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Read JSON string from a text file
dfFromTxt = spark.read.text("resources/simple_zipcodes_json.txt")
dfFromTxt.show(truncate=False)

# Create schema of the JSON column
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("Zipcode", StringType(), True),
    StructField("ZipCodeType", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True)
])

# Convert JSON column to multiple columns
from pyspark.sql.functions import col, from_json
dfJSON = dfFromTxt.withColumn("jsonData", from_json(col("value"), schema)) \
                  .select("jsonData.*")
dfJSON.printSchema()
dfJSON.show(truncate=False)

# Alternatively using select
dfFromTxt.select(from_json(col("value"), schema).alias("data")) \
         .select("data.*") \
         .show()

# Read JSON from a CSV file
dfFromCSV = spark.read.option("header", True) \
               .csv("resources/simple_zipcodes_json.csv")
dfFromCSV.printSchema()

dfFromCSVJSON = dfFromCSV.select(col("Id"), \
    from_json(col("JsonValue"), schema).alias("jsonData")) \
      .select("Id", "jsonData.*")
dfFromCSVJSON.printSchema()
dfFromCSVJSON.show(truncate=False)

# Read JSON from string
data = ['{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}']
rdd = spark.sparkContext.parallelize(data)
df2 = spark.read.json(rdd)
df2.show()

This example is also available at the GitHub project for reference.

Conclusion

In this PySpark article, you have learned how to read a JSON string from TEXT and CSV files, and how to parse a JSON string from a DataFrame column and convert it into multiple columns, using Python examples.

  • PySpark JSON Functions with Examples
  • PySpark printSchema() to String or JSON
  • PySpark Read JSON file into DataFrame
  • PySpark Read Multiple Lines (multiline) JSON File
  • PySpark createOrReplaceTempView() Explained
  • PySpark startsWith() and endsWith() Functions
  • PySpark printSchema() Example

Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, he has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive, and Machine Learning. Naveen's journey in the field of data engineering has been one of continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with data as he comes across them. Follow Naveen @ LinkedIn and Medium