Spark Parse JSON from String Column | Text File

In this Spark article, you will learn how to parse or read a JSON string from a TEXT/CSV file and convert it into multiple DataFrame columns using Scala examples.

Assume you have a text file containing JSON data, or a CSV file with a JSON string in one of its columns. To read these files, parse the JSON, and convert it to DataFrame columns, we use the from_json() function provided in Spark SQL.

1. Read and Parse a JSON from a TEXT file

In this section, we parse a JSON string from a text file and convert it to Spark DataFrame columns using the from_json() Spark SQL built-in function.

Below is the JSON data present in the text file:


{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"}
{"Zipcode":709,"ZipCodeType":"STANDARD","City":"BDA SAN LUIS","State":"PR"}
{"Zipcode":76166,"ZipCodeType":"UNIQUE","City":"CINGULAR WIRELESS","State":"TX"}
{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FORT WORTH","State":"TX"}
{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FT WORTH","State":"TX"}
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"URB EUGENE RICE","State":"PR"}
{"Zipcode":85209,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}
{"Zipcode":85210,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}
{"Zipcode":32046,"ZipCodeType":"STANDARD","City":"HILLIARD","State":"FL"}

We could easily read this file with the read.json() method; however, we instead read it as a text file in order to demonstrate the from_json() function.


//Read JSON string from text file
val dfFromText:DataFrame = spark.read.text("src/main/resources/simple_zipcodes.txt")
dfFromText.printSchema()

This reads the JSON strings from the text file into a single DataFrame column named value, as shown in the schema below.


root
 |-- value: string (nullable = true)

2. Convert JSON column to Multiple Columns

Now, let’s convert the value column into multiple columns using from_json(). This function takes the DataFrame column containing the JSON string and the JSON schema as arguments; so first, let’s define a schema that represents our data.


//Define schema of JSON structure
import org.apache.spark.sql.types.{StringType, StructType}
val schema = new StructType()
    .add("Zipcode", StringType, true)
    .add("ZipCodeType", StringType, true)
    .add("City", StringType, true)
    .add("State", StringType, true)
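As a side note, from_json() also accepts the schema as a DDL-formatted string, which avoids building a StructType by hand. A minimal sketch, assuming Spark 2.3 or later (the dfFromText name comes from the snippet above):

```scala
// Sketch: pass the schema to from_json() as a DDL string (Spark 2.3+)
import org.apache.spark.sql.functions.{col, from_json}

val ddlSchema = "Zipcode STRING, ZipCodeType STRING, City STRING, State STRING"
val dfFromDDL = dfFromText
  .withColumn("jsonData", from_json(col("value"), ddlSchema, Map.empty[String, String]))
  .select("jsonData.*")
```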

Finally, use the from_json() function, which returns a struct Column containing all the JSON fields, and select jsonData.* to flatten the struct into individual columns.


// convert json column to multiple columns
import org.apache.spark.sql.functions.{col,from_json}
val dfJSON = dfFromText.withColumn("jsonData",from_json(col("value"),schema))
    .select("jsonData.*")
dfJSON.printSchema()
dfJSON.show(false)

This yields the output below:


root
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)

+-------+-----------+-------------------+-----+
|Zipcode|ZipCodeType|City               |State|
+-------+-----------+-------------------+-----+
|704    |STANDARD   |PARC PARQUE        |PR   |
|704    |STANDARD   |PASEO COSTA DEL SUR|PR   |
|709    |STANDARD   |BDA SAN LUIS       |PR   |
|76166  |UNIQUE     |CINGULAR WIRELESS  |TX   |
|76177  |STANDARD   |FORT WORTH         |TX   |
|76177  |STANDARD   |FT WORTH           |TX   |
|704    |STANDARD   |URB EUGENE RICE    |PR   |
|85209  |STANDARD   |MESA               |AZ   |
|85210  |STANDARD   |MESA               |AZ   |
|32046  |STANDARD   |HILLIARD           |FL   |
+-------+-----------+-------------------+-----+
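Note that from_json() returns a null struct for any row whose value cannot be parsed against the schema. A minimal sketch for dropping such rows, reusing the dfFromText and schema names from above:

```scala
// Rows with malformed JSON produce a null jsonData struct; keep only parsed rows
import org.apache.spark.sql.functions.{col, from_json}

val parsed = dfFromText
  .withColumn("jsonData", from_json(col("value"), schema))
  .filter(col("jsonData").isNotNull)
  .select("jsonData.*")
```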

Alternatively, you can write the above statement using select().


//alternatively using select
val df5 = dfFromText.select(from_json(col("value"), schema).as("data"))
    .select("data.*")
df5.show(false)
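If you prefer not to hand-write the schema, Spark 2.4+ provides schema_of_json() to infer it from a sample JSON string. A sketch, with the sample literal taken from the data above:

```scala
// Sketch: infer the schema from one representative JSON string (Spark 2.4+)
import org.apache.spark.sql.functions.{col, from_json, lit, schema_of_json}

val sample = """{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
val dfInferred = dfFromText
  .select(from_json(col("value"), schema_of_json(lit(sample))).as("data"))
  .select("data.*")
```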

3. Read and Parse JSON from a CSV File Column

Similarly, we can parse a JSON string stored in a CSV file column and create a DataFrame from it.


//Read from CSV file
val dfFromCSV:DataFrame = spark.read.option("header",true)
       .csv("src/main/resources/simple_zipcodes.csv")
dfFromCSV.printSchema()
dfFromCSV.show(false)

This loads the entire JSON string into the JsonValue column and yields the schema below.


root
 |-- Id: string (nullable = true)
 |-- JsonValue: string (nullable = true)

Now, let’s parse the JsonValue column and convert it to multiple columns using the from_json() function.


//Convert JsonValue to multiple columns
val dfFromCSVJSON =  dfFromCSV.select(col("Id"),
    from_json(col("JsonValue"),schema).as("jsonData"))
      .select("Id","jsonData.*")
dfFromCSVJSON.printSchema()
dfFromCSVJSON.show(false)

This yields the output below:


root
 |-- Id: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)

+---+-------+-----------+-------------------+-----+
|Id |Zipcode|ZipCodeType|City               |State|
+---+-------+-----------+-------------------+-----+
|1  |704    |STANDARD   |PARC PARQUE        |PR   |
|2  |704    |STANDARD   |PASEO COSTA DEL SUR|PR   |
|3  |709    |STANDARD   |BDA SAN LUIS       |PR   |
|4  |76166  |UNIQUE     |CINGULAR WIRELESS  |TX   |
|5  |76177  |STANDARD   |FORT WORTH         |TX   |
|6  |76177  |STANDARD   |FT WORTH           |TX   |
|7  |704    |STANDARD   |URB EUGENE RICE    |PR   |
|8  |85209  |STANDARD   |MESA               |AZ   |
|9  |85210  |STANDARD   |MESA               |AZ   |
|10 |32046  |STANDARD   |HILLIARD           |FL   |
+---+-------+-----------+-------------------+-----+
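When you only need a few fields and don’t want to define a schema at all, json_tuple() is a schema-free alternative; it extracts the named fields as string columns. A sketch using the same dfFromCSV DataFrame:

```scala
// Sketch: extract named JSON fields as strings, no schema required
import org.apache.spark.sql.functions.{col, json_tuple}

val dfTuple = dfFromCSV.select(col("Id"),
    json_tuple(col("JsonValue"), "Zipcode", "ZipCodeType", "City", "State")
      .as(Seq("Zipcode", "ZipCodeType", "City", "State")))
dfTuple.show(false)
```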

4. Convert JSON String to DataFrame Columns

When you have JSON in a string and want to load it into a Spark DataFrame, use spark.read.json(); this function takes a Dataset[String] as an argument.


//Read json from string
import spark.implicits._
val jsonStr = """{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
val df = spark.read.json(Seq(jsonStr).toDS())
df.show(false)

This yields the output below:


+-----------+-----+-----------+-------+
|City       |State|ZipCodeType|Zipcode|
+-----------+-----+-----------+-------+
|PARC PARQUE|PR   |STANDARD   |704    |
+-----------+-----+-----------+-------+
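For the reverse direction, to_json() serializes a struct column back into a JSON string. A minimal sketch, assuming the df from the snippet above:

```scala
// Sketch: round-trip the columns back to a JSON string with to_json()
import org.apache.spark.sql.functions.{col, struct, to_json}

val dfBack = df.select(to_json(struct(df.columns.map(col): _*)).as("value"))
dfBack.show(false)
```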

5. Converting RDD[String] with JSON to DataFrame

spark.read.json() also has a deprecated overload that converts an RDD[String] containing JSON strings into a Spark DataFrame.


// from RDD[String]
// deprecated
val rdd = spark.sparkContext.parallelize(
    """ {"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"} """ :: Nil)
val df2 = spark.read.json(rdd)
df2.show()

This yields the same output as above.
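Since the RDD overload is deprecated, a non-deprecated sketch is to wrap the RDD[String] in a Dataset[String] first and pass that to spark.read.json() (reusing the rdd value from above):

```scala
// Sketch: convert the RDD to Dataset[String] to avoid the deprecated overload
import spark.implicits._

val df3 = spark.read.json(spark.createDataset(rdd))
df3.show()
```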

6. Complete example of Parsing JSON from String into DataFrame


package com.sparkbyexamples.spark.dataframe

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object ReadJsonFromString extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  //Read JSON string from text file
  val dfFromText:DataFrame = spark.read.text("src/main/resources/simple_zipcodes.txt")
  dfFromText.printSchema()
  
  //Define schema of JSON structure
  val schema = new StructType()
    .add("Zipcode", StringType, true)
    .add("ZipCodeType", StringType, true)
    .add("City", StringType, true)
    .add("State", StringType, true)

  //Convert JSON column to multiple columns
  val dfJSON = dfFromText.withColumn("jsonData",from_json(col("value"),schema))
    .select("jsonData.*")
  dfJSON.printSchema()
  dfJSON.show(false)

  //Alternatively using select
  val dfJSON2 = dfFromText.select(from_json(col("value"), schema).as("jsonData"))
    .select("jsonData.*")

  //Read JSON string from CSV file
  val dfFromCSV:DataFrame = spark.read.option("header",true)
     .csv("src/main/resources/simple_zipcodes.csv")
  dfFromCSV.printSchema()
  dfFromCSV.show(false)

  val dfFromCSVJSON =  dfFromCSV.select(col("Id"),
    from_json(col("JsonValue"),schema).as("jsonData"))
      .select("Id","jsonData.*")
  dfFromCSVJSON.printSchema()
  dfFromCSVJSON.show(false)

  //Read json from string
  import spark.implicits._
  val jsonStr = """{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
  val df = spark.read.json(Seq(jsonStr).toDS())
  df.show(false)

  // from RDD[String]
  // deprecated
  val rdd = spark.sparkContext.parallelize(
    """ {"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"} """ :: Nil)
  val df2 = spark.read.json(rdd)
  df2.show()

}

This example is also available at the GitHub project for reference.

7. Conclusion

In this Spark article, you have learned how to read and parse a JSON string from text and CSV files, and how to convert JSON string columns into multiple DataFrame columns using Scala examples.

Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, he has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive, and Machine Learning. Naveen's journey in the field of data engineering has been one of continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with data as he comes across them. Follow Naveen on LinkedIn and Medium.
