Spark SQL Join on multiple columns

  • Post author:
  • Post category:Apache Spark

In this article, you will learn how to use Spark SQL Join condition on multiple columns of DataFrame and Dataset with Scala example. Also, you will learn different ways to provide Join condition on two or more columns.

Before we jump into how to use multiple columns on Join expression, first, let’s create a DataFrames from  emp and dept datasets, On these dept_id and branch_id columns are present on both datasets and we use these columns in Join expression while joining DataFrames.


  val emp = Seq((1,"Smith",-1,"2018",10,"M",3000),
    (2,"Rose",1,"2010",20,"M",4000),
    (3,"Williams",1,"2010",10,"M",1000),
    (4,"Jones",2,"2005",10,"F",2000),
    (5,"Brown",2,"2010",30,"",-1),
    (6,"Brown",2,"2010",50,"",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","branch_id","dept_id",
  "gender","salary")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  val dept = Seq(("Finance",10,"2018"),
    ("Marketing",20,"2010"),
    ("Marketing",20,"2018"),
    ("Sales",30,"2005"),
    ("Sales",30,"2010"),
    ("IT",50,"2010")
  )

  val deptColumns = Seq("dept_name","dept_id","branch_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)

This example prints the below output.


+------+--------+---------------+---------+-------+------+------+
|emp_id|name    |superior_emp_id|branch_id|dept_id|gender|salary|
+------+--------+---------------+---------+-------+------+------+
|1     |Smith   |-1             |2018     |10     |M     |3000  |
|2     |Rose    |1              |2010     |20     |M     |4000  |
|3     |Williams|1              |2010     |10     |M     |1000  |
|4     |Jones   |2              |2005     |10     |F     |2000  |
|5     |Brown   |2              |2010     |30     |      |-1    |
|6     |Brown   |2              |2010     |50     |      |-1    |
+------+--------+---------------+---------+-------+------+------+

+---------+-------+---------+
|dept_name|dept_id|branch_id|
+---------+-------+---------+
|Finance  |10     |2018     |
|Marketing|20     |2010     |
|Marketing|20     |2018     |
|Sales    |30     |2005     |
|Sales    |30     |2010     |
|IT       |50     |2010     |
+---------+-------+---------+

Using Join syntax


join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

This join syntax takes, takes right dataset, joinExprs and joinType as arguments and we use joinExprs to provide join condition on multiple columns.


  //Using multiple columns on join expression
  empDF.join(deptDF, empDF("dept_id") === deptDF("dept_id") &&
    empDF("branch_id") === deptDF("branch_id"),"inner")
      .show(false)

This example joins emptDF DataFrame with deptDF DataFrame on multiple columns dept_id and branch_id columns using an inner join. This example prints below output to console.


+------+-----+---------------+---------+-------+------+------+---------+-------+---------+
|emp_id|name |superior_emp_id|branch_id|dept_id|gender|salary|dept_name|dept_id|branch_id|
+------+-----+---------------+---------+-------+------+------+---------+-------+---------+
|1     |Smith|-1             |2018     |10     |M     |3000  |Finance  |10     |2018     |
|2     |Rose |1              |2010     |20     |M     |4000  |Marketing|20     |2010     |
|5     |Brown|2              |2010     |30     |      |-1    |Sales    |30     |2010     |
|6     |Brown|2              |2010     |50     |      |-1    |IT       |50     |2010     |
+------+-----+---------------+---------+-------+------+------+---------+-------+---------+

The rest of the article, provides a similar example using where(), filter() and spark.sql() and all examples provides the same output as above.

Using Where to provide Join condition

Instead of using a join condition with join() operator, we can use where() to provide a join condition.


  //Using Join with multiple columns on where clause 
  empDF.join(deptDF).where(empDF("dept_id") === deptDF("dept_id") &&
    empDF("branch_id") === deptDF("branch_id"))
    .show(false)

Using Filter to provide Join condition

We can also use filter() to provide Spark Join condition, below example we have provided join with multiple columns.


  //Using Join with multiple columns on filter clause
  empDF.join(deptDF).filter(empDF("dept_id") === deptDF("dept_id") &&
    empDF("branch_id") === deptDF("branch_id"))
    .show(false)

Using Spark SQL Expression to provide Join condition

Here, we will use the native SQL syntax in Spark to join tables with a condition on multiple columns


  //Using SQL & multiple columns on join expression
  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")

  val resultDF = spark.sql("select e.* from EMP e, DEPT d " +
    "where e.dept_id == d.dept_id and e.branch_id == d.branch_id")
  resultDF.show(false)

Source code of using Spark SQL on Multiple columns


package com.sparkbyexamples.spark.dataframe.join

import org.apache.spark.sql.SparkSession

object JoinMultipleColumns extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val emp = Seq((1,"Smith",-1,"2018",10,"M",3000),
    (2,"Rose",1,"2010",20,"M",4000),
    (3,"Williams",1,"2010",10,"M",1000),
    (4,"Jones",2,"2005",10,"F",2000),
    (5,"Brown",2,"2010",30,"",-1),
    (6,"Brown",2,"2010",50,"",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","branch_id","dept_id","gender","salary")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  val dept = Seq(("Finance",10,"2018"),
    ("Marketing",20,"2010"),
    ("Marketing",20,"2018"),
    ("Sales",30,"2005"),
    ("Sales",30,"2010"),
    ("IT",50,"2010")
  )

  val deptColumns = Seq("dept_name","dept_id","branch_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)

  //Using multiple columns on join expression
  empDF.join(deptDF, empDF("dept_id") === deptDF("dept_id") &&
    empDF("branch_id") === deptDF("branch_id"),"inner")
      .show(false)

  //Using Join with multiple columns on where clause
  empDF.join(deptDF).where(empDF("dept_id") === deptDF("dept_id") &&
    empDF("branch_id") === deptDF("branch_id"))
    .show(false)

  //Using Join with multiple columns on filter clause
  empDF.join(deptDF).filter(empDF("dept_id") === deptDF("dept_id") &&
    empDF("branch_id") === deptDF("branch_id"))
    .show(false)

  //Using SQL & multiple columns on join expression
  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")

  val resultDF = spark.sql("select e.* from EMP e, DEPT d " +
    "where e.dept_id == d.dept_id and e.branch_id == d.branch_id")
  resultDF.show(false)
}

The complete example is available at GitHub project for reference.

Conclusion

In this article, you have learned how to use Spark SQL Join on multiple DataFrame columns with Scala example and also learned how to use join conditions using Join, where, filter and SQL expression.

Thanks for reading. If you like it, please do share the article by following the below social links and any comments or suggestions are welcome in the comments sections! 

Happy Learning !!

NNK

SparkByExamples.com is a BigData and Spark examples community page, all examples are simple and easy to understand and well tested in our development environment using Scala and Maven.

Leave a Reply