Similar to SQL, Spark supports Inner Join to join two DataFrame tables. In this article, you will learn how to use an Inner Join on DataFrames with a Scala example, along with the different ways to provide the join condition.
Inner join is the default join in Spark and the most commonly used. It joins two datasets on key columns; rows whose keys have no match in the other dataset are dropped from both datasets.
Before we jump into Spark SQL Join examples, let's first create "emp" and "dept" DataFrames. Here, column "emp_id" is unique on the emp dataset, "dept_id" is unique on the dept dataset, and emp_dept_id from emp has a reference to dept_id on the dept dataset.
val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
(2,"Rose",1,"2010","20","M",4000),
(3,"Williams",1,"2010","10","M",1000),
(4,"Jones",2,"2005","10","F",2000),
(5,"Brown",2,"2010","40","",-1),
(6,"Brown",2,"2010","50","",-1)
)
val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id",
"gender","salary")
import spark.sqlContext.implicits._
val empDF = emp.toDF(empColumns:_*)
empDF.show(false)
val dept = Seq(("Finance",10),
("Marketing",20),
("Sales",30),
("IT",40)
)
val deptColumns = Seq("dept_name","dept_id")
val deptDF = dept.toDF(deptColumns:_*)
deptDF.show(false)
This example prints the below output.
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1 |Smith |-1 |2018 |10 |M |3000 |
|2 |Rose |1 |2010 |20 |M |4000 |
|3 |Williams|1 |2010 |10 |M |1000 |
|4 |Jones |2 |2005 |10 |F |2000 |
|5 |Brown |2 |2010 |40 | |-1 |
|6 |Brown |2 |2010 |50 | |-1 |
+------+--------+---------------+-----------+-----------+------+------+
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance |10 |
|Marketing|20 |
|Sales |30 |
|IT |40 |
+---------+-------+
Using Join operator
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
join(right: Dataset[_], joinExprs: Column): DataFrame
The first join syntax takes the right dataset, joinExprs, and joinType as arguments, and we use joinExprs to provide the join condition. The second join syntax takes just the right dataset and joinExprs, and defaults the join type to inner. Below are examples of both syntaxes; they produce the same output.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner")
.show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"))
.show(false)
When we apply the Inner join on our datasets, it drops "emp_dept_id" 50 from the "emp" dataset and "dept_id" 30 from the "dept" dataset, as they have no matching keys on the other side. Below is the result of the above join expressions.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1 |Smith |-1 |2018 |10 |M |3000 |Finance |10 |
|2 |Rose |1 |2010 |20 |M |4000 |Marketing|20 |
|3 |Williams|1 |2010 |10 |M |1000 |Finance |10 |
|4 |Jones |2 |2005 |10 |F |2000 |Finance |10 |
|5 |Brown |2 |2010 |40 | |-1 |IT |40 |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
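Note that the inner join keeps both key columns, so the result above contains both "emp_dept_id" and "dept_id". As an optional follow-up sketch (not part of the original example), you can drop the duplicate key column from the joined DataFrame with drop().
// Optional sketch: keep emp_dept_id and drop the matching dept_id column from the result
val innerJoinDF = empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")
  .drop(deptDF("dept_id"))
innerJoinDF.show(false)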
Alternatively, you can also use Inner.sql as the joinType; to use this, you should import org.apache.spark.sql.catalyst.plans.Inner.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),Inner.sql)
.show(false)
The rest of the article provides Spark SQL Inner Join examples using the DataFrame where() and filter() operators and spark.sql(); all of these examples produce the same output as above.
Using Where to provide Join condition
Instead of passing a join condition to the join() operator, here we use where() to provide the inner join condition.
empDF.join(deptDF).where(empDF("emp_dept_id") === deptDF("dept_id"))
.show(false)
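where() can also take a compound condition built with &&; the extra salary predicate in the sketch below is illustrative only and not part of the original example.
// Illustrative sketch: join key match combined with an additional (hypothetical) predicate
empDF.join(deptDF)
  .where(empDF("emp_dept_id") === deptDF("dept_id") && empDF("salary") > 1000)
  .show(false)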
Using Filter to provide Join condition
We can also use filter() to provide the join condition for Spark join operations.
empDF.join(deptDF).filter(empDF("emp_dept_id") === deptDF("dept_id"))
.show(false)
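filter() also accepts a SQL expression string; since the two column names are unambiguous across these DataFrames, the same join condition could be written as the sketch below (a minor variation, not from the original example).
// Sketch: the same inner join condition expressed as a SQL string
empDF.join(deptDF)
  .filter("emp_dept_id = dept_id")
  .show(false)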
Using Spark SQL Expression for Inner Join
Here, we will use the native SQL syntax in Spark to do an Inner join. In order to use native SQL syntax, first, we should create a temporary view and then use spark.sql() to execute the SQL expression.
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(false)
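If you don't need both key columns in the SQL result, you can select specific columns using the table aliases; the column list below is illustrative only.
// Sketch: project only the columns of interest from the joined result
val joinDF3 = spark.sql("select e.emp_id, e.name, d.dept_name from EMP e INNER JOIN DEPT d ON e.emp_dept_id = d.dept_id")
joinDF3.show(false)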
Source code of using Spark DataFrame Inner Join
package com.sparkbyexamples.spark.dataframe.join
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.Inner
object InnerJoinExample extends App {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
(2,"Rose",1,"2010","20","M",4000),
(3,"Williams",1,"2010","10","M",1000),
(4,"Jones",2,"2005","10","F",2000),
(5,"Brown",2,"2010","40","",-1),
(6,"Brown",2,"2010","50","",-1)
)
val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")
import spark.sqlContext.implicits._
val empDF = emp.toDF(empColumns:_*)
empDF.show(false)
val dept = Seq(("Finance",10),
("Marketing",20),
("Sales",30),
("IT",40)
)
val deptColumns = Seq("dept_name","dept_id")
val deptDF = dept.toDF(deptColumns:_*)
deptDF.show(false)
println("Inner join")
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner")
.show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"))
.show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),Inner.sql)
.show(false)
empDF.join(deptDF).where(empDF("emp_dept_id") === deptDF("dept_id"))
.show(false)
empDF.join(deptDF).filter(empDF("emp_dept_id") === deptDF("dept_id"))
.show(false)
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(false)
}
The complete example is available at the GitHub project for reference.
Conclusion
In this article, you have learned how to use a Spark SQL Inner Join on DataFrames with a Scala example, and also learned how to provide the join condition using join(), where(), filter(), and a SQL expression.
Thanks for reading. If you like it, please share the article using the social links below; any comments or suggestions are welcome in the comments section!
Happy Learning !!
Related Articles
- Spark Shell Command Usage with Examples
- Spark SQL like() Using Wildcard Example
- Spark SQL – Select Columns From DataFrame
- Spark SQL Self Join With Example
- Spark SQL Left Outer Join with Example
- Spark SQL Left Anti Join with Example
- Spark SQL Left Semi Join Example