Spark SQL provides a set of JSON functions to parse a JSON string and extract specific values from it. In this article, I will explain the most used JSON functions with Scala examples.
1. Spark JSON Functions
from_json() – Converts a JSON string into a StructType or MapType column.
to_json() – Converts a MapType or StructType column to a JSON string.
json_tuple() – Extracts values from a JSON string and returns them as new columns.
get_json_object() – Extracts a JSON element from a JSON string based on the specified JSON path.
schema_of_json() – Creates a schema string from a JSON string.
2. Create DataFrame with Column contains JSON String
To explain these functions, let's first create a DataFrame with a column that contains a JSON string.
val jsonString="""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
val data = Seq((1, jsonString))
import spark.implicits._
val df=data.toDF("id","value")
df.show(false)
//+---+--------------------------------------------------------------------------+
//|id |value |
//+---+--------------------------------------------------------------------------+
//|1 |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
//+---+--------------------------------------------------------------------------+
3. from_json() – Converts JSON string into Struct type or Map type
The example below converts the JSON string into a Map of key-value pairs. I will leave it to you to convert it to a struct type; refer to Convert JSON string to Struct type column.
import org.apache.spark.sql.functions.{from_json,col}
import org.apache.spark.sql.types.{MapType, StringType}
val df2=df.withColumn("value",from_json(col("value"),MapType(StringType,StringType)))
df2.printSchema()
df2.show(false)
//root
// |-- id: integer (nullable = false)
// |-- value: map (nullable = true)
// | |-- key: string
// | |-- value: string (valueContainsNull = true)
//+---+---------------------------------------------------------------------------+
//|id |value |
//+---+---------------------------------------------------------------------------+
//|1 |[Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR]|
//+---+---------------------------------------------------------------------------+
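As a sketch of the struct-type conversion mentioned above, the same column can also be parsed with an explicit StructType. The schema below is written by hand to mirror the sample JSON, so adjust the field names and types to your data.

```scala
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hand-written schema mirroring the sample JSON; field names must match the JSON keys.
val zipSchema = new StructType()
  .add("Zipcode", IntegerType)
  .add("ZipCodeType", StringType)
  .add("City", StringType)
  .add("State", StringType)

val dfStruct = df.withColumn("value", from_json(col("value"), zipSchema))
dfStruct.printSchema()
// Struct fields can then be addressed with dot notation.
dfStruct.select(col("value.City"), col("value.Zipcode")).show(false)
```

Unlike the MapType version, the struct keeps each field's own type (Zipcode stays an integer instead of becoming a string).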
4. to_json() – Converts MapType or Struct type to JSON string
Here, I am using the df2 DataFrame created in the from_json() example above.
import org.apache.spark.sql.functions.{to_json}
df2.withColumn("value",to_json(col("value")))
.show(false)
//+---+----------------------------------------------------------------------------+
//|id |value |
//+---+----------------------------------------------------------------------------+
//|1 |{"Zipcode":"704","ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
//+---+----------------------------------------------------------------------------+
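to_json() also accepts a struct built inline with struct(), which is a common way to serialize several columns into a single JSON string. A minimal sketch using the same df (the "label" column is a made-up literal for illustration):

```scala
import org.apache.spark.sql.functions.{to_json, struct, col, lit}

// Pack the id column plus a literal label into a struct, then serialize it to JSON.
df.select(to_json(struct(col("id"), lit("json-demo").as("label"))).as("json"))
  .show(false)
```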
5. json_tuple() – Extracts values from a JSON string and returns them as new columns
import org.apache.spark.sql.functions.{json_tuple}
df.select(col("id"),json_tuple(col("value"),"Zipcode","ZipCodeType","City"))
.toDF("id","Zipcode","ZipCodeType","City")
.show(false)
//+---+-------+-----------+-----------+
//|id |Zipcode|ZipCodeType|City |
//+---+-------+-----------+-----------+
//|1 |704 |STANDARD |PARC PARQUE|
//+---+-------+-----------+-----------+
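Note that json_tuple() returns null for keys that are absent from the JSON string, which is handy for ragged data. A small sketch against the same df ("Country" is a deliberately missing key):

```scala
import org.apache.spark.sql.functions.{json_tuple, col}

// State exists in the sample JSON, Country does not, so Country comes back null.
// The multi-alias .as(Seq(...)) names both generated columns at once.
df.select(json_tuple(col("value"), "State", "Country").as(Seq("State", "Country")))
  .show(false)
```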
6. get_json_object() – Extracts a JSON element from a JSON string based on the specified JSON path
import org.apache.spark.sql.functions.{get_json_object}
df.select(col("id"),get_json_object(col("value"),"$.ZipCodeType").as("ZipCodeType"))
.show(false)
//+---+-----------+
//|id |ZipCodeType|
//+---+-----------+
//|1 |STANDARD |
//+---+-----------+
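The JSON path can also reach into nested objects with dot notation. A sketch using a hypothetical nested JSON string (not the article's sample data):

```scala
import org.apache.spark.sql.functions.{get_json_object, col}
import spark.implicits._

// Made-up nested JSON to demonstrate a deeper path: $.address.city.
val nested = Seq((1, """{"name":"A","address":{"city":"PARC PARQUE","state":"PR"}}"""))
  .toDF("id", "value")
nested.select(col("id"), get_json_object(col("value"), "$.address.city").as("city"))
  .show(false)
```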
7. schema_of_json() – Creates a schema string from a JSON string
import org.apache.spark.sql.functions.{schema_of_json,lit}
val schemaStr=spark.range(1)
.select(schema_of_json(lit("""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}""")))
.collect()(0)(0)
println(schemaStr)
//struct<City:string,State:string,ZipCodeType:string,Zipcode:bigint>
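The derived schema string can be fed straight back into from_json(), which is the usual reason to compute it. A sketch reusing schemaStr from above (the .toString is needed because collect() returns the value as Any, and the empty options map selects the String-schema overload of from_json):

```scala
import org.apache.spark.sql.functions.{from_json, col}

// Parse the original JSON column using the schema derived by schema_of_json().
df.withColumn("parsed", from_json(col("value"), schemaStr.toString, Map.empty[String, String]))
  .select("parsed.City", "parsed.Zipcode")
  .show(false)
```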
8. Complete Example
import org.apache.spark.sql.SparkSession
object JsonFunctions extends App{
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val jsonString="""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
val data = Seq((1, jsonString))
import spark.implicits._
val df=data.toDF("id","value")
df.show(false)
import org.apache.spark.sql.functions.{from_json,col}
import org.apache.spark.sql.types.{MapType, StringType}
val df2=df.withColumn("value",from_json(col("value"),MapType(StringType,StringType)))
df2.printSchema()
df2.show(false)
import org.apache.spark.sql.functions.{to_json}
df2.withColumn("value",to_json(col("value")))
.show(false)
import org.apache.spark.sql.functions.{json_tuple}
df.select(col("id"),json_tuple(col("value"),"Zipcode","ZipCodeType","City"))
.toDF("id","Zipcode","ZipCodeType","City")
.show(false)
import org.apache.spark.sql.functions.{get_json_object}
df.select(col("id"),get_json_object(col("value"),"$.ZipCodeType").as("ZipCodeType"))
.show(false)
import org.apache.spark.sql.functions.{schema_of_json,lit}
val schemaStr=spark.range(1)
.select(schema_of_json(lit("""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}""")))
.collect()(0)(0)
println(schemaStr)
}
Related Articles
- Spark from_json() – Convert JSON Column to Struct, Map or Multiple Columns
- Spark Timestamp – Extract hour, minute and second
- Spark Convert JSON to Avro, CSV & Parquet
- Spark Convert Parquet file to JSON
- Spark Read Json From Amazon S3
- Spark Read Files from HDFS (TXT, CSV, AVRO, PARQUET, JSON)
- Spark Read JSON from multiline
- Spark Read JSON from a CSV file