Spark Merge Two DataFrames with Different Columns or Schema

In Spark or PySpark, let's see how to merge/union two DataFrames with a different number of columns (different schema). In Spark 3.1 and later, you can easily achieve this using the unionByName() transformation by passing allowMissingColumns with the value true. In older versions, this option is not available.


//Scala
val merged_df = df1.unionByName(df2, true)

#PySpark
merged_df = df1.unionByName(df2, allowMissingColumns=True)

The difference between unionByName() and union() is that unionByName() resolves columns by name rather than by position. In other words, unionByName() merges two DataFrames by matching their column names instead of their column order.
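
To see the difference in action, here is a minimal sketch; the DataFrames a and b are hypothetical, and a SparkSession with spark.implicits._ is assumed to be in scope:

//Scala
import spark.implicits._
//Two DataFrames with the same columns in swapped order
val a = Seq(("a-c0", "a-c1")).toDF("c0", "c1")
val b = Seq(("b-c1", "b-c0")).toDF("c1", "c0")

//union() matches by position: b's c1 values end up under a's c0
a.union(b).show()

//unionByName() matches by name: values line up with their columns
a.unionByName(b).show()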

If you are using a version older than Spark 3.1, use the approach below to merge DataFrames with different columns.

Spark Merge Two DataFrames with Different Columns

In this section, I will cover a Spark with Scala example of how to merge two DataFrames with different columns. First, let's create DataFrames with different numbers of columns: df1 is missing the columns state and salary, and df2 is missing the column age.


//Create DataFrame df1 with columns name,dept & age
val data = Seq(("James","Sales",34), ("Michael","Sales",56),
               ("Robert","Sales",30), ("Maria","Finance",24) )
import spark.implicits._
val df1 = data.toDF("name","dept","age")
df1.printSchema()

//root
// |-- name: string (nullable = true)
// |-- dept: string (nullable = true)
// |-- age: integer (nullable = false)

Second DataFrame


//Create DataFrame df2 with columns name,dept,state & salary
val data2=Seq(("James","Sales","NY",9000),("Maria","Finance","CA",9000),
              ("Jen","Finance","NY",7900),("Jeff","Marketing","CA",8000))
val df2 = data2.toDF("name","dept","state","salary")
df2.printSchema()

//root
// |-- name: string (nullable = true)
// |-- dept: string (nullable = true)
// |-- state: string (nullable = true)
// |-- salary: integer (nullable = false)

Now create new DataFrames from the existing ones by adding the missing columns. The newly added columns contain null values; we add these constant columns using the lit() function.


import org.apache.spark.sql.functions.{col, lit}

//Use distinct instead of a Set to keep a deterministic column order
val merged_cols = (df1.columns ++ df2.columns).distinct

//For each column in merged_cols, select it if the DataFrame has it,
//otherwise add it as a null constant
def getNewColumns(columns: Set[String], mergedCols: Seq[String]) = {
    mergedCols.map {
      case x if columns.contains(x) => col(x)
      case x => lit(null).as(x)
    }
}
val new_df1 = df1.select(getNewColumns(df1.columns.toSet, merged_cols): _*)
val new_df2 = df2.select(getNewColumns(df2.columns.toSet, merged_cols): _*)
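
As a quick sanity check (a small sketch, relying on the deterministic merged_cols order above), both padded DataFrames should now expose the same five columns:

//Both DataFrames now share the columns name,dept,age,state,salary
println(new_df1.columns.mkString(","))
println(new_df2.columns.mkString(","))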

Finally, merge the two DataFrames by column name using unionByName().


//Finally union the two DataFrames new_df1 & new_df2 by name
val merged_df = new_df1.unionByName(new_df2)
merged_df.show()

//+-------+---------+----+-----+------+
//|   name|     dept| age|state|salary|
//+-------+---------+----+-----+------+
//|  James|    Sales|  34| null|  null|
//|Michael|    Sales|  56| null|  null|
//| Robert|    Sales|  30| null|  null|
//|  Maria|  Finance|  24| null|  null|
//|  James|    Sales|null|   NY|  9000|
//|  Maria|  Finance|null|   CA|  9000|
//|    Jen|  Finance|null|   NY|  7900|
//|   Jeff|Marketing|null|   CA|  8000|
//+-------+---------+----+-----+------+
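
On Spark 3.1 and later, this manual padding is unnecessary; the allowMissingColumns flag shown at the beginning of the article produces the same merged result in a single step:

//Spark 3.1+ one-liner equivalent of the padding approach
val merged_df2 = df1.unionByName(df2, allowMissingColumns = true)
merged_df2.show()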

PySpark Merge Two DataFrames with Different Columns

To merge two DataFrames with different columns in PySpark, we will use a similar approach to the one explained above, along with the unionByName() transformation. First, let's create DataFrames with different numbers of columns.


from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

#Create DataFrame df1 with columns name,dept & age
data = [("James","Sales",34), ("Michael","Sales",56), \
    ("Robert","Sales",30), ("Maria","Finance",24) ]
columns= ["name","dept","age"]
df1 = spark.createDataFrame(data = data, schema = columns)
df1.printSchema()

#Create DataFrame df2 with columns name,dept,state & salary
data2=[("James","Sales","NY",9000),("Maria","Finance","CA",9000), \
    ("Jen","Finance","NY",7900),("Jeff","Marketing","CA",8000)]
columns2= ["name","dept","state","salary"]
df2 = spark.createDataFrame(data = data2, schema = columns2)
df2.printSchema()

Now add the missing columns 'state' and 'salary' to df1, and 'age' to df2, with null values.


#Add missing columns 'state' & 'salary' to df1
from pyspark.sql.functions import lit
for column in [column for column in df2.columns if column not in df1.columns]:
    df1 = df1.withColumn(column, lit(None))

#Add missing column 'age' to df2
for column in [column for column in df1.columns if column not in df2.columns]:
    df2 = df2.withColumn(column, lit(None))
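
Note that lit(None) creates columns of NullType; when the DataFrames are unioned, Spark coerces these null columns to the matching column types from the other DataFrame.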

Now merge/union the DataFrames using unionByName(). As explained earlier, unionByName() resolves columns by name rather than by position.


#Finally union the two DataFrames df1 & df2 by name
merged_df=df1.unionByName(df2)
merged_df.show()
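
The merged output should match the result shown in the Scala section above, with null values in the columns each source DataFrame was missing.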

Conclusion

In this article, you have learned, with Spark and PySpark examples, how to merge two DataFrames with different columns by adding the missing columns to each DataFrame and then combining them using unionByName().

Happy Learning !!
