Spark SQL Inner Join Explained

Similar to SQL, Spark supports inner joins between two DataFrames. In this article, you will learn how to use an inner join on DataFrames with Scala examples, and the different ways to provide a join condition.

Inner join is the default join type in Spark and the most commonly used. It joins two datasets on key columns, and rows whose keys don't match are dropped from both datasets.
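To make the matching rule concrete, here is a minimal sketch in plain Scala collections, with no Spark involved. The tuples are a hypothetical, trimmed-down subset of the tables used in this article, with integer keys on both sides for simplicity. It only illustrates the semantics of an inner join, not how Spark executes one.

```scala
// Plain Scala collections; the rows are a reduced, illustrative subset of the article's data.
val emp  = Seq((1, "Smith", 10), (2, "Rose", 20), (6, "Brown", 50))
val dept = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30))

// Pair every employee with every department, keeping only pairs whose keys are equal.
val joined = for {
  (empId, name, empDeptId) <- emp
  (deptName, deptId)       <- dept
  if empDeptId == deptId
} yield (empId, name, deptName)

joined.foreach(println)
// prints:
// (1,Smith,Finance)
// (2,Rose,Marketing)
```

Only Smith and Rose survive: Brown's key 50 has no matching department, and Sales (30) has no matching employee, so both rows are dropped.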

Before we jump into the Spark SQL join examples, let's first create the "emp" and "dept" DataFrames. Here, the column "emp_id" is unique in emp, "dept_id" is unique in dept, and "emp_dept_id" in emp references "dept_id" in dept.


  // Employee rows: (emp_id, name, superior_emp_id, year_joined, emp_dept_id, gender, salary)
  val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
    (6,"Brown",2,"2010","50","",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id",
    "gender","salary")
  // Brings toDF() into scope; assumes an existing SparkSession named spark
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )

  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)

This example prints the following output.


+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
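One detail worth noticing in the data above: emp_dept_id is built from string literals while dept_id is built from integers, so a join condition comparing the two depends on Spark coercing the types. If you would rather not depend on that, one option is to cast the key explicitly before joining. The sketch below uses reduced, illustrative versions of the two tables; the names empTyped and joined and the appName value are assumptions made for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("JoinKeyTypesSketch") // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Reduced versions of the article's tables: emp_dept_id is a String, dept_id is an Int.
val empDF  = Seq((1, "Smith", "10"), (2, "Rose", "20"), (6, "Brown", "50"))
  .toDF("emp_id", "name", "emp_dept_id")
val deptDF = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30))
  .toDF("dept_name", "dept_id")

// Inspect the column types before joining.
empDF.printSchema()
deptDF.printSchema()

// Cast explicitly so both sides of the join condition have the same type.
val empTyped = empDF.withColumn("emp_dept_id", col("emp_dept_id").cast("int"))
val joined = empTyped.join(deptDF, empTyped("emp_dept_id") === deptDF("dept_id"), "inner")
joined.show(false)
```

Whether the explicit cast is needed depends on your data and Spark configuration; it mainly makes the intent visible in the code.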

Using Join operator


join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
join(right: Dataset[_], joinExprs: Column): DataFrame

The first join syntax takes the right dataset, joinExprs, and joinType as arguments; joinExprs provides the join condition. The second syntax takes just the right dataset and joinExprs and defaults to an inner join. Below are examples of both syntaxes; they produce the same output.


  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"inner")
    .show(false)
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"))
    .show(false)

When we apply an inner join to our datasets, it drops "emp_dept_id" 50 from "emp" and "dept_id" 30 from "dept". Below is the result of the above join expressions.


+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
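The result above carries the key twice, once as emp_dept_id and once as dept_id. If that is not wanted, one possible approach is to drop one of the two columns after the join. The sketch below uses reduced, illustrative data; the name trimmed and the appName value are assumptions for this example.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("DropJoinKeySketch") // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Reduced versions of the article's tables.
val empDF  = Seq((1, "Smith", "10"), (2, "Rose", "20"), (6, "Brown", "50"))
  .toDF("emp_id", "name", "emp_dept_id")
val deptDF = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30))
  .toDF("dept_name", "dept_id")

val joined = empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")

// After an equality join, dept_id carries the same key as emp_dept_id, so drop it.
val trimmed = joined.drop(deptDF("dept_id"))
trimmed.show(false)
```

Referring to the column through deptDF("dept_id") rather than by name keeps the drop unambiguous if both sides ever share a column name.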

Alternatively, you can use Inner.sql as the join type; to use it, import org.apache.spark.sql.catalyst.plans.Inner.


  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),Inner.sql)
    .show(false)
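It can help to see what Inner.sql actually is: a plain String naming the join type, which is why it can be passed where "inner" was passed earlier. The short sketch below just prints it; joinTypeName is a name chosen for this example. Note that the catalyst package is, as far as I am aware, treated as an internal part of Spark, so the string literal "inner" may be the more stable choice.

```scala
import org.apache.spark.sql.catalyst.plans.Inner

// Inner.sql is a String naming the join type; join() takes its joinType as a String.
val joinTypeName: String = Inner.sql
println(joinTypeName)
```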

The rest of the article provides Spark SQL inner join examples using the DataFrame where() and filter() operators and spark.sql(); all of these examples produce the same output as above.

Using Where to provide Join condition

Instead of passing a join condition to the join() operator, here we use where() to provide the inner join condition.


  empDF.join(deptDF).where(empDF("emp_dept_id") ===  deptDF("dept_id"))
    .show(false)
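Because join() is called here without a condition, it is reasonable to wonder whether Spark first builds every employee/department pair and only then filters. A hedged way to check on your own version is to print the query plan with explain() and see which join operator it reports. The sketch below uses reduced, illustrative data, and the appName value is an assumption; the plan text varies between Spark versions, so no expected plan is shown.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("WhereJoinPlanSketch") // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Reduced versions of the article's tables.
val empDF  = Seq((1, "Smith", "10"), (2, "Rose", "20"), (6, "Brown", "50"))
  .toDF("emp_id", "name", "emp_dept_id")
val deptDF = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30))
  .toDF("dept_name", "dept_id")

// Join without a condition, then supply the condition through where().
val joined = empDF.join(deptDF).where(empDF("emp_dept_id") === deptDF("dept_id"))

// Print the physical plan to see which join operator Spark chose for this query.
joined.explain()
joined.show(false)
```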

Using Filter to provide Join condition

We can also use filter() to provide the join condition for Spark join operations.


  empDF.join(deptDF).filter(empDF("emp_dept_id") ===  deptDF("dept_id"))
    .show(false)
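On a Dataset, where() is an alias for filter(), so the two forms should return the same rows. The following sketch checks that on reduced, illustrative data by comparing the collected rows as sets; the names cond, viaWhere, viaFilter, and sameRows and the appName value are assumptions for this example.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("WhereVsFilterSketch") // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Reduced versions of the article's tables.
val empDF  = Seq((1, "Smith", "10"), (2, "Rose", "20"), (6, "Brown", "50"))
  .toDF("emp_id", "name", "emp_dept_id")
val deptDF = Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30))
  .toDF("dept_name", "dept_id")

// Build the join condition once and reuse it for both variants.
val cond = empDF("emp_dept_id") === deptDF("dept_id")
val viaWhere  = empDF.join(deptDF).where(cond)
val viaFilter = empDF.join(deptDF).filter(cond)

// Compare as sets, since row order is not guaranteed.
val sameRows = viaWhere.collect().toSet == viaFilter.collect().toSet
println(s"where() and filter() return the same rows: $sameRows")
```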

Using Spark SQL Expression for Inner Join

Here, we use native SQL syntax in Spark to perform the inner join. To do so, first create temporary views and then use spark.sql() to execute the SQL expression.


  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")

  val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
  joinDF2.show(false)
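The query above writes the comparison with ==, which Spark SQL accepts. As a sketch of two other spellings you may encounter, the example below uses the standard single = operator, first with an explicit INNER JOIN ... ON clause and then as a comma-separated FROM list with the condition in WHERE. The data is reduced and illustrative; the names explicitJoin and implicitJoin and the appName value are assumptions for this example.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SqlInnerJoinSketch") // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Reduced versions of the article's tables, registered as temporary views.
Seq((1, "Smith", "10"), (2, "Rose", "20"), (6, "Brown", "50"))
  .toDF("emp_id", "name", "emp_dept_id")
  .createOrReplaceTempView("EMP")
Seq(("Finance", 10), ("Marketing", 20), ("Sales", 30))
  .toDF("dept_name", "dept_id")
  .createOrReplaceTempView("DEPT")

// Explicit INNER JOIN with the standard = operator.
val explicitJoin = spark.sql(
  "SELECT * FROM EMP e INNER JOIN DEPT d ON e.emp_dept_id = d.dept_id")

// Comma-separated FROM list with the join condition in WHERE.
val implicitJoin = spark.sql(
  "SELECT * FROM EMP e, DEPT d WHERE e.emp_dept_id = d.dept_id")

explicitJoin.show(false)
implicitJoin.show(false)
```

The explicit form keeps the join condition next to the tables it relates, which tends to be easier to read once a query has more than two tables.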

Source code for the Spark DataFrame Inner Join example


package com.sparkbyexamples.spark.dataframe.join

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.Inner

object InnerJoinExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
    (6,"Brown",2,"2010","50","",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )

  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)


  println("Inner join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"inner")
    .show(false)
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"))
    .show(false)

  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),Inner.sql)
    .show(false)

  empDF.join(deptDF).where(empDF("emp_dept_id") ===  deptDF("dept_id"))
    .show(false)

  empDF.join(deptDF).filter(empDF("emp_dept_id") ===  deptDF("dept_id"))
    .show(false)

  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")

  val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
  joinDF2.show(false)
}

The complete example is available in the GitHub project for reference.

Conclusion

In this article, you have learned how to use a Spark SQL inner join on DataFrames with Scala examples, and how to provide the join condition using join(), where(), filter(), and a SQL expression.

Happy Learning !!