What is Broadcast Join in Spark and how does it work? Broadcast join is an optimization technique in the Spark SQL engine that is used to join two DataFrames. This technique is ideal for joining a large DataFrame with a smaller one. Traditional joins take longer because they shuffle the data of both DataFrames across the cluster so that rows with the same join key end up on the same node.
In this article, I will explain what a broadcast join is, show how to apply it, and analyze its physical plan.
1. Spark Broadcast Join
As you know, Spark splits the data across different nodes for parallel processing. When you have two DataFrames, the data from both is distributed across multiple nodes in the cluster, so when you perform a traditional join, Spark is required to shuffle the data. The shuffle is needed because the rows for a given join key may not be located on the same node, and to perform the join, all rows for each key must be brought together on the same node. Hence, the traditional join is a very expensive operation in Spark.
Broadcast join is an important part of the Spark SQL execution engine. With a broadcast join, Spark collects the smaller DataFrame on the driver and broadcasts it to all executors, and each executor keeps this copy in memory. The larger DataFrame stays split and distributed across the executors, so Spark can perform the join without shuffling any data from the larger DataFrame, because the data required for the join is already available on every executor.
Note: In order to use Broadcast Join, the smaller DataFrame should be able to fit in the memory of both the Spark driver and each executor. If the DataFrame can’t fit in memory, you will get out-of-memory errors. You can also increase the broadcast join threshold using a configuration property, which I will discuss later.
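To make the mechanism above more concrete, here is a minimal conceptual sketch in plain Scala, with no Spark involved. Ordinary collections stand in for the partitions of the larger DataFrame, and the names `Reading`, `UnitName`, and `BroadcastJoinSketch` are hypothetical, chosen only for illustration. It also assumes the join keys on the small side are unique. This is not how Spark implements the join internally, only a model of the idea.

```scala
// Conceptual model only: plain Scala collections stand in for Spark partitions.
object BroadcastJoinSketch {
  // Hypothetical row types, for illustration.
  final case class Reading(station: String, unit: String)
  final case class UnitName(code: String, realUnit: String)

  // Each inner Seq plays the role of one partition of the large DataFrame.
  val largePartitions: Seq[Seq[Reading]] = Seq(
    Seq(Reading("S1", "C"), Reading("S2", "F")),
    Seq(Reading("S3", "C"), Reading("S4", "K")) // "K" has no match on the small side
  )

  val small: Seq[UnitName] =
    Seq(UnitName("C", "Celsius"), UnitName("F", "Fahrenheit"))

  // Step 1: build a hash table from the small side once.
  // This table is what would be "broadcast" to every executor.
  // Assumption: codes are unique, so a Map loses no rows.
  val broadcastTable: Map[String, UnitName] = small.map(u => u.code -> u).toMap

  // Step 2: each partition probes its own copy of the table.
  // Rows of the large side never move between partitions.
  def joinPartition(rows: Seq[Reading]): Seq[(Reading, UnitName)] =
    rows.flatMap(r => broadcastTable.get(r.unit).map(u => (r, u)))

  // Inner join result: unmatched rows (station S4) are dropped.
  val joined: Seq[(Reading, UnitName)] = largePartitions.flatMap(joinPartition)
}
```

The point of the sketch is that `joinPartition` only needs the rows of its own partition plus the shared lookup table, which is why no shuffle of the large side is required.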
2. Types of Broadcast Join
There are two types of broadcast joins.
- Broadcast hash join: In this case, the driver builds an in-memory hash table from the smaller DataFrame and distributes it to the executors.
- Broadcast nested loop join: It is a nested for-loop join that compares each row of one side with every row of the broadcast side. It is used for non-equi joins, where there is no equality key to hash on.
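The difference between the two types is easier to see in code. The following is a minimal plain-Scala sketch of the nested loop idea, not Spark's implementation. The names `Reading`, `Band`, and `nestedLoopJoin` are hypothetical. Because the condition is a range check rather than an equality, there is no key to look up in a hash table, so every pair of rows has to be tested.

```scala
// Conceptual model only: a nested loop join over plain Scala collections.
object NestedLoopJoinSketch {
  // Hypothetical row types, for illustration.
  final case class Reading(station: String, tavg: Double)
  final case class Band(label: String, lo: Double, hi: Double)

  val readings: Seq[Reading] =
    Seq(Reading("S1", 12.5), Reading("S2", 31.0), Reading("S3", 55.0))

  // The small side, which would be the broadcast side in Spark.
  val bands: Seq[Band] =
    Seq(Band("cold", 0.0, 15.0), Band("warm", 15.0, 40.0))

  // Test every (left, right) pair against an arbitrary predicate.
  // Cost grows with left.size * right.size, which is why the
  // broadcast side should be small.
  def nestedLoopJoin[A, B](left: Seq[A], right: Seq[B])(cond: (A, B) => Boolean): Seq[(A, B)] =
    for {
      l <- left
      r <- right
      if cond(l, r)
    } yield (l, r)

  // Non-equi condition: the reading falls inside the band's range.
  val joined: Seq[(Reading, Band)] =
    nestedLoopJoin(readings, bands)((r, b) => r.tavg >= b.lo && r.tavg < b.hi)
}
```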
3. Configuring Spark Auto Broadcast Join
We can provide the maximum size of a DataFrame as a threshold for automatic broadcast join detection in Spark. This can be set using the spark.sql.autoBroadcastJoinThreshold configuration in the Spark SQL conf. The default is 10 MB (10485760 bytes), and a suitable value depends on how much memory the driver and the executors have available.
// Enable broadcast join and set the size threshold,
// in bytes, of a DataFrame to broadcast (here 100 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)

// Disable automatic broadcast join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
The threshold is specified in bytes, and automatic broadcast joins can be disabled by setting its value to -1.
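Since the value is given in raw bytes, it is easy to mistype. As a small sketch, the threshold can be computed from megabytes and read back afterwards. The 50 MB figure and the application name are arbitrary illustrative choices, and the local session is only an assumption for running this outside a notebook. In a Databricks notebook or spark-shell, `spark` already exists and the builder lines can be skipped.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-threshold-sketch")
  .getOrCreate()

// Express the threshold in megabytes and convert it to bytes.
// 50 MB is an arbitrary example value, not a recommendation.
val thresholdBytes: Long = 50L * 1024 * 1024
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", thresholdBytes)

// Read the setting back to confirm what the session is using.
// The exact string format of the returned value can vary by Spark version.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
```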
4. Example of a Broadcast Join
For our demo, let us create two DataFrames, one large and one small, using Databricks. Here we create the larger DataFrame from a dataset available in Databricks and the smaller one manually.
// Create a larger DataFrame using the weather dataset in Databricks.
// Parquet files carry their own schema, so the header and
// inferSchema options are not needed here.
val largeDF = spark.read
  .parquet("dbfs:/mnt/training/weather/StationData/stationData.parquet")
  .limit(2000)

// Create a smaller DataFrame with the abbreviations of the unit codes
import spark.implicits._ // required for toDF outside of notebooks
val smallerDF = Seq(
  ("C", "Celsius"),
  ("F", "Fahrenheit")
).toDF("code", "realUnit")
Now let’s broadcast smallerDF, join it with largeDF, and see the result.
// Perform broadcast join
import org.apache.spark.sql.functions.broadcast

largeDF.join(
  broadcast(smallerDF),
  smallerDF("code") <=> largeDF("UNIT") // <=> is null-safe equality
).show()
5. Analyze Broadcast Join
We can use the explain() method to analyze how Spark physically implements the broadcast join.
// Explain broadcast join
import org.apache.spark.sql.functions.broadcast

largeDF.join(
  broadcast(smallerDF),
  smallerDF("code") <=> largeDF("UNIT") // <=> is null-safe equality
).explain(extended = false)
Passing “extended=false” to the explain() method prints only the physical plan that gets executed on the Spark executors.
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [coalesce(UNIT#543, ), isnull(UNIT#543)], [coalesce(code#560, ), isnull(code#560)], Inner, BuildRight, false
   :- GlobalLimit 2000
   :  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#313]
   :     +- LocalLimit 2000
   :        +- FileScan parquet [NAME#537,STATION#538,LATITUDE#539,LONGITUDE#540,ELEVATION#541,DATE#542,UNIT#543,TAVG#544] Batched: true, DataFilters: , Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/mnt/training/weather/StationData/stationData.parquet], PartitionFilters: , PushedFilters: , ReadSchema: struct<NAME:string,STATION:string,LATITUDE:float,LONGITUDE:float,ELEVATION:float,DATE:date,UNIT:s...
   +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(coalesce(input[0, string, true], ), isnull(input[0, string, true])),false), [id=#316]
      +- LocalTableScan [code#560, realUnit#561]
Notice how Spark builds the physical plan in the above example.
- First, it reads the Parquet file and creates the larger DataFrame, limited to 2000 records.
- Then, a BroadcastHashJoin is performed between smallerDF and largeDF using the condition provided.
- Even if smallerDF is not explicitly marked for broadcast in our code, Spark automatically broadcasts the smaller DataFrame into executor memory by default, as long as its estimated size is below spark.sql.autoBroadcastJoinThreshold.
- Spark broadcast joins cannot be used when joining two large DataFrames, because neither side fits in memory.
- Broadcast join naturally handles data skew, because the larger DataFrame is not shuffled by the join key.
- The limitation of broadcast join is that we have to make sure the smaller DataFrame fits into the driver and executor memory.
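The point about automatic broadcasting can be checked with a small experiment. The sketch below assumes a local Spark session and two tiny hand-made DataFrames. The object name `AutoBroadcastCheck` and the helper `usesBroadcastHashJoin` are hypothetical, and the helper relies on a simple text search of the physical plan, which is a convenience for demos rather than a robust way to inspect plans. The exact plan text can differ between Spark versions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object AutoBroadcastCheck {
  // Hypothetical helper: true if the physical plan text mentions a broadcast hash join.
  def usesBroadcastHashJoin(df: DataFrame): Boolean =
    df.queryExecution.executedPlan.toString.contains("BroadcastHashJoin")

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("auto-broadcast-check")
      .getOrCreate()
    import spark.implicits._

    // Tiny stand-ins for the DataFrames used in the article.
    val readings = Seq(("S1", "C"), ("S2", "F")).toDF("station", "UNIT")
    val units = Seq(("C", "Celsius"), ("F", "Fahrenheit")).toDF("code", "realUnit")

    // No broadcast() hint: with the default threshold, a tiny table
    // is a candidate for automatic broadcasting.
    val autoJoin = readings.join(units, readings("UNIT") === units("code"))
    println(s"default threshold: ${usesBroadcastHashJoin(autoJoin)}")

    // Disable automatic broadcasting and plan the same join again.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    val noAutoJoin = readings.join(units, readings("UNIT") === units("code"))
    println(s"threshold = -1: ${usesBroadcastHashJoin(noAutoJoin)}")

    spark.stop()
  }
}
```

Comparing the two printed values shows whether the threshold setting, rather than an explicit hint, is what triggers the broadcast in your environment.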