
Similar to SQL, Spark supports a self join, which joins a DataFrame or table to itself. In this article, you will learn how to use a self join on a DataFrame with a Scala example, and you will also learn different ways to provide the join condition.

Before we jump into the self join expression, let's first create a DataFrame from an emp dataset. This dataset has an employee id column emp_id and a superior/manager employee id column superior_emp_id; we use these two columns in a self join to find the superior's name for every employee.


  val emp = Seq((1,"Smith",1,"10",3000),
    (2,"Rose",1,"20",4000),
    (3,"Williams",1,"10",1000),
    (4,"Jones",2,"10",2000),
    (5,"Brown",2,"40",-1),
    (6,"Brown",2,"50",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","emp_dept_id","salary")
  import spark.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

This example prints the below output.


+------+--------+---------------+-----------+------+
|emp_id|name    |superior_emp_id|emp_dept_id|salary|
+------+--------+---------------+-----------+------+
|1     |Smith   |1              |10         |3000  |
|2     |Rose    |1              |20         |4000  |
|3     |Williams|1              |10         |1000  |
|4     |Jones   |2              |10         |2000  |
|5     |Brown   |2              |40         |-1    |
|6     |Brown   |2              |50         |-1    |
+------+--------+---------------+-----------+------+

Using Join syntax


join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

This join syntax takes the right dataset, joinExprs, and joinType as arguments, and we use joinExprs to provide the join condition, which may span multiple columns.


  val selfDF = empDF.as("emp1").join(empDF.as("emp2"),
    col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
  selfDF.show(false)

This example joins the DataFrame empDF, aliased as emp1, to itself, aliased as emp2, on emp1.superior_emp_id equal to emp2.emp_id.


+------+--------+---------------+-----------+------+------+-----+---------------+-----------+------+
|emp_id|name    |superior_emp_id|emp_dept_id|salary|emp_id|name |superior_emp_id|emp_dept_id|salary|
+------+--------+---------------+-----------+------+------+-----+---------------+-----------+------+
|1     |Smith   |1              |10         |3000  |1     |Smith|1              |10         |3000  |
|2     |Rose    |1              |20         |4000  |1     |Smith|1              |10         |3000  |
|3     |Williams|1              |10         |1000  |1     |Smith|1              |10         |3000  |
|4     |Jones   |2              |10         |2000  |2     |Rose |1              |20         |4000  |
|5     |Brown   |2              |40         |-1    |2     |Rose |1              |20         |4000  |
|6     |Brown   |2              |50         |-1    |2     |Rose |1              |20         |4000  |
+------+--------+---------------+-----------+------+------+-----+---------------+-----------+------+
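
The joinExprs argument is an ordinary Column expression, so a join condition on multiple columns can be built by combining comparisons with &&. Below is a minimal sketch assuming the same empDF; the extra emp_dept_id condition is purely illustrative and not part of the superior-lookup scenario.


  // Sketch only: multiple comparisons combined with && in joinExprs.
  // The emp_dept_id equality is hypothetical and just shows the syntax.
  empDF.as("emp1").join(empDF.as("emp2"),
      col("emp1.superior_emp_id") === col("emp2.emp_id") &&
      col("emp1.emp_dept_id") === col("emp2.emp_dept_id"), "inner")
    .show(false)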

The column names in selfDF are redundant, so let's select only the columns that are needed and rename them to meaningful names using aliases.


  selfDF.select(col("emp1.emp_id"),col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
    .show(false)

This yields the below output.


+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|1     |Smith   |1              |Smith            |
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
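
The joinType argument also accepts other join types. As a sketch, passing "left" instead of "inner" would keep employees whose superior_emp_id has no match, with null superior columns; in this sample dataset every superior id matches, so the output is identical to the one above.


  // Sketch: the same self join with a "left" join type; unmatched rows would
  // keep null superior columns. With this sample data the output is unchanged.
  empDF.as("emp1").join(empDF.as("emp2"),
      col("emp1.superior_emp_id") === col("emp2.emp_id"), "left")
    .select(col("emp1.emp_id"), col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
    .show(false)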

The rest of the article provides similar examples using where(), filter(), and spark.sql(); all of them produce the same output as above.

Using Where to provide Join condition

Instead of passing the join condition to the join() operator, here we use where() to provide the self join condition.


  //Spark SQL self join with where clause
  empDF.as("emp1").join(empDF.as("emp2")).where(
    col("emp1.superior_emp_id") === col("emp2.emp_id"))
  .select(col("emp1.emp_id"),col("emp1.name"),
    col("emp2.emp_id").as("superior_emp_id"),
    col("emp2.name").as("superior_emp_name"))
    .show(false)

Using Filter to provide Join condition

We can also use filter() to provide the join condition; filter() is an alias of where(), so both behave the same way.


  //Spark SQL self join with filter clause
  empDF.as("emp1").join(empDF.as("emp2")).filter(
    col("emp1.superior_emp_id") === col("emp2.emp_id"))
    .select(col("emp1.emp_id"),col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
    .show(false)

Using Spark SQL Expression for Self Join

Here, we use Spark's native SQL syntax to perform the self join. To use native SQL, first create a temporary view and then execute the SQL expression with spark.sql(). The example below performs the self join with the INNER JOIN type.


  empDF.createOrReplaceTempView("EMP")
  spark.sql("select emp1.emp_id,emp1.name," +
    "emp2.emp_id as superior_emp_id, emp2.name as superior_emp_name " +
    "from EMP emp1 INNER JOIN EMP emp2 on emp1.superior_emp_id == emp2.emp_id")
    .show(false)
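
For completeness, the same query can also be written with the join condition in a WHERE clause instead of ON, mirroring the where() and filter() DataFrame examples. A small sketch, assuming the EMP temporary view created above:


  // Sketch: self join expressed with a WHERE clause; returns the same rows.
  spark.sql("select emp1.emp_id, emp1.name, " +
    "emp2.emp_id as superior_emp_id, emp2.name as superior_emp_name " +
    "from EMP emp1, EMP emp2 where emp1.superior_emp_id = emp2.emp_id")
    .show(false)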

Source code of the Spark DataFrame Self Join example


package com.sparkbyexamples.spark.dataframe.join

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SelfJoinExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val emp = Seq((1,"Smith",1,"10",3000),
    (2,"Rose",1,"20",4000),
    (3,"Williams",1,"10",1000),
    (4,"Jones",2,"10",2000),
    (5,"Brown",2,"40",-1),
    (6,"Brown",2,"50",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","emp_dept_id","salary")
  import spark.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  println("self join")
  val selfDF = empDF.as("emp1").join(empDF.as("emp2"),
    col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
  selfDF.show(false)

  selfDF.select(col("emp1.emp_id"),col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
    .show(false)

  //Spark SQL self join with where clause
  empDF.as("emp1").join(empDF.as("emp2")).where(
    col("emp1.superior_emp_id") === col("emp2.emp_id"))
  .select(col("emp1.emp_id"),col("emp1.name"),
    col("emp2.emp_id").as("superior_emp_id"),
    col("emp2.name").as("superior_emp_name"))
    .show(false)

  //Spark SQL self join with filter clause
  empDF.as("emp1").join(empDF.as("emp2")).filter(
    col("emp1.superior_emp_id") === col("emp2.emp_id"))
    .select(col("emp1.emp_id"),col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
    .show(false)

  empDF.createOrReplaceTempView("EMP")
  spark.sql("select emp1.emp_id,emp1.name," +
    "emp2.emp_id as superior_emp_id, emp2.name as superior_emp_name " +
    "from EMP emp1 INNER JOIN EMP emp2 on emp1.superior_emp_id == emp2.emp_id")
    .show(false)

}

The complete example is available at the GitHub project for reference.

Conclusion

In this article, you have learned how to perform a Spark SQL self join on a DataFrame with a Scala example, and how to provide the join condition using join(), where(), filter(), and a SQL expression.

Thanks for reading. If you like this article, please share it; any comments or suggestions are welcome in the comments section!

Happy Learning !!
