In this article, you will learn how to use a Spark SQL join condition on multiple columns of a DataFrame and Dataset, with Scala examples. You will also learn different ways to provide the join condition on two or more columns.
Before we jump into how to use multiple columns in a join expression, let's first create DataFrames from the emp and dept datasets. The dept_id and branch_id columns are present in both datasets, and we use these columns in the join expression when joining the DataFrames.
val emp = Seq((1,"Smith",-1,"2018",10,"M",3000),
(2,"Rose",1,"2010",20,"M",4000),
(3,"Williams",1,"2010",10,"M",1000),
(4,"Jones",2,"2005",10,"F",2000),
(5,"Brown",2,"2010",30,"",-1),
(6,"Brown",2,"2010",50,"",-1)
)
val empColumns = Seq("emp_id","name","superior_emp_id","branch_id","dept_id",
"gender","salary")
import spark.sqlContext.implicits._
val empDF = emp.toDF(empColumns:_*)
empDF.show(false)
val dept = Seq(("Finance",10,"2018"),
("Marketing",20,"2010"),
("Marketing",20,"2018"),
("Sales",30,"2005"),
("Sales",30,"2010"),
("IT",50,"2010")
)
val deptColumns = Seq("dept_name","dept_id","branch_id")
val deptDF = dept.toDF(deptColumns:_*)
deptDF.show(false)
This example prints the below output.
+------+--------+---------------+---------+-------+------+------+
|emp_id|name |superior_emp_id|branch_id|dept_id|gender|salary|
+------+--------+---------------+---------+-------+------+------+
|1 |Smith |-1 |2018 |10 |M |3000 |
|2 |Rose |1 |2010 |20 |M |4000 |
|3 |Williams|1 |2010 |10 |M |1000 |
|4 |Jones |2 |2005 |10 |F |2000 |
|5 |Brown |2 |2010 |30 | |-1 |
|6 |Brown |2 |2010 |50 | |-1 |
+------+--------+---------------+---------+-------+------+------+
+---------+-------+---------+
|dept_name|dept_id|branch_id|
+---------+-------+---------+
|Finance |10 |2018 |
|Marketing|20 |2010 |
|Marketing|20 |2018 |
|Sales |30 |2005 |
|Sales |30 |2010 |
|IT |50 |2010 |
+---------+-------+---------+
Using Join syntax
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
This join syntax takes a right dataset, joinExprs and joinType as arguments, and we use joinExprs to provide the join condition on multiple columns.
//Using multiple columns on join expression
empDF.join(deptDF, empDF("dept_id") === deptDF("dept_id") &&
empDF("branch_id") === deptDF("branch_id"),"inner")
.show(false)
This example joins the empDF DataFrame with the deptDF DataFrame on the dept_id and branch_id columns using an inner join. It prints the below output to the console.
+------+-----+---------------+---------+-------+------+------+---------+-------+---------+
|emp_id|name |superior_emp_id|branch_id|dept_id|gender|salary|dept_name|dept_id|branch_id|
+------+-----+---------------+---------+-------+------+------+---------+-------+---------+
|1 |Smith|-1 |2018 |10 |M |3000 |Finance |10 |2018 |
|2 |Rose |1 |2010 |20 |M |4000 |Marketing|20 |2010 |
|5 |Brown|2 |2010 |30 | |-1 |Sales |30 |2010 |
|6 |Brown|2 |2010 |50 | |-1 |IT |50 |2010 |
+------+-----+---------------+---------+-------+------+------+---------+-------+---------+
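Note that the result above contains dept_id and branch_id twice, once from each DataFrame, which can cause ambiguous-column errors in later selections. A sketch of an alternative, using the join overload that takes a Seq of column names (usingColumns), which keeps a single copy of each join column:

```scala
// Join on multiple columns by name; the result contains dept_id and
// branch_id only once, followed by the remaining columns of both sides.
empDF.join(deptDF, Seq("dept_id", "branch_id"), "inner")
  .show(false)
```

This overload only works when the join columns have the same name on both sides, as they do here.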
The rest of the article provides similar examples using where(), filter() and spark.sql(), and all of them produce the same output as above.
Using Where to provide Join condition
Instead of passing a join condition to the join() operator, we can use where() to provide it.
//Using Join with multiple columns on where clause
empDF.join(deptDF).where(empDF("dept_id") === deptDF("dept_id") &&
empDF("branch_id") === deptDF("branch_id"))
.show(false)
Using Filter to provide Join condition
We can also use filter() to provide the join condition; in the example below we join on multiple columns.
//Using Join with multiple columns on filter clause
empDF.join(deptDF).filter(empDF("dept_id") === deptDF("dept_id") &&
empDF("branch_id") === deptDF("branch_id"))
.show(false)
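A join() call without a condition is logically a cross join, with the where()/filter() predicate applied afterwards; Spark's Catalyst optimizer typically pushes such equality predicates back into the join, so the physical plan matches the explicit join-condition form. A quick way to check this yourself, assuming the article's empDF and deptDF are in scope:

```scala
// Print the physical plan; with predicate pushdown the plan shows a
// join on dept_id and branch_id rather than a cartesian product.
empDF.join(deptDF).filter(empDF("dept_id") === deptDF("dept_id") &&
    empDF("branch_id") === deptDF("branch_id"))
  .explain()
```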
Using Spark SQL Expression to provide Join condition
Here, we use Spark's native SQL syntax to join tables with a condition on multiple columns.
//Using SQL & multiple columns on join expression
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
val resultDF = spark.sql("select e.* from EMP e, DEPT d " +
"where e.dept_id = d.dept_id and e.branch_id = d.branch_id")
resultDF.show(false)
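The same query can be written with explicit ANSI JOIN ... ON syntax, which many find more readable than a comma-separated table list with a where clause. A sketch, assuming the EMP and DEPT temporary views created above (resultDF2 is just an illustrative name):

```scala
// Equivalent inner join using explicit JOIN ... ON syntax.
val resultDF2 = spark.sql("select e.* from EMP e inner join DEPT d " +
  "on e.dept_id = d.dept_id and e.branch_id = d.branch_id")
resultDF2.show(false)
```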
Source code of using Spark SQL Join on multiple columns
package com.sparkbyexamples.spark.dataframe.join
import org.apache.spark.sql.SparkSession
object JoinMultipleColumns extends App {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val emp = Seq((1,"Smith",-1,"2018",10,"M",3000),
(2,"Rose",1,"2010",20,"M",4000),
(3,"Williams",1,"2010",10,"M",1000),
(4,"Jones",2,"2005",10,"F",2000),
(5,"Brown",2,"2010",30,"",-1),
(6,"Brown",2,"2010",50,"",-1)
)
val empColumns = Seq("emp_id","name","superior_emp_id","branch_id","dept_id","gender","salary")
import spark.sqlContext.implicits._
val empDF = emp.toDF(empColumns:_*)
empDF.show(false)
val dept = Seq(("Finance",10,"2018"),
("Marketing",20,"2010"),
("Marketing",20,"2018"),
("Sales",30,"2005"),
("Sales",30,"2010"),
("IT",50,"2010")
)
val deptColumns = Seq("dept_name","dept_id","branch_id")
val deptDF = dept.toDF(deptColumns:_*)
deptDF.show(false)
//Using multiple columns on join expression
empDF.join(deptDF, empDF("dept_id") === deptDF("dept_id") &&
empDF("branch_id") === deptDF("branch_id"),"inner")
.show(false)
//Using Join with multiple columns on where clause
empDF.join(deptDF).where(empDF("dept_id") === deptDF("dept_id") &&
empDF("branch_id") === deptDF("branch_id"))
.show(false)
//Using Join with multiple columns on filter clause
empDF.join(deptDF).filter(empDF("dept_id") === deptDF("dept_id") &&
empDF("branch_id") === deptDF("branch_id"))
.show(false)
//Using SQL & multiple columns on join expression
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
val resultDF = spark.sql("select e.* from EMP e, DEPT d " +
"where e.dept_id = d.dept_id and e.branch_id = d.branch_id")
resultDF.show(false)
}
The complete example is available in the GitHub project for reference.
Conclusion
In this article, you have learned how to use a Spark SQL join on multiple DataFrame columns with Scala examples, and how to provide the join condition using join(), where(), filter() and SQL expressions.
Thanks for reading. If you like the article, please share it using the social links below; comments and suggestions are welcome in the comments section!
Happy Learning !!