Although PySpark SQL has no dedicated self-join type, you can use any of the available join types to join a DataFrame to itself. The example below uses an inner self join.
In this PySpark article, I will explain how to do a self join on a DataFrame, with a PySpark example.
Before we jump into the PySpark self join examples, let's first create the emp and dept DataFrames. Here, the column emp_id is unique in emp, dept_id is unique in dept, and emp_dept_id in emp references dept_id in dept.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkbyexamples.com").getOrCreate()
emp = [(1,"Smith",-1,"2018","10","M",3000), \
(2,"Rose",1,"2010","20","M",4000), \
(3,"Williams",1,"2010","10","M",1000), \
(4,"Jones",2,"2005","10","F",2000), \
(5,"Brown",2,"2010","40","",-1), \
(6,"Brown",2,"2010","50","",-1) \
]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
"emp_dept_id","gender","salary"]
empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)
dept = [("Finance",10), \
("Marketing",20), \
("Sales",30), \
("IT",40) \
]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)
This prints the schema and contents of the emp and dept DataFrames to the console. The contents are:
#Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+
#Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
PySpark DataFrame Self Join Example
Below is an example of how to use a self join on a PySpark DataFrame.
# Self Join DataFrames
from pyspark.sql.functions import col
empDF.alias("emp1").join(empDF.alias("emp2"), \
col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
.select(col("emp1.emp_id"),col("emp1.name"), \
col("emp2.emp_id").alias("superior_emp_id"), \
col("emp2.name").alias("superior_emp_name")) \
.show(truncate=False)
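Here, we join the emp dataset with itself to find the superior's emp_id and name for each employee. Because this is an inner join, Smith does not appear in the result: his superior_emp_id of -1 matches no emp_id.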
+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
Using PySpark SQL Self Join
Let's see how to do the same self join with a PySpark SQL expression. To do so, first create a temporary view for the emp DataFrame, then join the view to itself using two aliases.
# Self Join using SQL
empDF.createOrReplaceTempView("EMP")
# e1 is the employee side, e2 is the superior side of the same view
joinDF2 = spark.sql("SELECT e1.emp_id, e1.name, "
  "e2.emp_id AS superior_emp_id, e2.name AS superior_emp_name "
  "FROM EMP e1 INNER JOIN EMP e2 ON e1.superior_emp_id = e2.emp_id")
joinDF2.show(truncate=False)
This returns the same rows as the DataFrame example above.
Conclusion
In this PySpark article, you have learned how to perform a self join using the available join types, with Python examples.
Hope you like it!