In this Spark article, you will learn how to parse or read a JSON string from a TEXT/CSV file and convert it into multiple DataFrame columns using Scala examples.
Assume you have a text file with JSON data, or a CSV file with a JSON string in a column. In order to read these files, parse the JSON, and convert it to DataFrame columns, we use the from_json()
function provided in Spark SQL.
1. Read and Parse a JSON from a TEXT file
In this section, we will see how to parse a JSON string from a text file and convert it to Spark DataFrame columns using the from_json()
Spark SQL built-in function.
Below is the JSON data present in a text file.
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"}
{"Zipcode":709,"ZipCodeType":"STANDARD","City":"BDA SAN LUIS","State":"PR"}
{"Zipcode":76166,"ZipCodeType":"UNIQUE","City":"CINGULAR WIRELESS","State":"TX"}
{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FORT WORTH","State":"TX"}
{"Zipcode":76177,"ZipCodeType":"STANDARD","City":"FT WORTH","State":"TX"}
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"URB EUGENE RICE","State":"PR"}
{"Zipcode":85209,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}
{"Zipcode":85210,"ZipCodeType":"STANDARD","City":"MESA","State":"AZ"}
{"Zipcode":32046,"ZipCodeType":"STANDARD","City":"HILLIARD","State":"FL"}
We could easily read this file with the read.json() method; however, we intentionally read it as a text file in order to explain from_json() function usage.
//Read JSON string from text file
val dfFromText:DataFrame = spark.read.text("src/main/resources/simple_zipcodes.txt")
dfFromText.printSchema()
This reads the JSON string from the text file into a DataFrame value
column, as shown in the below schema.
root
|-- value: string (nullable = true)
2. Convert JSON column to Multiple Columns
Now, let’s convert the value column into multiple columns using from_json(). This function takes the DataFrame column with the JSON string and the JSON schema as arguments. So, first, let’s create a schema that represents our data.
//Define schema of JSON structure
import org.apache.spark.sql.types.{StringType, StructType}
val schema = new StructType()
.add("Zipcode", StringType, true)
.add("ZipCodeType", StringType, true)
.add("City", StringType, true)
.add("State", StringType, true)
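As a side note, from_json() can also take the schema as a DDL-formatted string instead of a programmatic StructType; this is a minimal, self-contained sketch of that alternative (assuming Spark 2.3 or later, where this overload is available).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("ddl-schema-sketch")
  .getOrCreate()
import spark.implicits._

// A single JSON string in a "value" column, like the text-file example above
val df = Seq("""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}""")
  .toDF("value")

// DDL-formatted schema string, equivalent to the StructType built above
val ddlSchema = "Zipcode STRING, ZipCodeType STRING, City STRING, State STRING"

val parsed = df
  .withColumn("jsonData", from_json(col("value"), ddlSchema, Map.empty[String, String]))
  .select("jsonData.*")
parsed.show(false)
```

Both forms produce the same struct; the DDL string is handy when the schema is stored as configuration.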
Finally, use the from_json()
function, which returns a struct Column with all the JSON fields, and select jsonData.* to flatten the struct into individual columns.
// convert json column to multiple columns
import org.apache.spark.sql.functions.{col,from_json}
val dfJSON = dfFromText.withColumn("jsonData",from_json(col("value"),schema))
.select("jsonData.*")
dfJSON.printSchema()
dfJSON.show(false)
Yields below output
root
|-- Zipcode: string (nullable = true)
|-- ZipCodeType: string (nullable = true)
|-- City: string (nullable = true)
|-- State: string (nullable = true)
+-------+-----------+-------------------+-----+
|Zipcode|ZipCodeType|City |State|
+-------+-----------+-------------------+-----+
|704 |STANDARD |PARC PARQUE |PR |
|704 |STANDARD |PASEO COSTA DEL SUR|PR |
|709 |STANDARD |BDA SAN LUIS |PR |
|76166 |UNIQUE |CINGULAR WIRELESS |TX |
|76177 |STANDARD |FORT WORTH |TX |
|76177 |STANDARD |FT WORTH |TX |
|704 |STANDARD |URB EUGENE RICE |PR |
|85209 |STANDARD |MESA |AZ |
|85210 |STANDARD |MESA |AZ |
|32046 |STANDARD |HILLIARD |FL |
+-------+-----------+-------------------+-----+
Alternatively, you can also write the above statement using select.
//alternatively using select
val df5 = dfFromText.select(from_json(col("value"), schema).as("data"))
.select("data.*").show(false)
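Note that when a row’s string cannot be parsed against the given schema, from_json() yields null field values rather than failing the job, so malformed records can slip through silently. Below is a minimal sketch of detecting such rows (the two-field schema and input strings are illustrative, not from the article’s dataset).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("malformed-json-sketch")
  .getOrCreate()
import spark.implicits._

val schema = new StructType()
  .add("Zipcode", StringType, true)
  .add("City", StringType, true)

// One valid JSON string and one malformed one
val df = Seq(
  """{"Zipcode":704,"City":"PARC PARQUE"}""",
  """not a json string"""
).toDF("value")

val parsed = df.withColumn("jsonData", from_json(col("value"), schema))

// Rows that failed to parse end up with null fields in the struct
val bad = parsed.filter(col("jsonData.Zipcode").isNull)
bad.show(false)
```

Filtering on a field that is always present in valid records is a simple way to flag parse failures before flattening.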
3. Read and Parse a JSON from CSV column string
Similarly, we can also parse JSON from a CSV file and create a DataFrame.
//Read from CSV file
val dfFromCSV:DataFrame = spark.read.option("header",true)
.csv("src/main/resources/simple_zipcodes.csv")
dfFromCSV.printSchema()
dfFromCSV.show(false)
This loads the entire JSON string into the JsonValue
column and yields the below schema.
root
|-- Id: string (nullable = true)
|-- JsonValue: string (nullable = true)
Now, let’s parse the JsonValue
column and convert it to multiple columns using the from_json()
function.
//Convert JsonValue to multiple columns
val dfFromCSVJSON = dfFromCSV.select(col("Id"),
from_json(col("JsonValue"),schema).as("jsonData"))
.select("Id","jsonData.*")
dfFromCSVJSON.printSchema()
dfFromCSVJSON.show(false)
Yields below output
root
|-- Id: string (nullable = true)
|-- Zipcode: string (nullable = true)
|-- ZipCodeType: string (nullable = true)
|-- City: string (nullable = true)
|-- State: string (nullable = true)
+---+-------+-----------+-------------------+-----+
|Id |Zipcode|ZipCodeType|City |State|
+---+-------+-----------+-------------------+-----+
|1 |704 |STANDARD |PARC PARQUE |PR |
|2 |704 |STANDARD |PASEO COSTA DEL SUR|PR |
|3 |709 |STANDARD |BDA SAN LUIS |PR |
|4 |76166 |UNIQUE |CINGULAR WIRELESS |TX |
|5 |76177 |STANDARD |FORT WORTH |TX |
|6 |76177 |STANDARD |FT WORTH |TX |
|7 |704 |STANDARD |URB EUGENE RICE |PR |
|8 |85209 |STANDARD |MESA |AZ |
|9 |85210 |STANDARD |MESA |AZ |
|10 |32046 |STANDARD |HILLIARD |FL |
+---+-------+-----------+-------------------+-----+
4. Convert JSON String to DataFrame Columns
When you have a JSON string and want to convert or load it into a Spark DataFrame, use spark.read.json()
; this function takes a Dataset[String] as an argument.
//Read json from string
import spark.implicits._
val jsonStr = """{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
val df = spark.read.json(Seq(jsonStr).toDS())
df.show(false)
Yields below output
+-----------+-----+-----------+-------+
|City |State|ZipCodeType|Zipcode|
+-----------+-----+-----------+-------+
|PARC PARQUE|PR |STANDARD |704 |
+-----------+-----+-----------+-------+
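A related case is a single string that holds a JSON array of several records, which you may want as one DataFrame row per record. A sketch of one way to handle this, by parsing with an ArrayType schema and then exploding (the two-field schema here is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("json-array-sketch")
  .getOrCreate()
import spark.implicits._

val schema = new StructType()
  .add("Zipcode", StringType, true)
  .add("City", StringType, true)

// A single string holding a JSON array of two records
val jsonArr =
  """[{"Zipcode":704,"City":"PARC PARQUE"},{"Zipcode":709,"City":"BDA SAN LUIS"}]"""
val df = Seq(jsonArr).toDF("value")

// Parse as an array of structs, then explode into one row per element
val rows = df
  .select(explode(from_json(col("value"), ArrayType(schema))).as("jsonData"))
  .select("jsonData.*")
rows.show(false)
```

The explode() call turns each array element into its own row, after which the struct is flattened as before.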
5. Converting RDD[String] to JSON
spark.read.json()
also has a deprecated overload that converts an RDD[String] containing JSON strings to a Spark DataFrame.
// from RDD[String]
// deprecated
val rdd = spark.sparkContext.parallelize(
""" {"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"} """ :: Nil)
val df2 = spark.read.json(rdd)
df2.show()
This yields the same output as above.
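If you want to avoid the deprecated overload, one option is to convert the RDD[String] to a Dataset[String] first via spark.implicits._ and pass that instead; a minimal sketch:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("rdd-json-sketch")
  .getOrCreate()
import spark.implicits._

val rdd = spark.sparkContext.parallelize(
  """{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}""" :: Nil)

// Convert RDD[String] to Dataset[String] and use the non-deprecated overload
val df = spark.read.json(rdd.toDS())
df.show()
```

This keeps the same behavior while staying on the supported Dataset[String] API.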
6. Complete example of Parsing JSON from String into DataFrame
package com.sparkbyexamples.spark.dataframe
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object ReadJsonFromString extends App {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
//Read JSON string from text file
val dfFromText:DataFrame = spark.read.text("src/main/resources/simple_zipcodes.txt")
dfFromText.printSchema()
//Define schema of JSON structure
val schema = new StructType()
.add("Zipcode", StringType, true)
.add("ZipCodeType", StringType, true)
.add("City", StringType, true)
.add("State", StringType, true)
//Convert JSON column to multiple columns
val dfJSON = dfFromText.withColumn("jsonData",from_json(col("value"),schema))
.select("jsonData.*")
dfJSON.printSchema()
dfJSON.show(false)
//Alternatively using select
val dfJSON2 = dfFromText.select(from_json(col("value"), schema).as("jsonData"))
.select("jsonData.*")
//Read JSON string from CSV file
val dfFromCSV:DataFrame = spark.read.option("header",true)
.csv("src/main/resources/simple_zipcodes.csv")
dfFromCSV.printSchema()
dfFromCSV.show(false)
val dfFromCSVJSON = dfFromCSV.select(col("Id"),
from_json(col("JsonValue"),schema).as("jsonData"))
.select("Id","jsonData.*")
dfFromCSVJSON.printSchema()
dfFromCSVJSON.show(false)
//Read json from string
import spark.implicits._
val jsonStr = """{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
val df = spark.read.json(Seq(jsonStr).toDS())
df.show(false)
// from RDD[String]
// deprecated
val rdd = spark.sparkContext.parallelize(
""" {"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"} """ :: Nil)
val df2 = spark.read.json(rdd)
df2.show()
}
This example is also available at the GitHub project for reference.
7. Conclusion
In this Spark article, you have learned how to read and parse a JSON string from text and CSV files, and how to convert JSON string columns into multiple DataFrame columns using Scala examples.