Spark DataFrame supports all basic SQL join types: `INNER`, `LEFT OUTER`, `RIGHT OUTER`, `LEFT ANTI`, `LEFT SEMI`, `CROSS`, and `SELF` join. Spark SQL joins are wide transformations that shuffle data over the network, so they can cause serious performance problems when not designed with care.
Related: PySpark SQL Tutorial
On the other hand, Spark SQL joins come with more optimization by default (thanks to DataFrames and Datasets); however, there are still some performance issues to consider when using them.
In this tutorial, you will learn the different join syntaxes and how to use the different join types on two DataFrames and Datasets, with Scala examples. See Join on Multiple DataFrames if you want to join more than two DataFrames.
- Join Syntax & Types
- Inner Join
- Full Outer Join
- Left Outer Join
- Right Outer Join
- Left Semi Join
- Left Anti Join
- Self Join
- Using SQL Expression
1. SQL Join Types & Syntax
Below is the list of all Spark SQL Join Types and Syntaxes.
1) join(right: Dataset[_]): DataFrame
2) join(right: Dataset[_], usingColumn: String): DataFrame
3) join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
4) join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
5) join(right: Dataset[_], joinExprs: Column): DataFrame
6) join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
The rest of the tutorial explains the join types using syntax 6, which takes the right DataFrame, the join expression, and the join type as a String.
For syntaxes 4 and 6, the `joinType` string argument accepts either a “JoinType” or a “Join String” from the table below. When you use “JoinType”, import `org.apache.spark.sql.catalyst.plans._`, as this package defines the JoinType objects.
JoinType | Join String | Equivalent SQL Join |
---|---|---|
Inner.sql | inner | INNER JOIN |
FullOuter.sql | outer, full, fullouter, full_outer | FULL OUTER JOIN |
LeftOuter.sql | left, leftouter, left_outer | LEFT JOIN |
RightOuter.sql | right, rightouter, right_outer | RIGHT JOIN |
Cross.sql | cross | CROSS JOIN |
LeftAnti.sql | anti, leftanti, left_anti | LEFT ANTI JOIN |
LeftSemi.sql | semi, leftsemi, left_semi | LEFT SEMI JOIN |
All JoinType objects are defined in `joinTypes.scala`. To use them, import `org.apache.spark.sql.catalyst.plans.{LeftOuter,Inner,....}`.
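The signatures listed above differ mainly in how the join key is supplied. The following is a minimal sketch, assuming two small hypothetical DataFrames (`left` and `right`, not the tutorial's data) that share a column named `dept_id`. It contrasts syntax 3 (join on a column name) with syntax 6 (join on an expression) and can be pasted into spark-shell or run as a script.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("JoinSyntaxSketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical data: both DataFrames share the column name "dept_id"
val left  = Seq((1, "Smith", 10), (2, "Rose", 20)).toDF("emp_id", "name", "dept_id")
val right = Seq((10, "Finance"), (20, "Marketing")).toDF("dept_id", "dept_name")

// Syntax 3: join on a shared column name; the key column appears once in the result
val byName = left.join(right, Seq("dept_id"))

// Syntax 6: join on an explicit expression; both key columns are kept
val byExpr = left.join(right, left("dept_id") === right("dept_id"), "inner")

println(byName.columns.mkString(", "))
println(byExpr.columns.mkString(", "))
```

With this data, `byName` should list `dept_id` once, while `byExpr` lists it twice, so later references to `dept_id` on `byExpr` need to be qualified with the originating DataFrame.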
Before we jump into the Spark SQL join examples, let’s first create the `emp` and `dept` DataFrames. Here, the column `emp_id` is unique in emp, `dept_id` is unique in dept, and `emp_dept_id` in emp references `dept_id` in dept.
val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
(2,"Rose",1,"2010","20","M",4000),
(3,"Williams",1,"2010","10","M",1000),
(4,"Jones",2,"2005","10","F",2000),
(5,"Brown",2,"2010","40","",-1),
(6,"Brown",2,"2010","50","",-1)
)
val empColumns = Seq("emp_id","name","superior_emp_id","year_joined",
"emp_dept_id","gender","salary")
import spark.sqlContext.implicits._
val empDF = emp.toDF(empColumns:_*)
empDF.show(false)
val dept = Seq(("Finance",10),
("Marketing",20),
("Sales",30),
("IT",40)
)
val deptColumns = Seq("dept_name","dept_id")
val deptDF = dept.toDF(deptColumns:_*)
deptDF.show(false)
This prints the “emp” and “dept” DataFrames to the console.
Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1 |Smith |-1 |2018 |10 |M |3000 |
|2 |Rose |1 |2010 |20 |M |4000 |
|3 |Williams|1 |2010 |10 |M |1000 |
|4 |Jones |2 |2005 |10 |F |2000 |
|5 |Brown |2 |2010 |40 | |-1 |
|6 |Brown |2 |2010 |50 | |-1 |
+------+--------+---------------+-----------+-----------+------+------+
Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance |10 |
|Marketing|20 |
|Sales |30 |
|IT |40 |
+---------+-------+
2. Inner Join
Spark `inner` join is the default and most commonly used join. It joins two DataFrames/Datasets on key columns, and rows whose keys don’t match are dropped from both datasets (`emp` & `dept`).
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner")
.show(false)
When we apply the inner join to our datasets, it drops `emp_dept_id` 50 from `emp` and `dept_id` 30 from `dept`. Below is the result of the above join expression.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1 |Smith |-1 |2018 |10 |M |3000 |Finance |10 |
|2 |Rose |1 |2010 |20 |M |4000 |Marketing|20 |
|3 |Williams|1 |2010 |10 |M |1000 |Finance |10 |
|4 |Jones |2 |2005 |10 |F |2000 |Finance |10 |
|5 |Brown |2 |2010 |40 | |-1 |IT |40 |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
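Because inner is the default, the join type can be left out entirely (syntax 5). The sketch below assumes a trimmed copy of the tutorial's data with integer keys (an assumption made to keep it short) and compares the two forms.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("InnerDefaultSketch")
  .getOrCreate()
import spark.implicits._

// Trimmed, hypothetical copies of emp and dept (integer keys assumed)
val empDF = Seq((1, "Smith", 10), (2, "Rose", 20), (5, "Brown", 40), (6, "Brown", 50))
  .toDF("emp_id", "name", "emp_dept_id")
val deptDF = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
  .toDF("dept_name", "dept_id")

val joinCond = empDF("emp_dept_id") === deptDF("dept_id")

// Syntax 5: no joinType argument, so Spark performs an inner join
val defaultJoin = empDF.join(deptDF, joinCond)

// Syntax 6: the same join with the type spelled out
val explicitInner = empDF.join(deptDF, joinCond, "inner")

defaultJoin.show(false)
explicitInner.show(false)
```

Both calls should return the same three rows here, since `emp_dept_id` 50 and `dept_id` 30 have no partner on the other side.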
3. Full Outer Join
`outer`, a.k.a. `full` or `fullouter`, join returns all rows from both Spark DataFrames/Datasets. Where the join expression doesn’t match, it returns null in the columns of the side that has no matching record.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"outer")
.show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"full")
.show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"fullouter")
.show(false)
In our `emp` dataset, `emp_dept_id` 50 has no matching record in `dept`, so the dept columns are null for that row. Likewise, `dept_id` 30 has no matching record in `emp`, so you see nulls in the emp columns. Below is the result of the above join expression.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|2 |Rose |1 |2010 |20 |M |4000 |Marketing|20 |
|5 |Brown |2 |2010 |40 | |-1 |IT |40 |
|1 |Smith |-1 |2018 |10 |M |3000 |Finance |10 |
|3 |Williams|1 |2010 |10 |M |1000 |Finance |10 |
|4 |Jones |2 |2005 |10 |F |2000 |Finance |10 |
|6 |Brown |2 |2010 |50 | |-1 |null |null |
|null |null |null |null |null |null |null |Sales |30 |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
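One practical use of the nulls a full outer join produces is finding the rows that had no partner on the other side. This is a minimal sketch, assuming a trimmed copy of the tutorial's data with integer keys; the names `empOnly` and `deptOnly` are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("FullOuterSketch")
  .getOrCreate()
import spark.implicits._

// Trimmed, hypothetical copies of emp and dept (integer keys assumed)
val empDF = Seq((1, "Smith", 10), (2, "Rose", 20), (5, "Brown", 40), (6, "Brown", 50))
  .toDF("emp_id", "name", "emp_dept_id")
val deptDF = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
  .toDF("dept_name", "dept_id")

val full = empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "fullouter")

// Employees without a department: the dept-side columns are null
val empOnly = full.filter(col("dept_id").isNull)

// Departments without an employee: the emp-side columns are null
val deptOnly = full.filter(col("emp_id").isNull)

empOnly.show(false)
deptOnly.show(false)
```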
4. Left Outer Join
Spark `left`, a.k.a. `leftouter`, join returns all rows from the left DataFrame/Dataset regardless of whether a match is found in the right dataset. Where the join expression doesn’t match, it assigns null to the right-side columns of that record, and it drops records from the right dataset that have no match.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"left")
.show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftouter")
.show(false)
In our dataset, `emp_dept_id` 50 has no matching record in the `dept` dataset, so this record contains null in the `dept` columns (dept_name & dept_id), and `dept_id` 30 from the `dept` dataset is dropped from the results. Below is the result of the above join expression.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1 |Smith |-1 |2018 |10 |M |3000 |Finance |10 |
|2 |Rose |1 |2010 |20 |M |4000 |Marketing|20 |
|3 |Williams|1 |2010 |10 |M |1000 |Finance |10 |
|4 |Jones |2 |2005 |10 |F |2000 |Finance |10 |
|5 |Brown |2 |2010 |40 | |-1 |IT |40 |
|6 |Brown |2 |2010 |50 | |-1 |null |null |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
5. Right Outer Join
Spark `right`, a.k.a. `rightouter`, join is the opposite of the `left` join. It returns all rows from the right DataFrame/Dataset regardless of whether a match is found in the left dataset. Where the join expression doesn’t match, it assigns null to the left-side columns of that record, and it drops records from the left dataset that have no match.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"right")
.show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"rightouter")
.show(false)
In our example, `dept_id` 30 from the right dataset has no match in the left dataset `emp`, so this record contains null in the `emp` columns, and `emp_dept_id` 50 is dropped because no match is found in the right dataset. Below is the result of the above join expression.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4 |Jones |2 |2005 |10 |F |2000 |Finance |10 |
|3 |Williams|1 |2010 |10 |M |1000 |Finance |10 |
|1 |Smith |-1 |2018 |10 |M |3000 |Finance |10 |
|2 |Rose |1 |2010 |20 |M |4000 |Marketing|20 |
|null |null |null |null |null |null |null |Sales |30 |
|5 |Brown |2 |2010 |40 | |-1 |IT |40 |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
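Since a right join mirrors a left join, swapping the two DataFrames and using `left` should give the same rows, only with the columns in a different order. The sketch below checks this on a trimmed copy of the tutorial's data (integer keys assumed); `swappedLeft` is an illustrative name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("RightVsLeftSketch")
  .getOrCreate()
import spark.implicits._

// Trimmed, hypothetical copies of emp and dept (integer keys assumed)
val empDF = Seq((1, "Smith", 10), (2, "Rose", 20), (5, "Brown", 40), (6, "Brown", 50))
  .toDF("emp_id", "name", "emp_dept_id")
val deptDF = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
  .toDF("dept_name", "dept_id")

val joinCond = empDF("emp_dept_id") === deptDF("dept_id")

// emp RIGHT JOIN dept: keeps every dept row
val rightJoin = empDF.join(deptDF, joinCond, "right")

// dept LEFT JOIN emp: also keeps every dept row, but dept columns come first
val swappedLeft = deptDF.join(empDF, joinCond, "left")

// Reorder the columns of the swapped join so the two results can be compared
val reordered = swappedLeft.select(rightJoin.columns.map(c => col(c)): _*)

rightJoin.show(false)
reordered.show(false)
```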
6. Left Semi Join
Spark `leftsemi` join is similar to the `inner` join, the difference being that `leftsemi` returns all columns from the left DataFrame/Dataset and ignores all columns from the right dataset. In other words, this join returns only the left dataset's columns, for the records that have a match in the right dataset on the join expression; records that don't match are dropped from both the left and right datasets.
The same result can be achieved by using select on the result of an inner join; however, using this join is more efficient.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftsemi")
.show(false)
Below is the result of the above join expression.
leftsemi join
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1 |Smith |-1 |2018 |10 |M |3000 |
|2 |Rose |1 |2010 |20 |M |4000 |
|3 |Williams|1 |2010 |10 |M |1000 |
|4 |Jones |2 |2005 |10 |F |2000 |
|5 |Brown |2 |2010 |40 | |-1 |
+------+--------+---------------+-----------+-----------+------+------+
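The equivalence mentioned above can be sketched as follows, assuming a trimmed copy of the tutorial's data with integer keys. Note that the two forms only agree when the key is unique on the right side, as `dept_id` is here; with duplicate keys on the right, the inner join would repeat left rows while the semi join would not.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("LeftSemiSketch")
  .getOrCreate()
import spark.implicits._

// Trimmed, hypothetical copies of emp and dept (integer keys assumed)
val empDF = Seq((1, "Smith", 10), (2, "Rose", 20), (5, "Brown", 40), (6, "Brown", 50))
  .toDF("emp_id", "name", "emp_dept_id")
val deptDF = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
  .toDF("dept_name", "dept_id")

val joinCond = empDF("emp_dept_id") === deptDF("dept_id")

// Left semi join: matching emp rows, emp columns only
val semi = empDF.join(deptDF, joinCond, "leftsemi")

// Inner join followed by a select of the left-side columns
val viaInner = empDF.join(deptDF, joinCond, "inner")
  .select(empDF.columns.map(c => empDF(c)): _*)

semi.show(false)
viaInner.show(false)
```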
7. Left Anti Join
`leftanti` join does the exact opposite of the Spark `leftsemi` join: it returns only the columns from the left DataFrame/Dataset, for the records that have no match in the right dataset.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftanti")
.show(false)
This yields the output below.
+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6 |Brown|2 |2010 |50 | |-1 |
+------+-----+---------------+-----------+-----------+------+------+
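A left anti join can also be thought of as a left outer join that keeps only the rows where the right side came back null. The sketch below compares the two on a trimmed copy of the tutorial's data (integer keys assumed); `viaLeft` is an illustrative name, and the `leftanti` form is the more direct way to express this.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("LeftAntiSketch")
  .getOrCreate()
import spark.implicits._

// Trimmed, hypothetical copies of emp and dept (integer keys assumed)
val empDF = Seq((1, "Smith", 10), (2, "Rose", 20), (5, "Brown", 40), (6, "Brown", 50))
  .toDF("emp_id", "name", "emp_dept_id")
val deptDF = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
  .toDF("dept_name", "dept_id")

val joinCond = empDF("emp_dept_id") === deptDF("dept_id")

// Left anti join: emp rows with no matching dept
val anti = empDF.join(deptDF, joinCond, "leftanti")

// Left outer join, keep rows whose dept key is null, then drop the dept columns
val viaLeft = empDF.join(deptDF, joinCond, "left")
  .filter(deptDF("dept_id").isNull)
  .select(empDF.columns.map(c => empDF(c)): _*)

anti.show(false)
viaLeft.show(false)
```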
8. Self Join
Spark joins are not complete without a self join. Although no self-join type is available, we can use any of the join types explained above to join a DataFrame to itself. The example below uses an `inner` self join.
import org.apache.spark.sql.functions.col
empDF.as("emp1").join(empDF.as("emp2"),
col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
.select(col("emp1.emp_id"),col("emp1.name"),
col("emp2.emp_id").as("superior_emp_id"),
col("emp2.name").as("superior_emp_name"))
.show(false)
Here, we join the `emp` dataset with itself to find the superior's `emp_id` and `name` for each employee. Smith, whose superior_emp_id is -1, has no match and is therefore dropped by the inner join.
+------+--------+---------------+-----------------+
|emp_id|name |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2 |Rose |1 |Smith |
|3 |Williams|1 |Smith |
|4 |Jones |2 |Rose |
|5 |Brown |2 |Rose |
|6 |Brown |2 |Rose |
+------+--------+---------------+-----------------+
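Because any join type can be used for a self join, switching to `left` keeps employees that have no superior. This is a minimal sketch, assuming a trimmed copy of the emp data; `withSuperior` is an illustrative name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SelfJoinSketch")
  .getOrCreate()
import spark.implicits._

// Trimmed, hypothetical copy of emp: Smith has no superior (-1)
val empDF = Seq((1, "Smith", -1), (2, "Rose", 1), (3, "Williams", 1), (4, "Jones", 2))
  .toDF("emp_id", "name", "superior_emp_id")

// Left self join: every employee is kept; a missing superior shows up as null
val withSuperior = empDF.as("emp1")
  .join(empDF.as("emp2"), col("emp1.superior_emp_id") === col("emp2.emp_id"), "left")
  .select(col("emp1.emp_id"), col("emp1.name"),
    col("emp2.name").as("superior_emp_name"))

withSuperior.show(false)
```

With this data, Smith should appear with a null `superior_emp_name` instead of being dropped.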
9. Using SQL Expression
Since Spark SQL supports native SQL syntax, we can also write join operations by creating temporary views on the DataFrames and using `spark.sql()`.
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
//SQL JOIN
val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(false)
val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(false)
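The other join types can be written in SQL in the same way. The sketch below, assuming a trimmed copy of the tutorial's data with integer keys, shows a `LEFT JOIN` and a `LEFT ANTI JOIN` through `spark.sql()`; the variable names are illustrative.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SqlJoinSketch")
  .getOrCreate()
import spark.implicits._

// Trimmed, hypothetical copies of emp and dept (integer keys assumed)
val empDF = Seq((1, "Smith", 10), (2, "Rose", 20), (5, "Brown", 40), (6, "Brown", 50))
  .toDF("emp_id", "name", "emp_dept_id")
val deptDF = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40))
  .toDF("dept_name", "dept_id")

empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

// LEFT JOIN: all employees, with dept_name null where no department matches
val leftJoinDF = spark.sql(
  "SELECT e.*, d.dept_name FROM EMP e LEFT JOIN DEPT d ON e.emp_dept_id = d.dept_id")

// LEFT ANTI JOIN: only employees without a matching department, emp columns only
val antiJoinDF = spark.sql(
  "SELECT * FROM EMP e LEFT ANTI JOIN DEPT d ON e.emp_dept_id = d.dept_id")

leftJoinDF.show(false)
antiJoinDF.show(false)
```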
10. Source Code | Scala Example
package com.sparkbyexamples.spark.dataframe.join
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object JoinExample extends App {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
(2,"Rose",1,"2010","20","M",4000),
(3,"Williams",1,"2010","10","M",1000),
(4,"Jones",2,"2005","10","F",2000),
(5,"Brown",2,"2010","40","",-1),
(6,"Brown",2,"2010","50","",-1)
)
val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")
import spark.sqlContext.implicits._
val empDF = emp.toDF(empColumns:_*)
empDF.show(false)
val dept = Seq(("Finance",10),
("Marketing",20),
("Sales",30),
("IT",40)
)
val deptColumns = Seq("dept_name","dept_id")
val deptDF = dept.toDF(deptColumns:_*)
deptDF.show(false)
println("Inner join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner")
.show(false)
println("Outer join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"outer")
.show(false)
println("full join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"full")
.show(false)
println("fullouter join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"fullouter")
.show(false)
println("right join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"right")
.show(false)
println("rightouter join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"rightouter")
.show(false)
println("left join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"left")
.show(false)
println("leftouter join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftouter")
.show(false)
println("leftanti join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftanti")
.show(false)
println("leftsemi join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftsemi")
.show(false)
println("cross join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"cross")
.show(false)
println("Using crossJoin()")
empDF.crossJoin(deptDF).show(false)
println("self join")
empDF.as("emp1").join(empDF.as("emp2"),
col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
.select(col("emp1.emp_id"),col("emp1.name"),
col("emp2.emp_id").as("superior_emp_id"),
col("emp2.name").as("superior_emp_name"))
.show(false)
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
//SQL JOIN
val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(false)
val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(false)
}
Examples explained here are available at the GitHub project for reference.
Conclusion
In this tutorial, you have learned how to use the Spark SQL join types `INNER`, `LEFT OUTER`, `RIGHT OUTER`, `LEFT ANTI`, `LEFT SEMI`, `CROSS`, and `SELF`, with examples in Scala.
Happy Learning !!