In PySpark SQL, an inner join combines rows from two or more tables based on a related column between them. It returns only the rows that have matching values in the specified column(s) in both tables.
For example, consider two tables: “emp” and “dept”. If you want to retrieve a list of employees along with their department information, you would use an inner join on the “dept_id” column, which is common to both tables. The result is a new table containing only the rows where the “dept_id” value in the “emp” table matches the “dept_id” value in the “dept” table.
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sparkbyexamples.com").getOrCreate()

# Employee data: (emp_id, name, superior_emp_id, year_joined, dept_id, gender, salary)
emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "", -1),
       (6, "Brown", 2, "2010", "50", "", -1)]
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "dept_id", "gender", "salary"]

empDF = spark.createDataFrame(data=emp, schema=empColumns)
empDF.printSchema()
empDF.show(truncate=False)
dept = [("Finance",10), \
("Marketing",20), \
("Sales",30), \
("IT",40) \
]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)
This prints the emp and dept DataFrames to the console.
# Emp Dataset
+------+--------+---------------+-----------+-------+------+------+
|emp_id|name    |superior_emp_id|year_joined|dept_id|gender|salary|
+------+--------+---------------+-----------+-------+------+------+
|1     |Smith   |-1             |2018       |10     |M     |3000  |
|2     |Rose    |1              |2010       |20     |M     |4000  |
|3     |Williams|1              |2010       |10     |M     |1000  |
|4     |Jones   |2              |2005       |10     |F     |2000  |
|5     |Brown   |2              |2010       |40     |      |-1    |
|6     |Brown   |2              |2010       |50     |      |-1    |
+------+--------+---------------+-----------+-------+------+------+
# Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance |10 |
|Marketing|20 |
|Sales |30 |
|IT |40 |
+---------+-------+
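As a side note, passing a list of column names lets Spark infer the column types from the data. If you would rather pin the types down, you can define the schema explicitly with StructType. Below is a minimal sketch of that approach; empSchema and empDF2 are illustrative names, not part of the example above.
# A sketch: the same emp data with an explicit StructType schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

empSchema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("superior_emp_id", IntegerType(), True),
    StructField("year_joined", StringType(), True),
    StructField("dept_id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])

empDF2 = spark.createDataFrame(data=emp, schema=empSchema)  # illustrative name
empDF2.printSchema()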
PySpark DataFrame Inner Join Example
In PySpark, you can perform an inner join between two DataFrames using the join() function with inner as the join type.
# Inner join example
join_result = empDF.join(deptDF, "dept_id", "inner")
join_result.show(truncate=False)
The resulting DataFrame join_result will contain only the rows where the key column dept_id exists in both empDF and deptDF.
When we perform an inner join on our datasets, it excludes “dept_id” 50 from the “empDF” dataset and “dept_id” 30 from the “deptDF” dataset, since neither value has a match in the other table. Below is the outcome of the join expression.
# Output:
+-------+------+--------+---------------+-----------+------+------+---------+
|dept_id|emp_id|name    |superior_emp_id|year_joined|gender|salary|dept_name|
+-------+------+--------+---------------+-----------+------+------+---------+
|10     |1     |Smith   |-1             |2018       |M     |3000  |Finance  |
|20     |2     |Rose    |1              |2010       |M     |4000  |Marketing|
|10     |3     |Williams|1              |2010       |M     |1000  |Finance  |
|10     |4     |Jones   |2              |2005       |F     |2000  |Finance  |
|40     |5     |Brown   |2              |2010       |      |-1    |IT       |
+-------+------+--------+---------------+-----------+------+------+---------+
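Two small variations are worth knowing here. First, inner is the default join type in join(), so the third argument can be omitted. Second, you can join on several columns at once by passing a list of column names; the second snippet below is only a sketch, since our sample tables share just the single dept_id column and branch_id is a hypothetical name.
# "inner" is the default join type, so this is equivalent:
join_result = empDF.join(deptDF, "dept_id")

# Sketch: joining on multiple columns (branch_id is hypothetical here)
# empDF.join(deptDF, ["dept_id", "branch_id"], "inner")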
Inner Join with Different Column Names
If the columns you want to join on have different names in the two DataFrames, you can spell out the join condition explicitly in the join() function. Here’s an example: suppose the emp DataFrame has the column emp_dept_id and the dept DataFrame has the column dept_id, and we want to inner join them on those columns.
# Join with different column names
# (assumes emp has an emp_dept_id column, as in the example above)
empDF.join(deptDF, empDF["emp_dept_id"] == deptDF["dept_id"], "inner") \
     .show(truncate=False)
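Note that unlike joining on a column-name string, an expression-based join keeps both key columns in the result. A common cleanup, sketched below under the same hypothetical emp_dept_id column, is to drop one of them right after the join.
# Drop the duplicate key column after an expression join
# (emp_dept_id is the hypothetical column from the example above)
empDF.join(deptDF, empDF["emp_dept_id"] == deptDF["dept_id"], "inner") \
     .drop(deptDF["dept_id"]) \
     .show(truncate=False)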
PySpark SQL Inner Join
To run an SQL query in PySpark, first register the DataFrames as temporary views, then run the query with the spark.sql() method.
# Run SQL query
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

joinDF2 = spark.sql("SELECT e.* FROM EMP e INNER JOIN DEPT d ON e.dept_id = d.dept_id")
joinDF2.show(truncate=False)
This returns the same matching rows as the DataFrame join; note that because the query selects only e.*, the output contains just the employee columns.
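If you also want department columns in the SQL result, list them in the SELECT clause. A small variation of the same query:
# Select columns from both tables in the SQL join
spark.sql("SELECT e.*, d.dept_name FROM EMP e INNER JOIN DEPT d ON e.dept_id = d.dept_id") \
     .show(truncate=False)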
Conclusion
In summary, an inner join in PySpark SQL combines rows from two or more tables based on matching values in a specified column, producing a result set that contains only the matched rows from both tables. PySpark allows us to combine datasets based on matching values in columns that have different names.
Hope you like it!!
Related Articles
- PySpark SQL Left Outer Join Examples
- PySpark SQL Self Join Examples
- PySpark SQL Left Anti Join Examples
- PySpark SQL Full Outer Join with Example
- PySpark SQL Left Semi Join Example
- PySpark SQL Right Outer Join with Example
- PySpark SQL expr() (Expression) Function
- PySpark SQL – Working with Unix Time | Timestamp
- PySpark SQL Types (DataType) with Examples