In this article, I will explain how to perform a PySpark join on multiple columns of DataFrames using join() and SQL, and I will also explain how to eliminate duplicate columns after the join. Joining on multiple columns requires combining multiple conditions with the & and | operators.
1. Quick Examples of Joining DataFrames on Multiple Columns
Following are quick examples of joining PySpark DataFrames on multiple columns.
# Quick examples of PySpark join on multiple columns
# PySpark join multiple columns
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()
# Using where or filter
empDF.join(deptDF).where((empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()
# Join without duplicate columns
empDF.join(deptDF,["dept_id","branch_id"]).show()
# Create temporary views
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
# Spark SQL
spark.sql("SELECT * FROM EMP e, DEPT d WHERE e.dept_id = d.dept_id"
    " AND e.branch_id = d.branch_id").show()
Before we jump into how to use multiple columns in the join expression, let's first create PySpark DataFrames from the emp and dept datasets. The dept_id and branch_id columns are present in both datasets, and we use these columns in the join expression when joining the DataFrames.
Below is the Emp DataFrame with columns “emp_id”, “name”, “branch_id”, “dept_id”, “gender”, and “salary”.
# Import pyspark
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .getOrCreate()
# EMP DataFrame
empData = [(1,"Smith","2018",10,"M",3000),
    (2,"Rose","2010",20,"M",4000),
    (3,"Williams","2010",10,"M",1000),
    (4,"Jones","2005",10,"F",2000),
    (5,"Brown","2010",30,"",-1),
    (6,"Brown","2010",50,"",-1)
]
empColumns = ["emp_id","name","branch_id","dept_id",
    "gender","salary"]
empDF = spark.createDataFrame(empData,empColumns)
empDF.show()
Yields below output.
Below is the Dept DataFrame with columns “dept_name”, “dept_id”, and “branch_id”.
# DEPT DataFrame
deptData = [("Finance",10,"2018"),
    ("Marketing",20,"2010"),
    ("Marketing",20,"2018"),
    ("Sales",30,"2005"),
    ("Sales",30,"2010"),
    ("IT",50,"2010")
]
deptColumns = ["dept_name","dept_id","branch_id"]
deptDF = spark.createDataFrame(deptData,deptColumns)
deptDF.show()
Yields below output.
2. PySpark Join Multiple Columns
The PySpark join() syntax takes the right dataset as the first argument, and joinExprs and joinType as the second and third arguments; we use joinExprs to provide the join condition on multiple columns. Note that both joinExprs and joinType are optional, and the join type defaults to an inner join when omitted.
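As a minimal sketch of the full signature (assuming the empDF and deptDF DataFrames created above), the multi-column condition can be built once and reused with different join types.
# Build the multi-column join condition once and reuse it
join_cond = (empDF["dept_id"] == deptDF["dept_id"]) & \
    (empDF["branch_id"] == deptDF["branch_id"])
# joinType omitted, so this defaults to an inner join
empDF.join(deptDF, join_cond).show()
# Same condition with an explicit left outer join
empDF.join(deptDF, join_cond, "left").show()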
The below example joins the empDF DataFrame with the deptDF DataFrame on the two columns dept_id and branch_id using an inner join. As noted above, to join on multiple columns you have to combine multiple conditions.
# PySpark join multiple columns
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"]),"inner").show()
This example prints the below output to the console. Use the & and | operators carefully and mind operator precedence: in Python, == has lower precedence than the bitwise & and | operators, so each comparison must be wrapped in parentheses.
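To illustrate the pitfall, here is a sketch of the unparenthesized form. It is kept commented out because Python binds & tighter than ==, turning the expression into a chained comparison that PySpark rejects (typically with an error like "Cannot convert column into bool").
# Broken (do not run): parsed as a chained comparison involving
# deptDF["dept_id"] & empDF["branch_id"], which forces a Column into a bool
# empDF.join(deptDF, empDF["dept_id"] == deptDF["dept_id"] &
#     empDF["branch_id"] == deptDF["branch_id"]).show()
# Correct: parenthesize each comparison before combining with &
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()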
3. Using Where to Provide the Join Condition
Instead of passing a join condition to the join() operator, we can use where() to provide it. Note that join() without a join expression produces a cartesian product, which where() then filters down to the matching rows.
# Using join with multiple columns in where clause
empDF.join(deptDF).where((empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()
4. Join without Duplicate Columns on the Result
When you join, the resulting DataFrame contains all columns from both DataFrames. Since dept_id and branch_id exist in both, we end up with duplicate columns. To get a join result without duplicates, pass a list of the join column names instead of a join expression:
# Join without duplicate columns
empDF.join(deptDF,["dept_id","branch_id"]).show()
Yields below output.
+-------+---------+------+-----+------+------+---------+
|dept_id|branch_id|emp_id| name|gender|salary|dept_name|
+-------+---------+------+-----+------+------+---------+
| 10| 2018| 1|Smith| M| 3000| Finance|
| 20| 2010| 2| Rose| M| 4000|Marketing|
| 30| 2010| 5|Brown| | -1| Sales|
| 50| 2010| 6|Brown| | -1| IT|
+-------+---------+------+-----+------+------+---------+
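If you need to keep an explicit join expression (for example, when the condition is not a plain equality), one alternative sketch is to drop the duplicate columns coming from the right-hand DataFrame after the join.
# Alternative sketch: explicit join condition, then drop the duplicate
# columns that came from deptDF
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])) \
    .drop(deptDF["dept_id"]) \
    .drop(deptDF["branch_id"]) \
    .show()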
5. SQL to Join Multiple Columns
Create temporary views and use spark.sql() to execute the SQL query.
# Create temporary views
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
# Spark SQL
spark.sql("SELECT * FROM EMP e, DEPT d WHERE e.dept_id = d.dept_id"
    " AND e.branch_id = d.branch_id").show()
6. Complete Example
Following is the complete example of joining two DataFrames on multiple columns.
# Import pyspark
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .getOrCreate()
# EMP DataFrame
empData = [(1,"Smith","2018",10,"M",3000),
    (2,"Rose","2010",20,"M",4000),
    (3,"Williams","2010",10,"M",1000),
    (4,"Jones","2005",10,"F",2000),
    (5,"Brown","2010",30,"",-1),
    (6,"Brown","2010",50,"",-1)
]
empColumns = ["emp_id","name","branch_id","dept_id",
    "gender","salary"]
empDF = spark.createDataFrame(empData,empColumns)
empDF.show()
# DEPT DataFrame
deptData = [("Finance",10,"2018"),
    ("Marketing",20,"2010"),
    ("Marketing",20,"2018"),
    ("Sales",30,"2005"),
    ("Sales",30,"2010"),
    ("IT",50,"2010")
]
deptColumns = ["dept_name","dept_id","branch_id"]
deptDF = spark.createDataFrame(deptData,deptColumns)
deptDF.show()
# PySpark join multiple columns
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()
# Using where or filter
empDF.join(deptDF).where((empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()
# Join without duplicate columns
empDF.join(deptDF,["dept_id","branch_id"]).show()
# Create temporary views
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
# Spark SQL
spark.sql("SELECT * FROM EMP e, DEPT d WHERE e.dept_id = d.dept_id"
    " AND e.branch_id = d.branch_id").show()
The complete example is available in the GitHub project for reference.
7. Conclusion
In this article, you have learned how to join two DataFrames on multiple columns in PySpark, and how to express multiple join conditions using join(), where(), and SQL.
Related Articles
- PySpark Join Two or Multiple DataFrames
- PySpark Join Types | Join Two DataFrames
- PySpark SQL Self Join With Example
- PySpark SQL Left Semi Join Example
- Dynamic way of doing ETL through Pyspark
- PySpark isin() & SQL IN Operator
- PySpark alias() Column & DataFrame Examples