Spark Read JSON from a CSV file

  • Post author:
  • Post category:Apache Spark

In this Spark article, you will learn how to parse or read a JSON string from a CSV file into DataFrame or from JSON String column using Scala examples.

Assume you have a CSV file with a JSON string in one of the column and you want to parse it and create DataFrame columns, In order to read CSV file and parse JSON and convert to DataFrame, we use from_json() function provided in Spark SQL.

Read and Parse a JSON from CSV file

In order to read a JSON string from a CSV file, first, we need to read a CSV file into Spark Dataframe using spark.read.csv("path") and then parse the JSON string column and convert it to columns using from_json() function. This function takes the first argument as a JSON column name and the second argument as JSON schema.

So, first, let’s create the schema that defines our JSON column. Input CSV file referred here is available at GitHub for reference.


  val dfFromCSV:DataFrame = spark.read.option("header",true)
       .csv("src/main/resources/simple_zipcodes.csv")
  dfFromCSV.printSchema()
  dfFromCSV.show(false)

This reads the entire JSON string into column “JsonValue” and yields below output.


+---+----------------------------------------------------------------------------------+
|Id |JsonValue                                                                         |
+---+----------------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}        |
|2  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"}|
|3  |{"Zipcode":709,"ZipCodeType":"STANDARD","City":"BDA SAN LUIS","State":"PR"}       |
|4  |{"Zipcode":76166,"ZipCodeType":"UNIQUE","City":"CINGULAR WIRELESS","State":"TX"}  |
|5  |{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FORT WORTH","State":"TX"}       |
|6  |{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FT WORTH","State":"TX"}         |
|7  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"URB EUGENE RICE","State":"PR"}    |
|8  |{"Zipcode":85209,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}             |
|9  |{"Zipcode":85210,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}             |
|10 |{"Zipcode":32046,"ZipCodeType":"STANDARD","City":"HILLIARD","State":"FL"}         |
+---+----------------------------------------------------------------------------------+

Now, Let’s parse column “JsonValue” and convert it to multiple columns.


  val dfFromCSVJSON =  dfFromCSV.select(col("Id"),
    from_json(col("JsonValue"),schema).as("jsonData"))
      .select("Id","jsonData.*")
  dfFromCSVJSON.printSchema()
  dfFromCSVJSON.show(false)

Yields below output


root
 |-- Id: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)

+---+-------+-----------+-------------------+-----+
|Id |Zipcode|ZipCodeType|City               |State|
+---+-------+-----------+-------------------+-----+
|1  |704    |STANDARD   |PARC PARQUE        |PR   |
|2  |704    |STANDARD   |PASEO COSTA DEL SUR|PR   |
|3  |709    |STANDARD   |BDA SAN LUIS       |PR   |
|4  |76166  |UNIQUE     |CINGULAR WIRELESS  |TX   |
|5  |76177  |STANDARD   |FORT WORTH         |TX   |
|6  |76177  |STANDARD   |FT WORTH           |TX   |
|7  |704    |STANDARD   |URB EUGENE RICE    |PR   |
|8  |85209  |STANDARD   |MESA               |AZ   |
|9  |85210  |STANDARD   |MESA               |AZ   |
|10 |32046  |STANDARD   |HILLIARD           |FL   |
+---+-------+-----------+-------------------+-----+

Converting String to JSON

When you have a JSON in a string and wanted to convert or load to Spark DataFrame, use spark.read.json() , this function takes Dataset[String] as an argument.


  //Read json from string
  import spark.implicits._
  val jsonStr = """{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
  val df = spark.read.json(Seq(jsonStr).toDS())
  df.show(false)

Yields below output


+-----------+-----+-----------+-------+
|City       |State|ZipCodeType|Zipcode|
+-----------+-----+-----------+-------+
|PARC PARQUE|PR   |STANDARD   |704    |
+-----------+-----+-----------+-------+

Converting RDD[String] to JSON

spark.read.json() also has another deprecated function to convert RDD[String] which contains a JSON string to Spark DataFrame


  // from RDD[String]
  // deprecated
  val rdd = spark.sparkContext.parallelize(
    """ {"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"} """ :: Nil)
  val df2 = spark.read.json(rdd)
  df2.show()

This yields the same output as above.

Complete example of Parsing JSON from CSV file into DataFrame


package com.sparkbyexamples.spark.dataframe

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object ReadJsonFromString extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val schema = new StructType()
    .add("Zipcode", StringType, true)
    .add("ZipCodeType", StringType, true)
    .add("City", StringType, true)
    .add("State", StringType, true)

  //Read JSON string from CSV file
  val dfFromCSV:DataFrame = spark.read.option("header",true)
     .csv("src/main/resources/simple_zipcodes.csv")
  dfFromCSV.printSchema()
  dfFromCSV.show(false)

  val dfFromCSVJSON =  dfFromCSV.select(col("Id"),
    from_json(col("JsonValue"),schema).as("jsonData"))
      .select("Id","jsonData.*")
  dfFromCSVJSON.printSchema()
  dfFromCSVJSON.show(false)

  //Read json from string
  import spark.implicits._
  val jsonStr = """{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
  val df = spark.read.json(Seq(jsonStr).toDS())
  df.show(false)

  // from RDD[String]
  // deprecated
  val rdd = spark.sparkContext.parallelize(
    """ {"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"} """ :: Nil)
  val df2 = spark.read.json(rdd)
  df2.show()

}

This example is also available at GitHub project for reference.

Conclusion

In this Spark article, you have learned how to read and parse a JSON string from a CSV file and also learned how to convert JSON string columns into multiple columns on DataFrame using Scala examples.

NNK

SparkByExamples.com is a BigData and Spark examples community page, all examples are simple and easy to understand and well tested in our development environment using Scala and Maven.

Leave a Reply