Spark By {Examples}

Spark Union Tables From Different Hive Databases


In this Spark article, you will learn how to union two or more tables of the same schema which are from different Hive databases with Scala examples.

First, let’s create two tables with the same schema in different Hive databases. Creating Hive databases and tables from Spark requires Hive support, so add the required Maven/Gradle dependency (spark-hive) and enable Hive support in the Spark configuration.

Maven Dependency

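You can use any supported Spark version; this example uses 2.4.4. The _2.12 suffix of the artifact ID is the Scala version, so pick the suffix that matches your project's Scala build.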


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>

Create SparkSession by Enabling Hive Support

Next, call enableHiveSupport() while building the SparkSession object so that Spark can create Hive databases and tables.


import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("my-app")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()
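To confirm that Hive support is actually active, you can read the spark.sql.catalogImplementation setting. enableHiveSupport() sets it to "hive", while a plain session reports "in-memory". The following is a minimal sketch that assumes the spark-hive dependency above is on the classpath. The object and method names are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object CheckHiveSupport {
  // Builds the same Hive-enabled session as above and returns the active catalog implementation
  def catalogImplementation(): String = {
    val spark = SparkSession.builder
      .appName("my-app")
      .master("local[*]")
      .enableHiveSupport() // throws IllegalArgumentException if spark-hive is missing from the classpath
      .getOrCreate()
    // "hive" when Hive support is on; "in-memory" for a plain SparkSession
    spark.conf.get("spark.sql.catalogImplementation")
  }

  def main(args: Array[String]): Unit =
    println(s"Catalog implementation: ${catalogImplementation()}")
}
```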

Create Databases and Tables with the Same Schema

The goal is to union tables that share a schema but live in different Hive databases. So let’s create database1.table1 and database2.table2 by reading the same .csv file, which gives both tables an identical schema.
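Here is a minimal sketch of this step. It builds a small DataFrame from hypothetical sample rows instead of the CSV file, so it runs on its own. The column names fiscal_year, period and start_date are made up. It then saves that single DataFrame in both databases, just as the complete example below does with the CSV data.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object CreateTablesInTwoDatabases {
  def createTables(spark: SparkSession): Unit = {
    import spark.implicits._

    // Hypothetical stand-in for the CSV data; the article reads it with
    // spark.read.option("header", "true").option("inferSchema", "true").csv(...)
    val fsprdDF = Seq(
      (2015, "P01", "2015-01-01"),
      (2015, "P02", "2015-02-01"),
      (2016, "P01", "2016-01-01")
    ).toDF("fiscal_year", "period", "start_date")

    // The same DataFrame is saved in both databases, so both tables share one schema
    spark.sql("create database if not exists database1")
    fsprdDF.write.mode(SaveMode.Overwrite).saveAsTable("database1.table1")

    spark.sql("create database if not exists database2")
    fsprdDF.write.mode(SaveMode.Overwrite).saveAsTable("database2.table2")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("my-app")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    createTables(spark)
    spark.stop()
  }
}
```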


This step reads the CSV file into a DataFrame and saves that DataFrame as a Hive table in two different databases.


We now have table1 and table2, with the same schema, in two different databases.
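You can check this from Spark itself. SHOW DATABASES and SHOW TABLES list what was created, and comparing the two tables' StructType schemas confirms they match. This is a minimal sketch that assumes both tables already exist. The object and method names are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object VerifyTables {
  // True when two catalog tables have identical schemas (column names, types, nullability)
  def sameSchema(spark: SparkSession, first: String, second: String): Boolean =
    spark.table(first).schema == spark.table(second).schema

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("my-app")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // database1 and database2 should be listed next to default
    spark.sql("show databases").show()
    // Tables inside each database
    spark.sql("show tables in database1").show()
    spark.sql("show tables in database2").show()

    println(s"Same schema: ${sameSchema(spark, "database1.table1", "database2.table2")}")
    spark.stop()
  }
}
```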

Union Tables From Different Databases

Next, union the two tables and write the result into a separate table.


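DataFrame union(): the union() method of a DataFrame combines two DataFrames that have the same structure/schema. It matches columns by position, not by name, and it keeps duplicate rows (like SQL UNION ALL). If the schemas are incompatible, for example a different number of columns, Spark throws an AnalysisException. To match columns by name instead of position, use unionByName().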
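The behavior described above is easy to see on small in-memory DataFrames, with no Hive tables involved. This is a minimal sketch built on hypothetical sample rows with columns (id, name). The object and helper names are illustrative.

```scala
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}

object UnionSemantics {
  // Returns (rows after union, rows after union + distinct)
  def unionCounts(df1: DataFrame, df2: DataFrame): (Long, Long) =
    (df1.union(df2).count(), df1.union(df2).distinct().count())

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("union-demo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows; (2, "b") appears in both DataFrames
    val df1 = Seq((1, "a"), (2, "b")).toDF("id", "name")
    val df2 = Seq((2, "b"), (3, "c")).toDF("id", "name")

    // union() keeps the duplicate row (4 rows); distinct() removes it (3 rows)
    val (all, deduped) = unionCounts(df1, df2)
    println(s"union: $all rows, union + distinct: $deduped rows")

    // Same columns in a different order: unionByName() pairs them by name,
    // whereas union() would pair them by position
    val reordered = Seq(("d", 4)).toDF("name", "id")
    df1.unionByName(reordered).show()

    // A different number of columns is rejected when the union is analyzed
    try df1.union(spark.range(1).toDF("id"))
    catch { case e: AnalysisException => println(s"union failed: ${e.getMessage}") }

    spark.stop()
  }
}
```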

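What we did here is:
1. Listed the fully qualified names of the tables to combine (database1.table1 and database2.table2).
2. Read each table into a DataFrame.
3. Combined the DataFrames with union() when there is more than one, otherwise used the single DataFrame as is.
4. Removed duplicate rows with distinct() and saved the result as a new table, database3.table1.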
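These steps can be wrapped in a small reusable function that accepts any number of fully qualified table names. This is a sketch under assumptions: unionTables is a hypothetical helper, not a Spark API, and the tables are assumed to exist already. Calling reduce on a one-element Seq simply returns that element, so a single table needs no separate branch. An empty list is rejected up front, because reduce would otherwise throw.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object UnionTablesHelper {
  // Hypothetical helper: reads each "database.table" name and unions the results by position
  def unionTables(spark: SparkSession, tableNames: Seq[String]): DataFrame = {
    require(tableNames.nonEmpty, "at least one table name is required")
    tableNames.map(name => spark.table(name)).reduce(_ union _)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("my-app")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Assumes database1.table1 and database2.table2 already exist
    val combined = unionTables(spark, Seq("database1.table1", "database2.table2"))

    spark.sql("create database if not exists database3")
    combined.distinct().write.mode(SaveMode.Overwrite).saveAsTable("database3.table1")
    spark.stop()
  }
}
```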

Below is the complete example:


import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object UnionTablesFromDifferentDatabases {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("my-app")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Read the CSV file
    val fsprdDF = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("src/main/resources/Periods-2015-_2022.csv")

    // Create database1 and table1
    spark.sql("create database if not exists database1")
    fsprdDF.write.mode(SaveMode.Overwrite).saveAsTable("database1.table1")

    // Create database2 and table2
    spark.sql("create database if not exists database2")
    fsprdDF.write.mode(SaveMode.Overwrite).saveAsTable("database2.table2")

    // UNION operation: load each table in the list as a DataFrame
    val unionTblist = List("database1.table1", "database2.table2")
    var dbTbllist = Seq[DataFrame]()
    for (tbl <- unionTblist) {
      dbTbllist = dbTbllist :+ spark.table(tbl)
    }

    // Union the DataFrames when there is more than one; otherwise use the only one
    val trgTable =
      if (dbTbllist.size > 1) dbTbllist.reduce(_ union _)
      else dbTbllist(0)

    // Create database3.table1 from the union of database1.table1 and database2.table2,
    // dropping duplicate rows
    spark.sql("create database if not exists database3")
    trgTable
      .distinct()
      .write
      .mode(SaveMode.Overwrite)
      .saveAsTable("database3.table1")

    spark.stop()
  }
}
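For comparison, the same union can be written in Spark SQL with fully qualified table names. This is a minimal sketch that assumes database1.table1 and database2.table2 already exist. SQL UNION removes duplicate rows, so it corresponds to union() followed by distinct() in the example above. UNION ALL would correspond to union() alone. The object name is illustrative.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object UnionTablesWithSql {
  // UNION (without ALL) deduplicates rows, like union(...).distinct() on DataFrames
  val unionQuery: String =
    """select * from database1.table1
      |union
      |select * from database2.table2""".stripMargin

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("my-app")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Assumes database1.table1 and database2.table2 already exist
    spark.sql("create database if not exists database3")
    spark.sql(unionQuery)
      .write
      .mode(SaveMode.Overwrite)
      .saveAsTable("database3.table1")

    spark.stop()
  }
}
```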

That’s it!

Hope it helps. :)

Keep learning and keep growing.

