PySpark Join Multiple Columns

In this article, I will explain how to do a PySpark join on multiple columns of DataFrames by using join() and SQL, and I will also explain how to eliminate duplicate columns after the join. Joining on multiple columns requires combining multiple conditions with the & and | operators.

1. Quick Examples of DataFrame Join on Multiple Columns

Following are quick examples of joining on multiple columns of a PySpark DataFrame.


# Quick examples of PySpark join multiple columns

# PySpark join multiple columns
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
   (empDF["branch_id"] == deptDF["branch_id"])).show()

# Using where or filter
empDF.join(deptDF).where((empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()

# Join without duplicate columns
empDF.join(deptDF,["dept_id","branch_id"]).show()
    
# Create tables
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Spark SQL
spark.sql("SELECT * FROM EMP e, DEPT d where e.dept_id == d.dept_id"
         " and e.branch_id == d.branch_id").show()

Before we jump into how to use multiple columns in the join expression, let's first create PySpark DataFrames from the emp and dept datasets. The dept_id and branch_id columns are present in both datasets, and we use them in the join expression while joining the DataFrames.

Below is an Emp DataFrame with columns “emp_id“, “name“, “branch_id“, “dept_id“, “gender“, “salary“.


# Import pyspark 
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
          .appName('SparkByExamples.com') \
          .getOrCreate()
         
# EMP DataFrame
empData = [(1,"Smith","2018",10,"M",3000),
    (2,"Rose","2010",20,"M",4000),
    (3,"Williams","2010",10,"M",1000),
    (4,"Jones","2005",10,"F",2000),
    (5,"Brown","2010",30,"",-1),
    (6,"Brown","2010",50,"",-1)
  ]
  
empColumns = ["emp_id","name","branch_id","dept_id",
  "gender","salary"]
empDF = spark.createDataFrame(empData,empColumns)
empDF.show()


Yields below output.


+------+--------+---------+-------+------+------+
|emp_id|    name|branch_id|dept_id|gender|salary|
+------+--------+---------+-------+------+------+
|     1|   Smith|     2018|     10|     M|  3000|
|     2|    Rose|     2010|     20|     M|  4000|
|     3|Williams|     2010|     10|     M|  1000|
|     4|   Jones|     2005|     10|     F|  2000|
|     5|   Brown|     2010|     30|      |    -1|
|     6|   Brown|     2010|     50|      |    -1|
+------+--------+---------+-------+------+------+

Below is the Dept DataFrame with columns “dept_name“, “dept_id“, and “branch_id“.


# DEPT DataFrame
deptData = [("Finance",10,"2018"),
    ("Marketing",20,"2010"),
    ("Marketing",20,"2018"),
    ("Sales",30,"2005"),
    ("Sales",30,"2010"),
    ("IT",50,"2010")
  ]
deptColumns = ["dept_name","dept_id","branch_id"]
deptDF=spark.createDataFrame(deptData,deptColumns)  
deptDF.show()

Yields below output.


+---------+-------+---------+
|dept_name|dept_id|branch_id|
+---------+-------+---------+
|  Finance|     10|     2018|
|Marketing|     20|     2010|
|Marketing|     20|     2018|
|    Sales|     30|     2005|
|    Sales|     30|     2010|
|       IT|     50|     2010|
+---------+-------+---------+

2. PySpark Join Multiple Columns

The PySpark join() syntax takes the right dataset as the first argument, with joinExprs and joinType as the second and third arguments, and we use joinExprs to provide the join condition on multiple columns. Note that both joinExprs and joinType are optional arguments; when joinType is omitted, join() performs an inner join.
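For reference, here is a minimal sketch passing the same multi-column condition with an explicit join type; the third argument accepts values such as "inner", "left", and "outer", and falls back to "inner" when omitted.


# Sketch: multi-column condition with an explicit join type
empDF.join(deptDF,
           (empDF["dept_id"] == deptDF["dept_id"]) &
           (empDF["branch_id"] == deptDF["branch_id"]),
           "left").show()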

The below example joins the empDF DataFrame with the deptDF DataFrame on the multiple columns dept_id and branch_id using an inner join. As mentioned above, to join on multiple columns, you have to use multiple conditions.


# PySpark join multiple columns
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
   (empDF["branch_id"] == deptDF["branch_id"]), "inner").show()

This example prints the matching rows to the console. Use the & and | operators carefully and mind operator precedence: == has lower precedence than the bitwise & and |, so each comparison must be wrapped in its own parentheses, as shown in the sketch below.
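As a minimal sketch, building the join condition as a separate variable makes the required parentheses easy to see.


# Build the join condition as a variable; each comparison
# needs its own parentheses since & binds tighter than ==
cond = (empDF["dept_id"] == deptDF["dept_id"]) & \
       (empDF["branch_id"] == deptDF["branch_id"])
empDF.join(deptDF, cond, "inner").show()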

3. Using Where to Provide a Join Condition

Instead of passing the join condition to the join() operator, we can use where() to provide it after the join.


#Using Join with multiple columns on where clause 
empDF.join(deptDF).where((empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()

4. Join Without Duplicate Columns in the Result

When you join, the resulting DataFrame contains all columns from both DataFrames; since dept_id and branch_id are present on both, we end up with duplicate columns. To get a join result without duplicates, pass a list of the join column names to join() instead of a join expression.


# Join without duplicate columns
empDF.join(deptDF,["dept_id","branch_id"]).show()

Yields below output.


+-------+---------+------+-----+------+------+---------+
|dept_id|branch_id|emp_id| name|gender|salary|dept_name|
+-------+---------+------+-----+------+------+---------+
|     10|     2018|     1|Smith|     M|  3000|  Finance|
|     20|     2010|     2| Rose|     M|  4000|Marketing|
|     30|     2010|     5|Brown|      |    -1|    Sales|
|     50|     2010|     6|Brown|      |    -1|       IT|
+-------+---------+------+-----+------+------+---------+
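If you want to keep the explicit join expression, one alternative sketch is to drop the duplicate columns from one side after the join; here the right-side (deptDF) copies are dropped.


# Alternative: join with an expression, then drop the right-side
# copies of the duplicate columns
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])) \
    .drop(deptDF["dept_id"]) \
    .drop(deptDF["branch_id"]).show()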

5. SQL to Join Multiple Columns

Finally, let's convert the above code into a PySpark SQL query that joins on multiple columns. To do so, first create temporary views using createOrReplaceTempView(), then run the query with SparkSession.sql(). The temporary views remain available until you end the SparkSession.


# Create tables
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Spark SQL
spark.sql("SELECT * FROM EMP e, DEPT d where e.dept_id == d.dept_id"
         " and e.branch_id == d.branch_id").show()

6. Complete Example

Following is the complete example of joining two DataFrames on multiple columns.


# Import pyspark 
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
          .appName('SparkByExamples.com') \
          .getOrCreate()
         
# EMP DataFrame
empData = [(1,"Smith","2018",10,"M",3000),
    (2,"Rose","2010",20,"M",4000),
    (3,"Williams","2010",10,"M",1000),
    (4,"Jones","2005",10,"F",2000),
    (5,"Brown","2010",30,"",-1),
    (6,"Brown","2010",50,"",-1)
  ]
  
empColumns = ["emp_id","name","branch_id","dept_id",
  "gender","salary"]
empDF = spark.createDataFrame(empData,empColumns)
empDF.show()

# DEPT DataFrame
deptData = [("Finance",10,"2018"),
    ("Marketing",20,"2010"),
    ("Marketing",20,"2018"),
    ("Sales",30,"2005"),
    ("Sales",30,"2010"),
    ("IT",50,"2010")
  ]
deptColumns = ["dept_name","dept_id","branch_id"]
deptDF=spark.createDataFrame(deptData,deptColumns)  
deptDF.show()

# PySpark join multiple columns
empDF.join(deptDF, (empDF["dept_id"] == deptDF["dept_id"]) &
   (empDF["branch_id"] == deptDF["branch_id"])).show()

# Using where or filter
empDF.join(deptDF).where((empDF["dept_id"] == deptDF["dept_id"]) &
    (empDF["branch_id"] == deptDF["branch_id"])).show()
    
# Create tables
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Spark SQL
spark.sql("SELECT * FROM EMP e, DEPT d where e.dept_id == d.dept_id"
         " and e.branch_id == d.branch_id").show()

The complete example is available in the GitHub project for reference.

7. Conclusion

In this article, you have learned how to join two PySpark DataFrames on multiple columns, and how to express multiple join conditions using join(), where(), and SQL expressions.

