Spark Internal Execution plan

A Spark execution plan is the series of logical and physical plans that Spark builds to translate a SQL query, DataFrame, or Dataset operation into optimized work for the cluster. It determines the processing flow from the front end (the query) to the back end (the executors).

Execution plans let you see how your code will actually be executed across a cluster, which makes them useful for optimizing queries. Spark provides the explain() API to display the execution plan for a Spark SQL query, DataFrame, or Dataset.

In this article, I will show you how to get the Spark query plan using the explain() API so you can debug and analyze your Apache Spark application.

1. Spark End-to-End Execution Flow

Apache Spark (including PySpark) uses the Catalyst optimizer, which automatically works out an efficient execution plan for the operations you specify. The execution flow is as follows:

  • The code you write is first represented as an unresolved logical plan. If it is valid, Spark resolves it into a logical plan.
  • The logical plan is passed through the Catalyst optimizer, which applies optimization rules.
  • The optimized logical plan is then converted into a physical plan.
  • The physical plan is executed by the Spark executors.
Spark Internal Execution Flow

The operations performed in the Spark execution flow are:

  • Analysis
  • Logical optimization
  • Physical planning
  • Cost model analysis
  • Code Generation

These operations produce the following Spark execution plans:

  • Unresolved Logical Plan
  • Resolved Logical plan
  • Optimized Logical plan
  • Physical plans
  • Code model plans

Apache Spark uses these operations to create the plans listed above and run the query as efficiently as possible.
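
As a rough illustration of these stages, the sketch below prints each plan through the queryExecution field of a DataFrame. The local session and the toy data are assumptions made for this example only; in spark-shell, getOrCreate() simply returns the existing session.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration purposes.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("plan-stages-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical toy data, not part of the article's example.
val demo = Seq((1, "a"), (2, "b"), (3, "a")).toDF("id", "tag")
  .filter($"tag" === "a")
  .groupBy("tag")
  .count()

val qe = demo.queryExecution
println(qe.logical)        // logical plan before analysis
println(qe.analyzed)       // resolved (analyzed) logical plan
println(qe.optimizedPlan)  // logical plan after the Catalyst optimization rules
println(qe.sparkPlan)      // physical plan chosen by the planner
println(qe.executedPlan)   // physical plan prepared for execution
```

Each field is evaluated lazily, so printing a plan does not run the query itself.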

2. Understanding Spark Execution plan

To demonstrate Spark execution plans, we will work with DataFrames; it makes no difference whether they come from SQL or from the DataFrame API. The function to use is explain().

2.1. explain() modes

Before Apache Spark 3.0, only two output formats were available:

  • explain(extended=false), which displays only the physical plan
  • explain(extended=true), which displays all the plans (logical and physical)

From Apache Spark 3.0 onward, a new mode parameter selects the format of the plan output:

  • explain(mode="simple") – displays the physical plan
  • explain(mode="extended") – displays the physical and logical plans (like extended=true)
  • explain(mode="codegen") – displays the generated Java code that will be executed
  • explain(mode="cost") – displays the optimized logical plan and related statistics (if they exist)
  • explain(mode="formatted") – displays a split output composed of a physical plan outline and a section with the details of each node
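
To compare the modes side by side, a small loop like the one below can help. It is only a sketch: the local session and the two-row DataFrame are assumptions for illustration, and it needs Spark 3.0 or later.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell this returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("explain-modes-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical toy DataFrame used only to have something to explain.
val demo = Seq(("1", "Java"), ("2", "Python"))
  .toDF("id", "language")
  .filter($"id" === "1")

// Print the plan once per mode.
Seq("simple", "extended", "codegen", "cost", "formatted").foreach { mode =>
  println(s"----- mode = $mode -----")
  demo.explain(mode)
}
```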

2.2. Create DataFrame for demonstration

To demonstrate execution plans, let's create a DataFrame and apply some filtering and aggregation.


import spark.implicits._
val data1 = Seq(("1", "Java", "20000"), 
                ("2", "Python", "100000"), 
                ("3", "Scala", "3000"))
//create languages DF
val languages = spark.createDataFrame(data1)
                     .toDF("id","language","tution_fees")
//Create temporary view
languages.createOrReplaceTempView("languages")


val data2 = Seq(("1", "studentA"), ("1", "studentB"), 
                ("2", "studentA"), ("3", "studentC"))

//create students DF
val students = spark.createDataFrame(data2).toDF("language_id","studentName")

//create temporary view
students.createOrReplaceTempView("students")

// Join the two views, keep only studentA, and sum language_id per student
val df = spark.sql("""SELECT students.studentName, SUM(students.language_id) AS c
         FROM students
         INNER JOIN languages
         ON students.language_id = languages.id
         WHERE students.studentName = 'studentA'
         GROUP BY students.studentName""")

In the above snippet, we create a DataFrame (df) by joining the students and languages views, filtering on the student name, and summing language_id for each student.

2.3. Unresolved Logical Plan

When we create a DataFrame, Spark first parses the query and checks only its syntax. The result of this step is the parsed logical plan. Because table and column names have not yet been checked against the catalog, they are flagged as unresolved.

Let’s explore the unresolved logical plan by calling explain() in Scala with the extended argument set to true.


df.explain(extended=true)

//output(First Plan)
== Parsed Logical Plan ==
'Aggregate ['students.studentName], ['students.studentName, 'SUM('students.language_id) AS c#1539]
+- 'Join Inner, (('students.language_id = 'languages.id) AND ('students.studentName = studentA))
   :- 'UnresolvedRelation [students], [], false
   +- 'UnresolvedRelation [languages], [], false

The Parsed Logical Plan above shows that the query is syntactically valid and gives the first version of the logical plan, with the flow of execution (the Aggregate and Inner Join operations).

Since nothing has been looked up in the catalog yet, the students and languages relations are marked as UnresolvedRelation, and the column references carry a leading ' to show that they are unresolved as well.
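
The difference between parsing and analysis can be sketched with a query against a table that does not exist. The table name no_such_table is hypothetical, and sessionState is marked unstable in Spark, so treat the parser call as illustrative rather than as a stable API.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import scala.util.{Failure, Success, Try}

// Assumption: a local session; in spark-shell this returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("parse-vs-analyze-sketch")
  .getOrCreate()

// Assumption: no_such_table is a hypothetical name that is not in the catalog.
val query = "SELECT name FROM no_such_table WHERE name = 'x'"

// Parsing checks syntax only, so it succeeds even though the table is missing.
val parsed = spark.sessionState.sqlParser.parsePlan(query)
println(parsed)           // the relation shows up as 'UnresolvedRelation
println(parsed.resolved)  // false: nothing has been checked against the catalog

// Analysis consults the catalog, so the same query fails at this step.
Try(spark.sql(query)) match {
  case Failure(e: AnalysisException) => println(s"Analysis failed: ${e.getMessage}")
  case Failure(other)                => throw other
  case Success(_)                    => println("Unexpectedly resolved")
}
```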

2.4. Resolved Logical Plan

Using the catalog to validate the table and column objects, the analyzer now resolves everything that was left unresolved in the parsed logical plan.

The catalog can be backed by a metastore. Semantic analysis against it verifies data structures, schemas, types, and so on, and if everything checks out, the plan is marked as the “Analyzed Logical Plan”.


df.explain(extended=true)

//output(Second Plan)
== Analyzed Logical Plan ==
studentName: string, c: double
Aggregate [studentName#1536], [studentName#1536, sum(cast(language_id#1535 as double)) AS c#1539]
+- Join Inner, ((language_id#1535 = id#1525) AND (studentName#1536 = studentA))
   :- SubqueryAlias students
   :  +- View (`students`, [language_id#1535,studentName#1536])
   :     +- Project [_1#1531 AS language_id#1535, _2#1532 AS studentName#1536]
   :        +- LocalRelation [_1#1531, _2#1532]
   +- SubqueryAlias languages
      +- View (`languages`, [id#1525,language#1526,tution_fees#1527])
         +- Project [_1#1519 AS id#1525, _2#1520 AS language#1526, _3#1521 AS tution_fees#1527]
            +- LocalRelation [_1#1519, _2#1520, _3#1521]

You can see that the students and languages relations, previously marked as UnresolvedRelation, are now resolved: each appears as a SubqueryAlias over the corresponding temporary view, and every column carries a unique ID. The output schema is also known at this point (studentName: string, c: double), with language_id cast to double for the sum.

2.5. Optimized Logical plan

Once the resolved logical plan has been produced, the Catalyst optimizer improves it by applying a set of rules to the logical operations.

These rules can reorder, combine, or remove logical operations so that the plan does less work.


df.explain(extended=true)

//output(Third Plan)
== Optimized Logical Plan ==
Aggregate [studentName#1536], [studentName#1536, sum(cast(language_id#1535 as double)) AS c#1539]
+- Project [language_id#1535, studentName#1536]
   +- Join Inner, (language_id#1535 = id#1525)
      :- LocalRelation [language_id#1535, studentName#1536]
      +- LocalRelation [id#1525]

We can see that the plan has changed: the WHERE condition no longer sits in the join condition and has been applied directly to the students relation, which reduces the volume of data processed by the join. Unused columns have also been pruned, so only id is read from languages.

2.6. Physical plans

The last plan in the explain() output is the physical plan. From the optimized logical plan, Spark generates a plan that describes how the query will be physically executed on the cluster. Spark generates candidate physical plans, compares them through the cost model in terms of execution time and resource consumption, and selects the best one as the final physical plan that runs on the executors.


df.explain(extended=true)

//output(Last Plan)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[studentName#1536], functions=[finalmerge_sum(merge sum#1544) AS sum(cast(language_id#1535 as double))#1540], output=[studentName#1536, c#1539])
   +- Exchange hashpartitioning(studentName#1536, 200), ENSURE_REQUIREMENTS, [id=#1028]
      +- HashAggregate(keys=[studentName#1536], functions=[partial_sum(cast(language_id#1535 as double)) AS sum#1544], output=[studentName#1536, sum#1544])
         +- Project [language_id#1535, studentName#1536]
            +- BroadcastHashJoin [language_id#1535], [id#1525], Inner, BuildRight, false
               :- LocalTableScan [language_id#1535, studentName#1536]
               +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, true]),false), [id=#1023]
                  +- LocalTableScan [id#1525]

The above output shows the operations that will run on the cluster: LocalTableScan, BroadcastHashJoin, Exchange (a shuffle), and HashAggregate.

Note: isFinalPlan is false because Spark uses Adaptive Query Execution, which can still revise the plan at runtime based on statistics collected during execution.
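
One way to see that the physical plan is a choice among alternatives is to change a setting that influences the join strategy and compare the output. The sketch below is an illustration under assumptions, not part of the article's example: it uses toy data, and it relies on spark.sql.autoBroadcastJoinThreshold, where a value of -1 disables broadcast joins, in which case Spark typically falls back to a sort-merge join.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell this returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("join-strategy-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical toy data shaped like the article's views.
val students = Seq(("1", "studentA"), ("2", "studentA")).toDF("language_id", "studentName")
val languages = Seq(("1", "Java"), ("2", "Python")).toDF("id", "language")

// With small inputs and default settings, a broadcast join is the usual choice.
val joined = students.join(languages, $"language_id" === $"id")
joined.explain()

// Disable broadcast joins, then rebuild the DataFrame so that it is planned
// under the new setting.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
val joinedNoBroadcast = students.join(languages, $"language_id" === $"id")
joinedNoBroadcast.explain()

// Restore the default so later queries are not affected.
spark.conf.unset("spark.sql.autoBroadcastJoinThreshold")
```

The DataFrame is rebuilt after the configuration change because each DataFrame keeps the plan it was first planned with.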

3. Adaptive Query Execution

Adaptive Query Execution (AQE) is a feature introduced in Spark 3.0 that lets Spark change the physical plan while the query is running on the cluster. Based on statistics collected during execution, Spark switches to a better plan at runtime.

This feature is disabled by default in Spark 3.0 and 3.1 (it is enabled by default from Spark 3.2), so on those versions it must be enabled through the Spark configuration:


spark.conf.set("spark.sql.adaptive.enabled", "true")
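
To check what a given session is actually using, the current value can be read back before and after setting it. This is a minimal sketch with an assumed local session and toy data; the aggregation is there only because it introduces a shuffle, which is the kind of query AQE normally applies to.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell this returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("aqe-config-sketch")
  .getOrCreate()
import spark.implicits._

// The value before any change depends on the Spark version and cluster config.
println(spark.conf.get("spark.sql.adaptive.enabled"))

spark.conf.set("spark.sql.adaptive.enabled", "true")
println(spark.conf.get("spark.sql.adaptive.enabled"))

// Hypothetical toy aggregation; with AQE on, the plan root is expected to be
// an AdaptiveSparkPlan node.
val demo = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("key", "value")
  .groupBy("key")
  .sum("value")
demo.explain()
```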

However, the final plan is not shown in the explain() output produced before the query runs, so we need to open the Spark UI to track the changes.

3.1. AQE in Spark UI

When AQE is on, the explain() output flags the physical plan as not final (isFinalPlan=false). The final plan is visible in the Spark UI, because Adaptive Query Execution finalizes it on the fly based on execution statistics.

Spark Execution Flow

4. Conclusion

The Spark execution plan gives insight into how a query flows from the front end to the back end, that is, to the cluster, and it is useful for optimizing query performance. The explain() method is the main tool for extracting the execution flow, and the Adaptive Query Execution feature introduced in Spark 3.0 improves execution plans by creating better ones at runtime using runtime statistics.

rimmalapudi

Data Engineer. I write about BigData Architecture, tools and techniques that are used to build Bigdata pipelines and other generic blogs.
