Sparklyr join explained with examples

In sparklyr, we can perform joins using either the dplyr syntax or the Spark SQL syntax. All dplyr join functions support R data frames as well as Spark DataFrames. In this article, you will see the different types of sparklyr joins with examples.

Joins are a fundamental operation in data analysis. They combine two or more datasets based on a common key and are used in a wide variety of tasks, such as data cleaning, data integration, and data mining.

Sparklyr supports all the basic join operations available in traditional SQL.

Using sparklyr and dplyr within R, we can import our data and join tables with the following join types:

1. Sparklyr join types

sparklyr/dplyr join    Equivalent SQL join
inner_join             INNER JOIN
left_join              LEFT JOIN
right_join             RIGHT JOIN
full_join              FULL OUTER JOIN
semi_join              LEFT SEMI JOIN (Spark SQL)
anti_join              LEFT ANTI JOIN (Spark SQL)

sparklyr join types

2. Create the emp and dept DataFrames

Before we jump into the sparklyr join examples, let’s first create the emp and dept DataFrames. Here, emp_id is unique in emp, dept_id is unique in dept, and dept_id in emp references dept_id in dept.


# Load sparklyr, plus dplyr for the join verbs
library(sparklyr)
library(dplyr)

# Connect to a local Spark cluster
sc <- spark_connect(master = "local")

# Define the employee data as an R data frame
emp <- data.frame(emp_id = c(1, 2, 3, 4, 5, 6),
                  name = c("Smith", "Rose", "Williams", "Jones", "Brown", "Brown"),
                  superior_emp_id = c(-1, 1, 1, 2, 2, 2),
                  year_joined = c("2018", "2010", "2010", "2005", "2010", "2010"),
                  dept_id = c("10", "20", "10", "10", "40", "50"),
                  gender = c("M", "M", "M", "F", "", ""),
                  salary = c(3000, 4000, 1000, 2000, -1, -1))

# Create a Spark DataFrame from the employee data frame
empDF <- copy_to(sc, emp, "empDF")

# Define the department data as an R data frame
dept <- data.frame(dept_name = c("Finance", "Marketing", "Sales", "IT"),
                   dept_id = c(10, 20, 30, 40))

# Create a Spark DataFrame from the department data frame
deptDF <- copy_to(sc, dept, "deptDF")

# Shows the employee dataframe
show(empDF)

# Shows the department dataframe
show(deptDF)

This prints the emp and dept DataFrames to the console. Refer to the complete example below for the full code, including how to create the Spark connection.


# Output:
> show(empDF)
# Source: spark<empDF> [?? x 7]
  emp_id name     superior_emp_id year_joined dept_id gender salary
   <dbl> <chr>              <dbl> <chr>       <chr>   <chr>   <dbl>
1      1 Smith                 -1 2018        10      "M"      3000
2      2 Rose                   1 2010        20      "M"      4000
3      3 Williams               1 2010        10      "M"      1000
4      4 Jones                  2 2005        10      "F"      2000
5      5 Brown                  2 2010        40      ""         -1
6      6 Brown                  2 2010        50      ""         -1

> show(deptDF)
# Source: spark<deptDF> [?? x 2]
  dept_name dept_id
  <chr>       <dbl>
1 Finance        10
2 Marketing      20
3 Sales          30
4 IT             40
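Before joining, it can help to confirm the key properties described above and to see which dept_id values exist on only one side, since those rows are what distinguish the join types. The following is a minimal sketch in base R on local copies of the key columns; the variable names are illustrative, and dept_id is stored as character in both tables here, which is an assumption made so the keys compare directly.

```r
# Local copies of the key columns; dept_id is character in both tables
# (an assumption for this sketch so the two key columns compare directly)
emp <- data.frame(
  emp_id  = c(1, 2, 3, 4, 5, 6),
  dept_id = c("10", "20", "10", "10", "40", "50"),
  stringsAsFactors = FALSE
)
dept <- data.frame(
  dept_name = c("Finance", "Marketing", "Sales", "IT"),
  dept_id   = c("10", "20", "30", "40"),
  stringsAsFactors = FALSE
)

# anyDuplicated() returns 0 when a vector has no repeated values
emp_id_is_unique  <- anyDuplicated(emp$emp_id) == 0
dept_id_is_unique <- anyDuplicated(dept$dept_id) == 0

# Keys that appear on one side only
emp_only_keys  <- setdiff(emp$dept_id, dept$dept_id)  # used in emp, missing from dept
dept_only_keys <- setdiff(dept$dept_id, emp$dept_id)  # departments with no employees

print(emp_only_keys)
print(dept_only_keys)
```

Here dept_id 50 exists only in emp and dept_id 30 exists only in dept, so these are the two keys to watch in each join result.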

3. Sparklyr inner join

Inner join is the most commonly used join in sparklyr. It joins two datasets on key columns, and rows whose keys don’t match are dropped from both datasets (emp and dept).


# empDF inner join with deptDF
inner_join(empDF, deptDF, by = "dept_id")

In our dataset, dept_id 50 has no matching record in the dept dataset, so the employee row with dept_id 50 is dropped from the result. Likewise, dept_id 30 from the dept dataset has no matching employee and is also dropped. Below is the result of the above sparklyr join expression.

sparklyr inner join
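Because the dplyr join verbs also accept ordinary R data frames, the inner join behaviour can be checked locally without a Spark connection. This is only a sketch: it uses a trimmed local copy of the data, stores dept_id as character in both tables (an assumption), and the name inner_local is illustrative.

```r
library(dplyr)

# Trimmed local stand-ins for emp and dept; dept_id is character in both
# (an assumption for this sketch)
emp <- data.frame(
  emp_id  = c(1, 2, 3, 4, 5, 6),
  name    = c("Smith", "Rose", "Williams", "Jones", "Brown", "Brown"),
  dept_id = c("10", "20", "10", "10", "40", "50"),
  stringsAsFactors = FALSE
)
dept <- data.frame(
  dept_name = c("Finance", "Marketing", "Sales", "IT"),
  dept_id   = c("10", "20", "30", "40"),
  stringsAsFactors = FALSE
)

# Keep only the rows whose dept_id appears in both tables
inner_local <- inner_join(emp, dept, by = "dept_id")

print(nrow(inner_local))                    # number of matched rows
print("50" %in% inner_local$dept_id)        # is the unmatched emp key present?
print("Sales" %in% inner_local$dept_name)   # is the unmatched dept row present?
```

Five employee rows survive; neither the employee with dept_id 50 nor the Sales department (dept_id 30) appears in the result.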

4. Sparklyr left join

A left join is a type of join operation in which all the rows from the left table (i.e., the first table mentioned in the join) are retained, and only the matching rows from the right table are included in the result. If there is no matching row in the right table for a row in the left table, the result will contain null values for the columns of the right table.


# empDF left join with deptDF
left_join(empDF, deptDF, by = "dept_id")

In our example, dept_id 50 has no corresponding record in the dept dataset, so dept_name is null for that row. Because a left join keeps every row of the left table, all 6 records of empDF appear in the result, each extended with the columns from the right table. The above expression yields the below output.

result of sparklyr left join
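The same left join can be tried on local R data frames, where a Spark null shows up as NA. The sketch below assumes a trimmed local copy of the data with dept_id stored as character in both tables; left_local and dept_name_for_50 are illustrative names.

```r
library(dplyr)

# Trimmed local stand-ins for emp and dept (dept_id as character is an assumption)
emp <- data.frame(
  emp_id  = c(1, 2, 3, 4, 5, 6),
  dept_id = c("10", "20", "10", "10", "40", "50"),
  stringsAsFactors = FALSE
)
dept <- data.frame(
  dept_name = c("Finance", "Marketing", "Sales", "IT"),
  dept_id   = c("10", "20", "30", "40"),
  stringsAsFactors = FALSE
)

# Keep every emp row; attach dept columns where a match exists
left_local <- left_join(emp, dept, by = "dept_id")

# The employee in dept_id 50 has no department, so dept_name is missing
dept_name_for_50 <- left_local$dept_name[left_local$dept_id == "50"]

print(nrow(left_local))
print(is.na(dept_name_for_50))
```

All six employee rows are kept, and only the row with dept_id 50 has a missing dept_name.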

5. Sparklyr right join

A right join is a type of join operation in which all the rows from the right table (i.e., the second table mentioned in the join) are retained, and only the matching rows from the left table are included in the result. If there is no matching row in the left table for a row in the right table, the result will contain null values for the columns of the left table. Below is the code for the example of sparklyr right join.


# empDF right join with deptDF
right_join(empDF, deptDF, by = "dept_id")

In our example, dept_id 30 has no corresponding record in the emp dataset, so all emp columns are null for that row. Since emp has 3 records for dept_id 10 and one each for 20 and 40, each of them appears as a separate row joined to its department, while the emp record with dept_id 50 is dropped. The above expression yields the below output.

result of sparklyr right join
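To see the row counts behind this description, the right join can be run on local R data frames. This is a sketch under the assumption that dept_id is stored as character in both tables; right_local is an illustrative name, and a Spark null appears as NA locally.

```r
library(dplyr)

# Trimmed local stand-ins for emp and dept (dept_id as character is an assumption)
emp <- data.frame(
  emp_id  = c(1, 2, 3, 4, 5, 6),
  dept_id = c("10", "20", "10", "10", "40", "50"),
  stringsAsFactors = FALSE
)
dept <- data.frame(
  dept_name = c("Finance", "Marketing", "Sales", "IT"),
  dept_id   = c("10", "20", "30", "40"),
  stringsAsFactors = FALSE
)

# Keep every dept row; attach emp columns where a match exists
right_local <- right_join(emp, dept, by = "dept_id")

print(nrow(right_local))
print(table(right_local$dept_id))                          # rows per department
print(right_local$emp_id[right_local$dept_id == "30"])     # no employee for dept 30
```

The result has six rows: three for dept_id 10, one each for 20 and 40, and one for dept_id 30 with a missing emp_id.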

6. Sparklyr full join

A full join, also known as a full outer join, is a type of join operation that includes all the rows from both tables, whether or not they have a match. Where a row has no match in the other table, the result contains null values for the columns of that other table.


# empDF full join with deptDF
full_join(empDF, deptDF, by = "dept_id")

In our example, dept_id 30 has no corresponding record in the emp dataset, so all emp columns are null for that row. Moreover, dept_id 50 has no corresponding record in the dept table, so the dept_name column is null for that row. The above expression yields the below output.
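A local run on R data frames makes the full join result easy to inspect. The sketch assumes a trimmed copy of the data with dept_id as character in both tables; full_local is an illustrative name, and nulls appear as NA.

```r
library(dplyr)

# Trimmed local stand-ins for emp and dept (dept_id as character is an assumption)
emp <- data.frame(
  emp_id  = c(1, 2, 3, 4, 5, 6),
  dept_id = c("10", "20", "10", "10", "40", "50"),
  stringsAsFactors = FALSE
)
dept <- data.frame(
  dept_name = c("Finance", "Marketing", "Sales", "IT"),
  dept_id   = c("10", "20", "30", "40"),
  stringsAsFactors = FALSE
)

# Keep every row from both tables
full_local <- full_join(emp, dept, by = "dept_id")

print(nrow(full_local))
print(sort(unique(full_local$dept_id)))                 # every key from either side
print(full_local[!complete.cases(full_local), ])        # rows with a missing side
```

The result has seven rows: the six employees plus one extra row for dept_id 30, which has no employee.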

7. Sparklyr semi join

A semi join is a type of join operation in which only the rows from the left table that have matching rows in the right table are retained in the result. The result only contains the columns from the left table.


# empDF semi join with deptDF
semi_join(empDF, deptDF, by = "dept_id")

In our example, dept_id 50 has no corresponding record in the dept dataset, so the employee with dept_id 50 does not appear in the resulting data frame. Below is the output for the above expression. Here, you can see the results only contain the left table’s columns.

result of sparklyr semi join
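The two properties of a semi join (filtering on matches, keeping only left-table columns) can be checked on local R data frames. This is a sketch assuming dept_id is character in both tables; semi_local is an illustrative name.

```r
library(dplyr)

# Trimmed local stand-ins for emp and dept (dept_id as character is an assumption)
emp <- data.frame(
  emp_id  = c(1, 2, 3, 4, 5, 6),
  dept_id = c("10", "20", "10", "10", "40", "50"),
  stringsAsFactors = FALSE
)
dept <- data.frame(
  dept_name = c("Finance", "Marketing", "Sales", "IT"),
  dept_id   = c("10", "20", "30", "40"),
  stringsAsFactors = FALSE
)

# Keep emp rows that have a match in dept; no dept columns are added
semi_local <- semi_join(emp, dept, by = "dept_id")

print(nrow(semi_local))
print(names(semi_local))   # same columns as emp
```

Unlike an inner join, the result never gains columns or duplicates rows from the right table; it only filters the left one.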

8. Sparklyr anti join

An anti join, also known as an anti-semi join, is a type of join operation in which only the rows from the left table that have no matching rows in the right table are retained in the result. The result only contains the columns from the left table.


# empDF anti join with deptDF
anti_join(empDF, deptDF, by = "dept_id")

In our example, dept_id 50 has no corresponding record in the dept dataset. Therefore, the result contains only the emp row with dept_id 50.

result of sparklyr anti join
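An anti join is the complement of a semi join, which a local run on R data frames can illustrate. The sketch assumes dept_id is character in both tables; anti_local and semi_local are illustrative names.

```r
library(dplyr)

# Trimmed local stand-ins for emp and dept (dept_id as character is an assumption)
emp <- data.frame(
  emp_id  = c(1, 2, 3, 4, 5, 6),
  dept_id = c("10", "20", "10", "10", "40", "50"),
  stringsAsFactors = FALSE
)
dept <- data.frame(
  dept_name = c("Finance", "Marketing", "Sales", "IT"),
  dept_id   = c("10", "20", "30", "40"),
  stringsAsFactors = FALSE
)

# Keep emp rows that have NO match in dept
anti_local <- anti_join(emp, dept, by = "dept_id")
semi_local <- semi_join(emp, dept, by = "dept_id")

print(anti_local)
# Together, the semi and anti results account for every emp row exactly once
print(nrow(anti_local) + nrow(semi_local) == nrow(emp))
```

This makes an anti join a convenient way to find orphaned keys, such as employees assigned to a department that does not exist.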

9. Sparklyr join using Spark SQL

Since sparklyr supports native SQL syntax, we can also write join operations in SQL. The copy_to() calls above registered the DataFrames as the temporary tables empDF and deptDF, and we can use these tables in sdf_sql().


# empDF inner join with deptDF using Spark SQL
sql_string <- 'SELECT * FROM empDF e INNER JOIN deptDF d ON e.dept_id = d.dept_id'
result_df <- sdf_sql(sc, sql_string)

The above code returns the same matching rows as inner_join().
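The other join types can be written the same way by swapping the join keyword. The sketch below only builds the query strings, so it runs without a Spark connection; build_join_sql and join_keywords are hypothetical names introduced for illustration, and the table names assume the DataFrames were registered as empDF and deptDF.

```r
# Spark SQL join keywords, named by the dplyr verb they correspond to
join_keywords <- c(
  inner_join = "INNER JOIN",
  left_join  = "LEFT JOIN",
  right_join = "RIGHT JOIN",
  full_join  = "FULL OUTER JOIN",
  semi_join  = "LEFT SEMI JOIN",
  anti_join  = "LEFT ANTI JOIN"
)

# Hypothetical helper: build the query text for one join type
build_join_sql <- function(verb) {
  sprintf("SELECT * FROM empDF e %s deptDF d ON e.dept_id = d.dept_id",
          join_keywords[[verb]])
}

left_sql <- build_join_sql("left_join")
anti_sql <- build_join_sql("anti_join")
print(left_sql)
print(anti_sql)

# With a live connection `sc` (not created in this sketch), a query would run as:
# result_df <- sdf_sql(sc, left_sql)
```

Keeping the keyword in a lookup table is just one way to avoid repeating the query text; writing each SQL string by hand works equally well.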

10. Complete example of sparklyr join


# Load sparklyr, plus dplyr for the join verbs
library(sparklyr)
library(dplyr)

# Connect to a local Spark cluster
sc <- spark_connect(master = "local")

# Define the employee data as an R data frame
emp <- data.frame(emp_id = c(1, 2, 3, 4, 5, 6),
                  name = c("Smith", "Rose", "Williams", "Jones", "Brown", "Brown"),
                  superior_emp_id = c(-1, 1, 1, 2, 2, 2),
                  year_joined = c("2018", "2010", "2010", "2005", "2010", "2010"),
                  dept_id = c("10", "20", "10", "10", "40", "50"),
                  gender = c("M", "M", "M", "F", "", ""),
                  salary = c(3000, 4000, 1000, 2000, -1, -1))

# Create a Spark DataFrame from the employee data frame
empDF <- copy_to(sc, emp, "empDF")

# Define the department data as an R data frame
dept <- data.frame(dept_name = c("Finance", "Marketing", "Sales", "IT"),
                   dept_id = c(10, 20, 30, 40))

# Create a Spark DataFrame from the department data frame
deptDF <- copy_to(sc, dept, "deptDF")

# Shows the employee dataframe
show(empDF)

# Shows the department dataframe
show(deptDF)

# empDF inner join with deptDF
inner_join(empDF, deptDF, by = "dept_id")

# empDF left join with deptDF
left_join(empDF, deptDF, by = "dept_id")

# empDF right join with deptDF
right_join(empDF, deptDF, by = "dept_id")

# empDF full join with deptDF
full_join(empDF, deptDF, by = "dept_id")

# empDF semi join with deptDF
semi_join(empDF, deptDF, by = "dept_id")

# empDF anti join with deptDF
anti_join(empDF, deptDF, by = "dept_id")

Conclusion

You have learned how to join two or more DataFrames with sparklyr (Spark with R) using the various join functions, along with their syntax and usage examples. I would also recommend reading through Optimizing SQL Joins to understand the performance impact of joins.

Reference

https://spark.rstudio.com/packages/sparklyr/latest/reference/join.tbl_spark.html
