Spark – Using XStream API to write complex XML structures

When you need to write complex, nested XML structures from a Spark DataFrame and the Databricks Spark-XML API is not suitable for your use case, you can use the XStream API to convert the data to XML strings and write them to the filesystem as a text file. Let’s see how to do this with an example.

If you are looking for a Databricks Spark-XML API example, please follow the link below.

Processing XML files in Spark using Databricks Spark-XML API

We will use the XStream API, a well-known framework for serializing objects to XML and back again. Add it to your project with the following Maven dependency.

<dependency>
     <groupId>com.thoughtworks.xstream</groupId>
     <artifactId>xstream</artifactId>
     <version>1.4.11</version>
</dependency>
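If your project is built with sbt rather than Maven, the same artifact can be declared as below. This is only a sketch of the equivalent setting, using the same coordinates and version as the Maven snippet above.

```scala
// build.sbt: same group, artifact and version as the Maven dependency above
libraryDependencies += "com.thoughtworks.xstream" % "xstream" % "1.4.11"
```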

Though the example used here is not complex and could easily be done with the Spark-XML API, the goal is to explain another approach that gives more flexibility when creating complex XML structures with the XStream API.

First, let’s create a DataFrame from a data collection and set the schema. Our data set is a Person, which contains “name” as a nested structure.


val data = Seq(Row("1",Row("James ","","Smith"),"36636","M","3000"),
      Row("2",Row("Michael ","Rose",""),"40288","M","4000"),
      Row("3",Row("Robert ","","Williams"),"42114","M","4000"),
      Row("4",Row("Maria ","Anne","Jones"),"39192","F","4000"),
      Row("5",Row("Jen","Mary","Brown"),"","F","-1")
    )
val schema = new StructType()
      .add("id",StringType)
      .add("name",new StructType()
        .add("firstName",StringType)
        .add("middleName",StringType)
        .add("lastName",StringType))
      .add("ssn",StringType)
      .add("gender",StringType)
      .add("salary",StringType)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)

Create Scala case classes that mirror the structure of your output XML.


case class Name(firstName:String,middleName:String,lastName:String)
case class Person(id:String,name:Name,ssn:String,gender:String,salary:String)
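To get a feel for how these case classes map to XML, here is a minimal sketch that serializes a single Person without Spark. By default XStream names the root element after the fully qualified class name, so the sketch registers an alias and, as an illustration of the extra control XStream gives, writes id as an attribute. The object name XStreamAliasSketch and the helper newXStream are made up for this example; the case classes are repeated only to keep the snippet self-contained.

```scala
import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver

case class Name(firstName: String, middleName: String, lastName: String)
case class Person(id: String, name: Name, ssn: String, gender: String, salary: String)

// Hypothetical helper object for this sketch, not part of the article's code
object XStreamAliasSketch {

  // Builds an XStream instance with a few customizations applied
  def newXStream(): XStream = {
    val xstream = new XStream(new DomDriver)
    // Use <person> as the element name instead of the class name
    xstream.alias("person", classOf[Person])
    // Write the id field as an attribute of <person> rather than a child element
    xstream.useAttributeFor(classOf[Person], "id")
    xstream
  }

  def main(args: Array[String]): Unit = {
    val person = Person("1", Name("James", "", "Smith"), "36636", "M", "3000")
    println(newXStream().toXML(person))
  }
}
```

Running main should print a person element that carries id as an attribute and contains a nested name element, with one child element per remaining field.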

Now convert the DataFrame (a Dataset[Row]) to a Dataset[Person].


    import spark.implicits._
    val dsPerson = df.as[Person]

We will transform each Person object to an XML string using the XStream API. Let’s use mapPartitions to apply this transformation, so that the XStream class is instantiated only once per partition. Do not use map, and do not instantiate XStream inside the inner map within mapPartitions: either would create a new instance for every record, and because creating an XStream instance is an expensive operation, this would hurt performance.


val dsString = dsPerson.mapPartitions(partition=>{
      // One XStream instance per partition, reused for every record in it
      val xstream = new XStream(new DomDriver)
      partition.map(person=>xstream.toXML(person))
    })
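The difference between the two placements can be illustrated without Spark. The sketch below is only an analogy: CostlySerializer is a hypothetical stand-in for an expensive object such as XStream, and plain Scala sequences stand in for partitions. It counts how many times the object is constructed in each pattern.

```scala
import java.util.concurrent.atomic.AtomicInteger

object InstantiationDemo {

  // Hypothetical stand-in for an object that is costly to construct, such as XStream
  final class CostlySerializer(created: AtomicInteger) {
    created.incrementAndGet() // record every construction
    def toXML(value: String): String = s"<value>$value</value>"
  }

  // One serializer per record: what happens when the object is built inside map
  def perRecord(partitions: Seq[Seq[String]], created: AtomicInteger): Seq[String] =
    partitions.flatMap(_.map(v => new CostlySerializer(created).toXML(v)))

  // One serializer per partition: the mapPartitions pattern
  def perPartition(partitions: Seq[Seq[String]], created: AtomicInteger): Seq[String] =
    partitions.flatMap { partition =>
      val serializer = new CostlySerializer(created)
      partition.map(v => serializer.toXML(v))
    }
}
```

With two partitions holding five records in total, perRecord constructs five serializers while perPartition constructs two, and both return the same strings.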

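Finally, write the Dataset[String] as text. Note that Spark creates a directory at the given path and writes one part file per partition inside it.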


dsString.write.text("c:/tmp/xstream.xml")
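If you want all records in one part file, or want to be able to re-run the job against an existing output path, a small wrapper like the one below may help. It is a sketch under assumptions: XmlTextWriter, writeXml, outputDir and singleFile are names invented here, and coalescing to a single partition is only reasonable when the output is small enough for one task to write.

```scala
import org.apache.spark.sql.{Dataset, SaveMode}

// Hypothetical helper for this sketch, not part of the article's code
object XmlTextWriter {

  // Writes each XML string as text under outputDir.
  // singleFile = true coalesces to one partition, so one part file is produced.
  def writeXml(xml: Dataset[String], outputDir: String, singleFile: Boolean): Unit = {
    val toWrite = if (singleFile) xml.coalesce(1) else xml
    // Overwrite replaces any previous output at the same path
    toWrite.write.mode(SaveMode.Overwrite).text(outputDir)
  }
}
```

For example, XmlTextWriter.writeXml(dsString, "c:/tmp/xstream.xml", singleFile = true). Keep in mind that the output is a sequence of person fragments with no enclosing root element, so a consumer that expects a single well-formed XML document would need the fragments wrapped in a root element first.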

Here is a full example with XStream API.


package com.sparkbyexamples.spark.dataframe.xml.xstream

import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{Row, SparkSession}

case class Name(firstName:String,middleName:String,lastName:String)
case class Person(id:String,name:Name,ssn:String,gender:String,salary:String)

object WriteXML {
  def main(args: Array[String]): Unit = {

    val spark:SparkSession = SparkSession.builder()
      .master("local")
      .appName("SparkByExample")
      .getOrCreate()

    val data = Seq(Row("1",Row("James ","","Smith"),"36636","M","3000"),
      Row("2",Row("Michael ","Rose",""),"40288","M","4000"),
      Row("3",Row("Robert ","","Williams"),"42114","M","4000"),
      Row("4",Row("Maria ","Anne","Jones"),"39192","F","4000"),
      Row("5",Row("Jen","Mary","Brown"),"","F","-1")
    )

    val schema = new StructType()
      .add("id",StringType)
      .add("name",new StructType()
        .add("firstName",StringType)
        .add("middleName",StringType)
        .add("lastName",StringType))
      .add("ssn",StringType)
      .add("gender",StringType)
      .add("salary",StringType)

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)

    import spark.implicits._
    val dsPerson = df.as[Person]

    val dsString = dsPerson.mapPartitions(partition=>{
      // One XStream instance per partition, reused for every record in it
      val xstream = new XStream(new DomDriver)
      partition.map(person=>xstream.toXML(person))
    })

    dsString.write.text("c:/tmp/xstream.xml")

  }
}

The complete code can be downloaded from GitHub

Conclusion:

In this article, we have seen how to use the XStream API to create files with complex XML structures. Though the example used here is not complex and could easily be done with the Spark-XML API, it demonstrates another approach that gives more flexibility when creating complex XML structures, flexibility that is not available with the Spark-XML API.

I hope you have enjoyed reading this article. If you liked it, please leave a comment.

Happy Learning !!


NNK
