You are currently viewing PySpark SQL Inner Join Explained

In PySpark SQL, an inner join is used to combine rows from two or more tables based on a related column between them. The inner join selects rows from both tables where the specified condition is satisfied, meaning it only includes rows that have matching values in the specified column(s) from both tables.

Advertisements

For example, consider two tables: “emp” and “dept.” If you want to retrieve a list of employees along with their department information, you would use an inner join on the “dept_id” column, which is common between the two tables. This would result in a new table containing only the rows where there is a match between the “dept_id” column in the “emp” table and the “dept_id” column in the “dept” table.


import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkbyexamples.com").getOrCreate()

emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

This prints emp DataFrame and dept DataFrame to the console.


# Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|dept_id    |gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

# Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+

PySpark DataFrame Inner Join Example

In PySpark, you can perform an inner join between two DataFrames using the join() function and inner as the join type.


# Inner join example
join_result = empDF.join(deptDF,"dept_id","inner")
join_result.show(false)

The resulting DataFrame join_result will contain only the rows where the key column dept_id exists in both empDf and deptDF.

When we perform an inner join on our datasets, it excludes the “dept_id” with value 60 from the “empDF” dataset and the “dept_id” with value 30 from the “deptDF” dataset. Below is the outcome of the join expression.


# Output:
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------

Inner Join with Different Column Names

If the columns you want to join on have different names in the two DataFrames, you can specify the column names explicitly in the join() function. Here’s an example:

Suppose we have two DataFrames, emp with column emp_dept_id and dept DataFrame with column name dept_id. We want to perform an inner join on these DataFrames.


# Join with different column names
empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"inner")
    .show(false)

PySpark SQL Inner Join

To run an SQL query in PySpark, first, create the temporary table from the DataFrame and run the query using the spark.sql() method.


# Run SQL Qeuery
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

joinDF2 = spark.sql("SELECT e.* FROM EMP e INNER JOIN DEPT d ON e.dept_id == d.dept_id") \
  .show(truncate=False)

This also returns the same output as above.

Conclusion

In summary, an inner join in PySpark SQL combines rows from two or more tables based on matching values in a specified column, producing a result set that contains only the matched rows from both tables. PySpark allows us to combine datasets based on matching values in columns that have different names.

Hope you Like it !!

Reference;

Leave a Reply