Spark Join Multiple DataFrames | Tables

Spark supports joining multiple (two or more) DataFrames. In this article, you will learn how to join multiple DataFrames using Spark SQL expressions (on temporary views/tables) and the join() operator, with Scala examples. You will also learn different ways to provide the join condition.

To explain joining multiple (two or more) DataFrames in Spark, we will use an inner join, which is the default and most commonly used join type in Spark. An inner join merges two DataFrames/Datasets on key columns; rows whose keys have no match in the other dataset are dropped from the result.

Related: How to avoid duplicate columns on DataFrame after Join

Before we jump into the Spark join examples, let's first create "emp", "dept", and "address" DataFrames.

Emp Table


  val emp = Seq((1,"Smith","10"),
    (2,"Rose","20"),
    (3,"Williams","10"),
    (4,"Jones","10"),
    (5,"Brown","40"),
    (6,"Brown","50")
  )
  val empColumns = Seq("emp_id","name","emp_dept_id")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

Yields below output.


+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+

Dept Table


  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)

Yields below output.


+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+

Address Table


  val address = Seq((1,"1523 Main St","SFO","CA"),
    (2,"3453 Orange St","SFO","NY"),
    (3,"34 Warner St","Jersey","NJ"),
    (4,"221 Cavalier St","Newark","DE"),
    (5,"789 Walnut St","Sandiago","CA")
  )
  val addColumns = Seq("emp_id","addline1","city","state")
  val addDF = address.toDF(addColumns:_*)
  addDF.show(false)

Yields below output.


+------+---------------+--------+-----+
|emp_id|addline1       |city    |state|
+------+---------------+--------+-----+
|1     |1523 Main St   |SFO     |CA   |
|2     |3453 Orange St |SFO     |NY   |
|3     |34 Warner St   |Jersey  |NJ   |
|4     |221 Cavalier St|Newark  |DE   |
|5     |789 Walnut St  |Sandiago|CA   |
+------+---------------+--------+-----+

Using Join operator


join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
join(right: Dataset[_]): DataFrame

The first join syntax takes the right dataset, joinExprs, and joinType as arguments, and we use joinExprs to provide the join condition. The second syntax takes just the right dataset; it behaves as an inner join and expects the join predicate to be supplied afterwards (for example with where() or filter()). The rest of the article uses both syntaxes to join multiple Spark DataFrames.


  //Using Join expression
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner" )
      .join(addDF,empDF("emp_id") === addDF("emp_id"),"inner")
      .show(false)

This joins all 3 tables and returns a new DataFrame with the below result. Note that emp_id 6 (with emp_dept_id 50) and the "Sales" department (dept_id 30) are dropped because their keys have no match in the other tables.


+------+--------+-----------+---------+-------+------+---------------+--------+-----+
|emp_id|name    |emp_dept_id|dept_name|dept_id|emp_id|addline1       |city    |state|
+------+--------+-----------+---------+-------+------+---------------+--------+-----+
|1     |Smith   |10         |Finance  |10     |1     |1523 Main St   |SFO     |CA   |
|2     |Rose    |20         |Marketing|20     |2     |3453 Orange St |SFO     |NY   |
|3     |Williams|10         |Finance  |10     |3     |34 Warner St   |Jersey  |NJ   |
|4     |Jones   |10         |Finance  |10     |4     |221 Cavalier St|Newark  |DE   |
|5     |Brown   |40         |IT       |40     |5     |789 Walnut St  |Sandiago|CA   |
+------+--------+-----------+---------+-------+------+---------------+--------+-----+
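
Notice that the result contains the emp_id column twice, once from empDF and once from addDF (see the Related link above on avoiding duplicate columns after a join). As a minimal sketch, you can drop the duplicate column by referencing it through the DataFrame it came from.


  //Drop the duplicate emp_id that comes from addDF (a minimal sketch)
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner")
      .join(addDF,empDF("emp_id") === addDF("emp_id"),"inner")
      .drop(addDF("emp_id"))
      .show(false)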

Alternatively, you can use Inner.sql as the join type. To use it, import org.apache.spark.sql.catalyst.plans.Inner.


  //Using Join expression
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),Inner.sql )
      .join(addDF,empDF("emp_id") === addDF("emp_id"),Inner.sql)
      .show(false)

The rest of the article provides Spark inner join examples using the DataFrame where() and filter() operators and spark.sql(); all of these examples produce the same output as above.

Using Where to provide Join condition

Instead of passing a join condition to the join() operator, here we use where() to provide the inner join condition. A join() call without a condition behaves as an inner join and expects the join predicate to be supplied afterwards.


  //Using where
  empDF.join(deptDF).where(empDF("emp_dept_id") === deptDF("dept_id"))
    .join(addDF).where(empDF("emp_id") === addDF("emp_id"))
    .show(false)

Using Filter to provide Join condition

We can also use filter() to provide the join condition for Spark join operations.


  //Using Filter
  empDF.join(deptDF).filter(empDF("emp_dept_id") === deptDF("dept_id"))
    .join(addDF).filter(empDF("emp_id") === addDF("emp_id"))
    .show(false)

Using SQL Expression

Here, we use native SQL syntax to join multiple tables. To use native SQL, we first create a temporary view for each DataFrame and then use spark.sql() to execute the SQL expression.


  //Using SQL expression
  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")
  addDF.createOrReplaceTempView("ADD")

  spark.sql("select * from EMP e, DEPT d, ADD a " +
    "where e.emp_dept_id == d.dept_id and e.emp_id == a.emp_id")
    .show(false)
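
The query above uses the comma-separated (implicit) join syntax with the join conditions in the WHERE clause. An equivalent query using explicit INNER JOIN ... ON syntax, shown below as a sketch, returns the same result.


  //Using explicit INNER JOIN ... ON syntax (equivalent sketch)
  spark.sql("select * from EMP e " +
    "inner join DEPT d on e.emp_dept_id = d.dept_id " +
    "inner join ADD a on e.emp_id = a.emp_id")
    .show(false)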

Source code of Spark Join on multiple DataFrames


package com.sparkbyexamples.spark.dataframe.join

import org.apache.spark.sql.SparkSession

object JoinMultipleDataFrames extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val emp = Seq((1,"Smith","10"),
    (2,"Rose","20"),
    (3,"Williams","10"),
    (4,"Jones","10"),
    (5,"Brown","40"),
    (6,"Brown","50")
  )
  val empColumns = Seq("emp_id","name","emp_dept_id")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)

  val address = Seq((1,"1523 Main St","SFO","CA"),
    (2,"3453 Orange St","SFO","NY"),
    (3,"34 Warner St","Jersey","NJ"),
    (4,"221 Cavalier St","Newark","DE"),
    (5,"789 Walnut St","Sandiago","CA")
  )
  val addColumns = Seq("emp_id","addline1","city","state")
  val addDF = address.toDF(addColumns:_*)
  addDF.show(false)

  //Using Join expression
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner" )
      .join(addDF,empDF("emp_id") === addDF("emp_id"),"inner")
      .show(false)

  //Using where
  empDF.join(deptDF).where(empDF("emp_dept_id") === deptDF("dept_id"))
    .join(addDF).where(empDF("emp_id") === addDF("emp_id"))
    .show(false)

  //Using Filter
  empDF.join(deptDF).filter(empDF("emp_dept_id") === deptDF("dept_id"))
    .join(addDF).filter(empDF("emp_id") === addDF("emp_id"))
    .show(false)

  //Using SQL expression
  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")
  addDF.createOrReplaceTempView("ADD")

  spark.sql("select * from EMP e, DEPT d, ADD a " +
    "where e.emp_dept_id == d.dept_id and e.emp_id == a.emp_id")
    .show(false)
}

The complete example is available in the GitHub project for reference.

Conclusion

In this Spark article, you have learned how to join multiple (two or more) DataFrames and tables (by creating temporary views) with Scala examples, and how to provide join conditions using where() and filter().

Happy Learning !!

