PySpark SQL Self Join With Example

Self-joins in PySpark SQL offer a powerful mechanism for comparing and correlating data within the same dataset. This advanced technique involves joining a DataFrame with itself, allowing for insightful analyses such as hierarchical relationships or comparisons between related entities within a single table.

Performing a self join in PySpark involves joining a DataFrame with itself based on a related condition. First, assign aliases to the DataFrame instances to distinguish between the two copies of the same DataFrame. Second, specify the condition to relate rows from the first instance of the DataFrame to rows from the second instance, and finally, specify the join operation.

To perform a self join, let’s create an emp DataFrame with the necessary columns. A small dept DataFrame is created alongside it for context, though only emp is used in the self join examples.


import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sparkbyexamples.com").getOrCreate()

# Employee data; superior_emp_id refers to another row's emp_id
emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "", -1),
       (6, "Brown", 2, "2010", "50", "", -1)]
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]

empDF = spark.createDataFrame(data=emp, schema=empColumns)
empDF.printSchema()
empDF.show(truncate=False)

# Department data (shown for context; not used in the self join)
dept = [("Finance", 10),
        ("Marketing", 20),
        ("Sales", 30),
        ("IT", 40)]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

This prints the emp and dept DataFrames to the console.


# Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

# Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+

1. PySpark DataFrame Self Join

A self join is a specific type of join operation in PySpark SQL where a table is joined with itself. In other words, a self join is performed when you want to combine rows from the same DataFrame based on a related condition.

Here’s how a self join works:

  1. Table Alias: Since you are joining a DataFrame with itself, you need to use DataFrame aliases to distinguish between the two instances of the same DataFrame.
  2. Join Condition: You specify a join condition that relates rows from the first instance of the DataFrame (referred to by one alias) to rows from the second instance of the DataFrame (referred to by another alias).
  3. Join Type: As with any other join, you specify the join type (inner, left, etc.), which determines which rows are kept.

# Self Join DataFrames
from pyspark.sql.functions import col

empDF.alias("emp1").join(empDF.alias("emp2"),
        col("emp1.superior_emp_id") == col("emp2.emp_id"), "inner") \
    .select(col("emp1.emp_id"), col("emp1.name"),
        col("emp2.emp_id").alias("superior_emp_id"),
        col("emp2.name").alias("superior_emp_name")) \
    .show(truncate=False)

This code performs a self join on the empDF DataFrame using the DataFrame API, aliasing the two instances as emp1 and emp2. It selects the emp_id and name columns from the first instance (emp1) and renames the emp_id and name columns from the second instance (emp2) as superior_emp_id and superior_emp_name, respectively. The join condition relates emp1.superior_emp_id to emp2.emp_id, correlating each employee with their supervisor.


# Output:
+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
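
Note that the inner join drops Smith, whose superior_emp_id of -1 matches no emp_id. If you also want to keep employees that have no superior, a left outer self join does that. Below is a minimal sketch using the same empDF; Smith comes back with a null superior_emp_name.


# Left self join keeps employees with no matching superior
empDF.alias("emp1").join(empDF.alias("emp2"),
        col("emp1.superior_emp_id") == col("emp2.emp_id"), "left") \
    .select(col("emp1.emp_id"), col("emp1.name"),
        col("emp2.name").alias("superior_emp_name")) \
    .show(truncate=False)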

2. Using PySpark SQL Self Join

A self join can also be expressed in plain ANSI SQL. PySpark SQL provides a way to run SQL queries after creating a temporary view from the DataFrame.


# Self Join using SQL
empDF.createOrReplaceTempView("EMP")

joinDF2 = spark.sql("SELECT e1.emp_id, e1.name, e2.emp_id AS superior_emp_id, " +
    "e2.name AS superior_emp_name " +
    "FROM EMP e1 INNER JOIN EMP e2 ON e1.superior_emp_id = e2.emp_id")
joinDF2.show(truncate=False)

This also returns the same output as above.
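
Self joins are not limited to hierarchies. As another sketch against the same EMP view, the query below pairs employees who report to the same superior; the e1.emp_id < e2.emp_id condition avoids duplicate and self pairs.


# Self join to pair employees who share the same superior
spark.sql("SELECT e1.name AS employee, e2.name AS colleague, e1.superior_emp_id " +
    "FROM EMP e1 INNER JOIN EMP e2 " +
    "ON e1.superior_emp_id = e2.superior_emp_id AND e1.emp_id < e2.emp_id") \
  .show(truncate=False)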

3. Conclusion

PySpark self-joins can be useful in various scenarios, such as organizational hierarchies, network relationships, or comparing data across different time periods within the same dataset.
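
For example, a self join can compare values between related rows, such as finding employees who earn more than their superior. Here is a minimal sketch against the empDF used throughout this article; with the sample data it returns only Rose, who earns more than Smith.


# Find employees whose salary exceeds their superior's salary
empDF.alias("emp").join(empDF.alias("sup"),
        col("emp.superior_emp_id") == col("sup.emp_id"), "inner") \
    .where(col("emp.salary") > col("sup.salary")) \
    .select(col("emp.name"), col("emp.salary"),
        col("sup.name").alias("superior_name"),
        col("sup.salary").alias("superior_salary")) \
    .show(truncate=False)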

Hope you like it!!

