You are currently viewing Spark – Using XStream API to write complex XML structures

When you have a need to write complex XML nested structures from Spark Data Frame and Databricks Spark-XML API is not suitable for your use case, you could use XStream API to convert data to XML string and write it to filesystem as a text file. Let’s see how to do this using an example.

If you are looking for Databricks Spark-XML API example, please follow below link.

Processing XML files in Spark using Databricks Spark-XML API

We will use XStream API which is well know processing framework to serialize objects to XML and back again.


<dependency>
     <groupId>com.thoughtworks.xstream</groupId>
     <artifactId>xstream</artifactId>
     <version>1.4.11</version>
</dependency>

Though the example we have used here is not complex and this could be easily done by using Spark-XML API, my thought process is to explain another approach where we get more flexible on creating complex XML structures using XStream API.

First, let’s create a DataFrame from data collection and set the schema. Our data set is a Person which also contains “name” as nested structure.


val data = Seq(Row("1",Row("James ","","Smith"),"36636","M","3000"),
      Row("2",Row("Michael ","Rose",""),"40288","M","4000"),
      Row("3",Row("Robert ","","Williams"),"42114","M","4000"),
      Row("4",Row("Maria ","Anne","Jones"),"39192","F","4000"),
      Row("5",Row("Jen","Mary","Brown"),"","F","-1")
    )
val schema = new StructType()
      .add("id",StringType)
      .add("name",new StructType()
        .add("firstName",StringType)
        .add("middleName",StringType)
        .add("lastName",StringType))
      .add("ssn",StringType)
      .add("gender",StringType)
      .add("salary",StringType)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)

Create a Scala case class similar to the structure of your output XML.


case class Name(firstName:String,middleName:String,lastName:String)
case class Person(id:String,name:Name,ssn:String,gender:String,salary:String)

Now convert the DataFrame[Row] to DataSet[Person].


    import spark.implicits._
    val dsPerson = df.as[Person]

We will transform Person object to XML string using XStream API. Let’s use mapPartitions to apply this transformation. Using mapPartitions we will instantiate xStream class only once per partition. Do not use map or instantiate xStream inside a map of mapPartitions as it will instantiate for each record and this will have a performance impact due to expensive operation


val dsString = dsPerson.mapPartitions(partition=>{
      val xstream = new XStream(new DomDriver)
      val data = partition.map(person=>{
        val xmlString = xstream.toXML(person)
        xmlString
      })
      data
    })

Finally, write DataSet[String] as a text file.


dsString.write.text("c:/tmp/xstream.xml")

Here is a full example with XStream API.


package com.sparkbyexamples.spark.dataframe.xml.xstream

import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{Row, SparkSession}

case class Name(firstName:String,middleName:String,lastName:String)
case class Person(id:String,name:Name,ssn:String,gender:String,salary:String)

object WriteXML {
  def main(args: Array[String]): Unit = {

    val spark:SparkSession = SparkSession.builder()
      .master("local")
      .appName("SparkByExample")
      .getOrCreate()

    val sc = spark.sparkContext

    val data = Seq(Row("1",Row("James ","","Smith"),"36636","M","3000"),
      Row("2",Row("Michael ","Rose",""),"40288","M","4000"),
      Row("3",Row("Robert ","","Williams"),"42114","M","4000"),
      Row("4",Row("Maria ","Anne","Jones"),"39192","F","4000"),
      Row("5",Row("Jen","Mary","Brown"),"","F","-1")
    )

    val schema = new StructType()
      .add("id",StringType)
      .add("name",new StructType()
        .add("firstName",StringType)
        .add("middleName",StringType)
        .add("lastName",StringType))
      .add("ssn",StringType)
      .add("gender",StringType)
      .add("salary",StringType)

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)

    import spark.implicits._
    val dsPerson = df.as[Person]

    val dsString = dsPerson.mapPartitions(partition=>{
      val xstream = new XStream(new DomDriver)
      val data = partition.map(person=>{
        val xmlString = xstream.toXML(person)
        xmlString
      })
      data
    })

    dsString.write.text("c:/tmp/xstream.xml")

  }
}

The complete code can be downloaded from GitHub

Conclusion:

In this article, we have seen how to use XStream API to create files with complex XML structures. Though the example we have used is not complex and this could be easily done by using Spark-XML API, I would like to explain another approach where we get more flexible on creating complex XML structures using XStream API and this flexible is not available with Spark-XML API

Hope you have enjoyed reading this article. If you like this please leave me a comment.

Happy Learning !!

Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, He has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive and Machine Learning. Naveen journey in the field of data engineering has been a continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with the data as he come across. Follow Naveen @ LinkedIn and Medium

This Post Has One Comment

  1. Anonymous

    so helpfull article. Thank you.

Comments are closed.