A Spark internal execution plan is the set of operations Spark performs to translate a SQL query, DataFrame, or Dataset into the best possible optimized logical and physical plan. It determines the processing flow from the front end (the query) to the back end (the executors).
Execution plans let you understand how your code will actually be executed across a cluster, which makes them useful for optimizing queries. Spark provides the explain() API to look at the execution plan of your Spark SQL query, DataFrame, or Dataset.
In this article, I will show you how to get the Spark query plan using the explain() API so you can debug and analyze your Apache Spark application.
1. Spark End-to-End Execution Flow
Apache Spark (and PySpark) uses the Catalyst optimizer, which automatically determines the most efficient execution plan for the specified operations. The execution flow is as follows:
- The code is first parsed into an unresolved logical plan; once it has been validated against the catalog, Spark converts it into a resolved logical plan.
- The logical plan is passed through the Catalyst optimizer, which applies optimization rules.
- The Optimized Logical Plan is then converted into a Physical Plan
- The Physical Plan is executed by the Spark executors.
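Each stage of this flow can also be inspected programmatically through a Dataset's queryExecution field. The sketch below is a minimal example, assuming a local SparkSession and a small hypothetical DataFrame; the object and method names are made up for illustration, and queryExecution is a developer-facing API whose printed format can differ between Spark versions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PlanStagesDemo {
  // Returns (stage name, plan tree) pairs for each step of the execution flow
  def planStages(df: DataFrame): Seq[(String, String)] = {
    val qe = df.queryExecution
    Seq(
      "Parsed (unresolved) logical plan" -> qe.logical.treeString,
      "Analyzed (resolved) logical plan" -> qe.analyzed.treeString,
      "Optimized logical plan" -> qe.optimizedPlan.treeString,
      "Physical plan" -> qe.executedPlan.treeString
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("plan-stages").getOrCreate()
    import spark.implicits._

    // Hypothetical sample data, used only to produce a plan
    val df = Seq(("1", "studentA"), ("2", "studentB"))
      .toDF("language_id", "studentName")
      .where($"studentName" === "studentA")

    planStages(df).foreach { case (stage, tree) =>
      println(s"== $stage ==\n$tree")
    }
    spark.stop()
  }
}
```

Printing the stages side by side makes it easy to see what each step of the flow changed.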
The main operations performed in the Spark execution flow are:
- Logical optimization
- Physical planning
- Cost model analysis
- Code generation
Using these operations, Spark creates the following execution plans:
- Unresolved Logical Plan
- Resolved Logical plan
- Optimized Logical plan
- Physical plans
- Code model plans
Apache Spark performs these operations and builds the plans above so that it can process the query as efficiently as possible.
2. Understanding the Spark Execution Plan
To demonstrate Spark execution plans, we will work with DataFrames, whether they come from a SQL query or from the DataFrame API. The function you will use is explain().
2.1. explain() modes
Before Apache Spark 3.0, there were only two modes available to format explain output.
- explain(extended = false), which prints only the physical plan
- explain(extended = true), which prints all the plans (logical and physical)
Starting with Apache Spark 3.0, a new mode parameter lets you choose the output format of the plan:
- explain(mode = "simple") – displays the physical plan
- explain(mode = "extended") – displays the logical and physical plans (like the extended = true option)
- explain(mode = "codegen") – displays the physical plan and the generated Java code, if available
- explain(mode = "cost") – displays the optimized logical plan and related statistics (if they exist)
- explain(mode = "formatted") – displays a split output: a physical plan outline followed by a section with the details of each node
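A minimal sketch of calling each mode, assuming Spark 3.0 or later and a local SparkSession. The same modes are also available from SQL through the EXPLAIN statement, where the keywords are EXTENDED, CODEGEN, COST, and FORMATTED and the simple mode is obtained by omitting the keyword; the helper sqlExplain is a hypothetical name used only for illustration.

```scala
import org.apache.spark.sql.SparkSession

object ExplainModesDemo {
  // Mode strings accepted by Dataset.explain(mode: String) from Spark 3.0 on
  val modes: Seq[String] = Seq("simple", "extended", "codegen", "cost", "formatted")

  // Runs EXPLAIN in SQL; the result is a one-row DataFrame holding the plan text
  def sqlExplain(spark: SparkSession, mode: String, query: String): String = {
    val keyword = if (mode == "simple") "" else mode.toUpperCase // SQL has no SIMPLE keyword
    spark.sql(s"EXPLAIN $keyword $query").head().getString(0)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("explain-modes").getOrCreate()
    import spark.implicits._

    // Hypothetical DataFrame, used only to have something to explain
    val df = Seq(("1", "Java"), ("2", "Python")).toDF("id", "language").filter($"id" === "1")

    df.explain()                // physical plan only, same as mode "simple"
    df.explain(extended = true) // pre-3.0 style, same as mode "extended"
    modes.foreach { mode =>
      println(s"----- explain($mode) -----")
      df.explain(mode)
    }

    println(sqlExplain(spark, "formatted", "SELECT 1 AS x"))
    spark.stop()
  }
}
```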
2.2. Create DataFrame for demonstration
import spark.implicits._

val data1 = Seq(("1", "Java", "20000"), ("2", "Python", "100000"), ("3", "Scala", "3000"))

//create languages DF
val languages = spark.createDataFrame(data1)
  .toDF("id","language","tution_fees")

//Create temporary view
languages.createOrReplaceTempView("languages")

val data2 = Seq(("1", "studentA"), ("1", "studentB"), ("2", "studentA"), ("3", "studentC"))

//create students DF
val students = spark.createDataFrame(data2).toDF("language_id","studentName")

//create temporary view
students.createOrReplaceTempView("students")

// Join tables
val df = spark.sql("""SELECT students.studentName, SUM(students.language_id) as c
  FROM students
  INNER JOIN languages ON students.language_id = languages.id
  WHERE students.studentName = 'studentA'
  group by students.studentName""")
In the above snippet, we create a DataFrame (df) by joining the students and languages views, filtering on students.studentName = 'studentA', and summing language_id for each student name.
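The same query can also be written with the DataFrame API instead of SQL; Catalyst turns both into the same kind of logical plan, so explain() works identically on either. This is a minimal sketch assuming a local SparkSession; the object name is hypothetical, and the explicit cast to double is an assumption added here to mirror the implicit cast Spark applies to the SQL SUM over a string column.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.sum

object DataFrameApiQuery {
  // DataFrame API version of the SQL join/filter/aggregate query
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val languages = Seq(("1", "Java", "20000"), ("2", "Python", "100000"), ("3", "Scala", "3000"))
      .toDF("id", "language", "tution_fees")
    val students = Seq(("1", "studentA"), ("1", "studentB"), ("2", "studentA"), ("3", "studentC"))
      .toDF("language_id", "studentName")

    students
      .join(languages, students("language_id") === languages("id"), "inner")
      .where(students("studentName") === "studentA")
      .groupBy(students("studentName"))
      .agg(sum(students("language_id").cast("double")).as("c"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("df-api").getOrCreate()
    val df = build(spark)
    df.show()
    df.explain(extended = true)
    spark.stop()
  }
}
```

For this data, studentA is linked to language ids 1 and 2, so the single result row is (studentA, 3.0).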
2.3. Unresolved Logical Plan
Once we create a DataFrame, Spark first parses the query and checks its syntax. The result of this step is the parsed, or unresolved, logical plan. At this point no table or column has been validated against the catalog, so every reference Spark cannot yet validate is flagged as unresolved.
Let's explore the unresolved logical plan by calling explain() in Scala with the extended argument set to true.
df.explain(extended=true)

//output(First Plan)
== Parsed Logical Plan ==
'Aggregate ['students.studentName], ['students.studentName, 'SUM('students.language_id) AS c#1539]
+- 'Join Inner, (('students.language_id = 'languages.id) AND ('students.studentName = studentA))
   :- 'UnresolvedRelation [students], , false
   +- 'UnresolvedRelation [languages], , false
The Parsed Logical Plan above is the first version of the logical plan, showing the flow of execution (an Aggregate on top of a Join). Spark has not yet been able to validate the students and languages relations used by the join, so it marks each of them as UnresolvedRelation. The leading ' on nodes and columns also marks them as unresolved.
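Because this step is purely syntactic, the parser can build the unresolved plan even before the students and languages views exist. A minimal sketch, assuming Spark 3.x and a local SparkSession; it calls spark.sessionState.sqlParser, which Spark marks as an unstable, internal-facing API, and the object name is hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object ParsedPlanDemo {
  val query: String =
    """SELECT students.studentName, SUM(students.language_id) AS c
      |FROM students INNER JOIN languages ON students.language_id = languages.id
      |WHERE students.studentName = 'studentA'
      |GROUP BY students.studentName""".stripMargin

  // Runs only the SQL parser: no catalog lookup happens, so the views need not exist
  def parse(spark: SparkSession, sql: String): LogicalPlan =
    spark.sessionState.sqlParser.parsePlan(sql)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("parsed-plan").getOrCreate()
    val plan = parse(spark, query)
    println(plan.treeString)
    println(s"resolved = ${plan.resolved}") // false: relations and columns are still unresolved
    spark.stop()
  }
}
```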
2.4. Resolved Logical Plan
Using the catalog to validate table and column objects, Spark's analyzer resolves everything that was left unresolved in the parsed logical plan.
The catalog, which can be connected to a metastore, is used during semantic analysis to verify data structures, schemas, types, and so on. If everything checks out, the result is shown as the "Analyzed Logical Plan".
df.explain(extended=true)

//output(Second Plan)
== Analyzed Logical Plan ==
studentName: string, c: double
Aggregate [studentName#1536], [studentName#1536, sum(cast(language_id#1535 as double)) AS c#1539]
+- Join Inner, ((language_id#1535 = id#1525) AND (studentName#1536 = studentA))
   :- SubqueryAlias students
   :  +- View (`students`, [language_id#1535,studentName#1536])
   :     +- Project [_1#1531 AS language_id#1535, _2#1532 AS studentName#1536]
   :        +- LocalRelation [_1#1531, _2#1532]
   +- SubqueryAlias languages
      +- View (`languages`, [id#1525,language#1526,tution_fees#1527])
         +- Project [_1#1519 AS id#1525, _2#1520 AS language#1526, _3#1521 AS tution_fees#1527]
            +- LocalRelation [_1#1519, _2#1520, _3#1521]
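You can see that the students and languages relations used by the join, which were previously marked as UnresolvedRelation, are now resolved: the catalog lookup has turned each one into a SubqueryAlias over its temporary View. Every column now carries a unique ID (for example, language_id#1535), and the output schema (studentName: string, c: double) has been determined. SUM over the string column language_id has also been resolved to sum(cast(language_id#1535 as double)).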
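The result of analysis can be checked directly: the analyzed plan reports itself as resolved, and a reference the catalog cannot validate makes spark.sql fail eagerly with an AnalysisException. This is a minimal sketch assuming Spark 3.x and a local SparkSession; the object and helper names are hypothetical, and the explicit CAST in the query is an assumption added so the sketch also runs with ANSI mode enabled.

```scala
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias

object AnalyzedPlanDemo {
  // Same query as section 2.2, with an explicit CAST (assumption, see lead-in)
  val query: String =
    """SELECT students.studentName, SUM(CAST(students.language_id AS DOUBLE)) AS c
      |FROM students INNER JOIN languages ON students.language_id = languages.id
      |WHERE students.studentName = 'studentA'
      |GROUP BY students.studentName""".stripMargin

  // Recreates the students and languages views from section 2.2
  def createViews(spark: SparkSession): Unit = {
    import spark.implicits._
    Seq(("1", "Java", "20000"), ("2", "Python", "100000"), ("3", "Scala", "3000"))
      .toDF("id", "language", "tution_fees").createOrReplaceTempView("languages")
    Seq(("1", "studentA"), ("1", "studentB"), ("2", "studentA"), ("3", "studentC"))
      .toDF("language_id", "studentName").createOrReplaceTempView("students")
  }

  // Names of the relations the analyzer resolved into SubqueryAlias nodes
  def resolvedAliases(df: DataFrame): Seq[String] =
    df.queryExecution.analyzed.collect { case s: SubqueryAlias => s.alias }

  // spark.sql analyzes eagerly, so an unknown column or view fails right away
  def analysisError(spark: SparkSession, sql: String): Option[String] =
    try { spark.sql(sql); None }
    catch { case e: AnalysisException => Some(e.getMessage) }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("analyzed-plan").getOrCreate()
    createViews(spark)
    val df = spark.sql(query)
    println(s"resolved = ${df.queryExecution.analyzed.resolved}")
    df.printSchema()
    println(s"resolved relations: ${resolvedAliases(df).mkString(", ")}")
    // "studentNam" is a deliberate typo, so analysis fails and spark.sql throws
    println(analysisError(spark, "SELECT studentNam FROM students"))
    spark.stop()
  }
}
```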
2.5. Optimized Logical plan
Once the resolved logical plan has been produced, the Catalyst optimizer optimizes it by applying a set of rules to its logical operations.
These rules can reorder, simplify, or remove operations depending on what the query actually needs to compute.
df.explain(extended=true)

//output(Third Plan)
== Optimized Logical Plan ==
Aggregate [studentName#1536], [studentName#1536, sum(cast(language_id#1535 as double)) AS c#1539]
+- Project [language_id#1535, studentName#1536]
   +- Join Inner, (language_id#1535 = id#1525)
      :- LocalRelation [language_id#1535, studentName#1536]
      +- LocalRelation [id#1525]
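We can see that the operations have been reordered. The studentName = 'studentA' filter is no longer part of the join condition: it has been applied directly to the students data, producing a new LocalRelation that holds only the matching rows and reducing the volume of data processed by the join. Columns the query never uses (language and tution_fees) have also been pruned from the languages side.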
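One way to see what the optimizer changed is to list the operator names in the analyzed and optimized plans. A minimal sketch, assuming Spark 3.x and a local SparkSession; the object and helper names are hypothetical, and the explicit CAST is an assumption added so the sketch also runs with ANSI mode enabled. In the optimized plan, the SubqueryAlias and View wrappers are gone and the Join works on two LocalRelation inputs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object OptimizedPlanDemo {
  // Same query as section 2.2, with an explicit CAST (assumption, see lead-in)
  val query: String =
    """SELECT students.studentName, SUM(CAST(students.language_id AS DOUBLE)) AS c
      |FROM students INNER JOIN languages ON students.language_id = languages.id
      |WHERE students.studentName = 'studentA'
      |GROUP BY students.studentName""".stripMargin

  // Recreates the students and languages views from section 2.2
  def createViews(spark: SparkSession): Unit = {
    import spark.implicits._
    Seq(("1", "Java", "20000"), ("2", "Python", "100000"), ("3", "Scala", "3000"))
      .toDF("id", "language", "tution_fees").createOrReplaceTempView("languages")
    Seq(("1", "studentA"), ("1", "studentB"), ("2", "studentA"), ("3", "studentC"))
      .toDF("language_id", "studentName").createOrReplaceTempView("students")
  }

  // Operator names of a plan, top-down (pre-order)
  def nodeNames(plan: LogicalPlan): Seq[String] = plan.collect { case p => p.nodeName }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("optimized-plan").getOrCreate()
    createViews(spark)
    val qe = spark.sql(query).queryExecution
    println(s"analyzed:  ${nodeNames(qe.analyzed).mkString(" > ")}")
    println(s"optimized: ${nodeNames(qe.optimizedPlan).mkString(" > ")}")
    println(qe.optimizedPlan.treeString)
    spark.stop()
  }
}
```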
2.6. Physical plans
The final plan in the explain output is the physical plan, which describes how the optimized logical plan will actually be executed on the cluster. Spark's planner applies a set of strategies to convert logical operators into physical operators, using statistics such as estimated relation sizes to choose between alternatives (for example, a broadcast hash join versus a sort-merge join). The selected plan is the final physical plan that runs on the executors.
df.explain(extended=true)

//output(Last Plan)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[studentName#1536], functions=[finalmerge_sum(merge sum#1544) AS sum(cast(language_id#1535 as double))#1540], output=[studentName#1536, c#1539])
   +- Exchange hashpartitioning(studentName#1536, 200), ENSURE_REQUIREMENTS, [id=#1028]
      +- HashAggregate(keys=[studentName#1536], functions=[partial_sum(cast(language_id#1535 as double)) AS sum#1544], output=[studentName#1536, sum#1544])
         +- Project [language_id#1535, studentName#1536]
            +- BroadcastHashJoin [language_id#1535], [id#1525], Inner, BuildRight, false
               :- LocalTableScan [language_id#1535, studentName#1536]
               +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, true]),false), [id=#1023]
                  +- LocalTableScan [id#1525]
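The output above shows the physical operators that will run on the cluster: LocalTableScan, BroadcastExchange with BroadcastHashJoin, and a two-phase HashAggregate (partial_sum before the Exchange shuffle and finalmerge_sum after it).
isFinalPlan=false because Adaptive Query Execution is enabled: Spark may still change this plan at runtime based on statistics collected while the query runs.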
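The choice of join strategy depends on size statistics. A minimal sketch, assuming Spark 3.x and a local SparkSession: with the default spark.sql.autoBroadcastJoinThreshold (10 MB), the small languages relation is broadcast, while setting the threshold to -1 disables size-based broadcasting and the planner falls back to a sort-merge join. The object and helper names are hypothetical, and the explicit CAST is an assumption added so the sketch also runs with ANSI mode enabled.

```scala
import org.apache.spark.sql.SparkSession

object JoinStrategyDemo {
  // Same query as section 2.2, with an explicit CAST (assumption, see lead-in)
  val query: String =
    """SELECT students.studentName, SUM(CAST(students.language_id AS DOUBLE)) AS c
      |FROM students INNER JOIN languages ON students.language_id = languages.id
      |WHERE students.studentName = 'studentA'
      |GROUP BY students.studentName""".stripMargin

  // Recreates the students and languages views from section 2.2
  def createViews(spark: SparkSession): Unit = {
    import spark.implicits._
    Seq(("1", "Java", "20000"), ("2", "Python", "100000"), ("3", "Scala", "3000"))
      .toDF("id", "language", "tution_fees").createOrReplaceTempView("languages")
    Seq(("1", "studentA"), ("1", "studentB"), ("2", "studentA"), ("3", "studentC"))
      .toDF("language_id", "studentName").createOrReplaceTempView("students")
  }

  // Plans the query under the given broadcast threshold and returns the physical plan text
  def physicalPlan(spark: SparkSession, broadcastThreshold: String): String = {
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", broadcastThreshold)
    // A fresh DataFrame, so planning picks up the new setting
    spark.sql(query).queryExecution.executedPlan.treeString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join-strategy").getOrCreate()
    createViews(spark)
    println(physicalPlan(spark, "10485760")) // default 10 MB: BroadcastHashJoin
    println(physicalPlan(spark, "-1"))       // broadcasting disabled: SortMergeJoin
    spark.stop()
  }
}
```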
3. Adaptive Query Execution
Adaptive Query Execution (AQE), introduced in Spark 3.0, lets Spark change the physical plan while the query is running on the cluster. Based on statistics collected at runtime, Spark can switch to a better plan.
In Spark 3.0 and 3.1 this feature is disabled by default and must be enabled with the spark.sql.adaptive.enabled configuration; starting with Spark 3.2 it is enabled by default.
However, the final adaptive plan is not shown in the explain() output above, so we need to use the Spark UI to track the changes.
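A minimal sketch of turning AQE on and checking that the physical plan is wrapped in an adaptive plan, assuming Spark 3.x and a local SparkSession. spark.sql.adaptive.enabled, spark.sql.adaptive.coalescePartitions.enabled, and spark.sql.adaptive.skewJoin.enabled are existing Spark 3.x settings; the values chosen here, the object and helper names, and the explicit CAST in the query are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

object AqeDemo {
  // Same query as section 2.2, with an explicit CAST (assumption, see lead-in)
  val query: String =
    """SELECT students.studentName, SUM(CAST(students.language_id AS DOUBLE)) AS c
      |FROM students INNER JOIN languages ON students.language_id = languages.id
      |WHERE students.studentName = 'studentA'
      |GROUP BY students.studentName""".stripMargin

  // Recreates the students and languages views from section 2.2
  def createViews(spark: SparkSession): Unit = {
    import spark.implicits._
    Seq(("1", "Java", "20000"), ("2", "Python", "100000"), ("3", "Scala", "3000"))
      .toDF("id", "language", "tution_fees").createOrReplaceTempView("languages")
    Seq(("1", "studentA"), ("1", "studentB"), ("2", "studentA"), ("3", "studentC"))
      .toDF("language_id", "studentName").createOrReplaceTempView("students")
  }

  // Turns AQE on or off, plans the query, and reports whether the physical plan is adaptive
  def isAdaptive(spark: SparkSession, enabled: Boolean): Boolean = {
    spark.conf.set("spark.sql.adaptive.enabled", enabled.toString)
    spark.sql(query).queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("aqe")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.skewJoin.enabled", "true")
      .getOrCreate()
    createViews(spark)
    val df = spark.sql(query)
    df.explain() // shows AdaptiveSparkPlan isFinalPlan=false, since the query has not run yet
    df.collect() // running the query lets AQE re-optimize using runtime statistics
    spark.stop()
  }
}
```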
3.1. AQE in the Spark UI
The explain() output above marks the physical plan as not final (isFinalPlan=false). Because AQE finalizes the plan on the fly based on execution statistics, the final plan can be seen only in the Spark UI.
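To find the final plan, you need the address of the Spark UI for the running application and a query that has actually executed. A minimal sketch, assuming a local run with the UI enabled (the default); the object name and the sample query are hypothetical. After the action runs, open the printed address and select the query in the SQL tab to view its final adaptive plan.

```scala
import org.apache.spark.sql.SparkSession

object SparkUiDemo {
  // Address of the Spark UI for this application, if the UI is enabled
  def uiAddress(spark: SparkSession): Option[String] = spark.sparkContext.uiWebUrl

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("aqe-ui")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical query with a shuffle, so AQE has something to re-optimize
    val df = Seq(("1", "studentA"), ("1", "studentB"), ("2", "studentA"))
      .toDF("language_id", "studentName")
      .groupBy("studentName").count()
    df.collect()

    uiAddress(spark).foreach(url => println(s"Spark UI: $url"))
    // Keep the application alive long enough to browse the UI (local runs only)
    scala.io.StdIn.readLine("Press Enter to exit")
    spark.stop()
  }
}
```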
Spark's internal execution plan is very helpful for gaining insight into how query execution flows from the front end to the back end, i.e., to the cluster. It is also useful for optimizing query performance.
The explain() method is very useful for extracting the Spark execution flow, and the Adaptive Query Execution feature introduced in Spark 3.0 helps improve execution plans by re-optimizing them at runtime using real-time statistics.