PySpark Parse JSON from String Column | TEXT File

In this PySpark article I will explain how to parse or read a JSON string from a TEXT/CSV file and convert it into DataFrame columns using Python examples, In order to do this, I will be using the PySpark SQL function from_json().

1. Read JSON String from a TEXT file

In this section, we will see how to parse a JSON string from a text file and convert it to PySpark DataFrame columns using from_json() SQL built-in function.

Below is a JSON data present in a text file,


{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"}
{"Zipcode":709,"ZipCodeType":"STANDARD","City":"BDA SAN LUIS","State":"PR"}
{"Zipcode":76166,"ZipCodeType":"UNIQUE","City":"CINGULAR WIRELESS","State":"TX"}
{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FORT WORTH","State":"TX"}
{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FT WORTH","State":"TX"}
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"URB EUGENE RICE","State":"PR"}
{"Zipcode":85209,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}
{"Zipcode":85210,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}
{"Zipcode":32046,"ZipCodeType":"STANDARD","City":"HILLIARD","State":"FL"}

We can easily read this file with a read.json() method, however, we ignore this and read it as a text file in order to explain from_json() function usage.


from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

#read json from text file
dfFromTxt=spark.read.text("resources/simple_zipcodes_json.txt")
dfFromTxt.printSchema()

This read the JSON string from a text file into a DataFrame value column. Below is the schema of DataFrame.


root
 |-- value: string (nullable = true)

2. Parse JSON String Column & Convert it to Multiple Columns

Now, let’s parse the JSON string from the DataFrame column value and convert it into multiple columns using from_json(), This function takes the DataFrame column with JSON string and JSON schema as arguments. so, let’s create a schema for the JSON string.


# Create Schema of the JSON column
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([ 
    StructField("Zipcode",StringType(),True), 
    StructField("ZipCodeType",StringType(),True), 
    StructField("City",StringType(),True), 
    StructField("State", StringType(), True)
  ])

Finally, use from_json() function which returns the Column struct with all JSON columns and explode the struct to flatten it to multiple columns.


#Convert json column to multiple columns
from pyspark.sql.functions import col,from_json
dfJSON = dfFromTxt.withColumn("jsonData",from_json(col("value"),schema)) \
                   .select("jsonData.*")
dfJSON.printSchema()
dfJSON.show(truncate=False)

Yields below output


root
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)

+-------+-----------+-------------------+-----+
|Zipcode|ZipCodeType|City               |State|
+-------+-----------+-------------------+-----+
|704    |STANDARD   |PARC PARQUE        |PR   |
|704    |STANDARD   |PASEO COSTA DEL SUR|PR   |
|709    |STANDARD   |BDA SAN LUIS       |PR   |
|76166  |UNIQUE     |CINGULAR WIRELESS  |TX   |
|76177  |STANDARD   |FORT WORTH         |TX   |
|76177  |STANDARD   |FT WORTH           |TX   |
|704    |STANDARD   |URB EUGENE RICE    |PR   |
|85209  |STANDARD   |MESA               |AZ   |
|85210  |STANDARD   |MESA               |AZ   |
|32046  |STANDARD   |HILLIARD           |FL   |
+-------+-----------+-------------------+-----+

Alternatively, you can also write the above statement using select.


# Alternatively using select
dfFromTxt.select(from_json(col("value"), schema).alias("data")) \
         .select("data.*") \
         .show()

3. Parse JSON String from CSV

Similarly, we can also parse JSON from a CSV file and create a DataFrame with multiple columns.


#read json from csv file
dfFromCSV=spark.read.option("header",True) \
               .csv("resources/simple_zipcodes_json.csv")
dfFromCSV.printSchema()

This loads the entire JSON string into column JsonValue and yields below schema.


root
 |-- Id: string (nullable = true)
 |-- JsonValue: string (nullable = true)

Now, Let’s parse column JsonValue and convert it to multiple columns using from_json() function.


dfFromCSVJSON =  dfFromCSV.select(col("Id"), \
    from_json(col("JsonValue"),schema).alias("jsonData")) \
      .select("Id","jsonData.*")
dfFromCSVJSON.printSchema()
dfFromCSVJSON.show(truncate=False)

Yields below output.


root
 |-- Id: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)

+---+-------+-----------+-------------------+-----+
|Id |Zipcode|ZipCodeType|City               |State|
+---+-------+-----------+-------------------+-----+
|1  |704    |STANDARD   |PARC PARQUE        |PR   |
|2  |704    |STANDARD   |PASEO COSTA DEL SUR|PR   |
|3  |709    |STANDARD   |BDA SAN LUIS       |PR   |
|4  |76166  |UNIQUE     |CINGULAR WIRELESS  |TX   |
|5  |76177  |STANDARD   |FORT WORTH         |TX   |
|6  |76177  |STANDARD   |FT WORTH           |TX   |
|7  |704    |STANDARD   |URB EUGENE RICE    |PR   |
|8  |85209  |STANDARD   |MESA               |AZ   |
|9  |85210  |STANDARD   |MESA               |AZ   |
|10 |32046  |STANDARD   |HILLIARD           |FL   |
+---+-------+-----------+-------------------+-----+

4. PySpark Convert RDD[String] to JSON

spark.read.json() has a deprecated function to convert RDD[String] which contains a JSON string to PySpark DataFrame.


#Read json from string
data= [(""" {"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"} """)]
rdd = spark.sparkContext.parallelize(data)
df2 = spark.read.json(rdd)
df2.show()

This yields the below output.


+-----------+-----+-----------+-------+
|City       |State|ZipCodeType|Zipcode|
+-----------+-----+-----------+-------+
|PARC PARQUE|PR   |STANDARD   |704    |
+-----------+-----+-----------+-------+

5. Complete example of Parsing JSON from String into DataFrame


from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

#read json from text file
dfFromTxt=spark.read.text("resources/simple_zipcodes_json.txt")
dfFromTxt.show(truncate=False)

from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([ 
    StructField("Zipcode",StringType(),True), 
    StructField("ZipCodeType",StringType(),True), 
    StructField("City",StringType(),True), 
    StructField("State", StringType(), True)
  ])

from pyspark.sql.functions import col,from_json
dfJSON = dfFromTxt.withColumn("jsonData",from_json(col("value"),schema)) \
                   .select("jsonData.*")
dfJSON.printSchema()
dfJSON.show(truncate=False)

dfFromTxt.select(from_json(col("value"), schema).alias("data")) \
         .select("data.*") \
         .show()
         
#read json from text file
dfFromCSV=spark.read.option("header",True) \
               .csv("resources/simple_zipcodes_json.csv")
dfFromCSV.printSchema()

dfFromCSVJSON =  dfFromCSV.select(col("Id"), \
    from_json(col("JsonValue"),schema).alias("jsonData")) \
      .select("Id","jsonData.*")
dfFromCSVJSON.printSchema()
dfFromCSVJSON.show(truncate=False)

# Read json from string

data= [(""" {"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"} """)]
rdd = spark.sparkContext.parallelize(data)
df2 = spark.read.json(rdd)
df2.show()

This example is also available at GitHub project for reference.

Conclusion

In this PySpark article, you have learned how to read a JSON string from TEXT and CSV files and also learned how to parse a JSON string from a DataFrame column and convert it into multiple columns using Python examples.

NNK

SparkByExamples.com is a Big Data and Spark examples community page, all examples are simple and easy to understand and well tested in our development environment Read more ..

Leave a Reply