Spark 3.0 – Adaptive Query Execution with Example

Adaptive Query Execution (AQE) is one of the greatest features of Spark 3.0 which reoptimizes and adjusts query plans based on runtime statistics collected during the execution of the query.

In this article, I will explain what is Adaptive Query Execution, Why it has become so popular, and will see how it improves performance with Scala & PySpark examples.

How Query Executes Prior to Spark 3.0

Prior to 3.0, Spark does the single-pass optimization by creating an execution plan (set of rules) before the query starts executing, once execution starts it sticks with the plan and starts executing the rules it created in the plan and doesn’t do any further optimization which is based on the metrics it collects during each stage.

When we submit a query, DataFrame, or Dataset operations, Spark does the following in order.

Spark Adaptive Query Execution
Source: Databricks
  1. First, Spark parses the query and creates the Unresolved Logical Plan
    • Validates the syntax of the query.
    • Doesn’t validate the semantics meaning column name existence, data types.
  2. Analysis: Using the Catalyst, it converts the Unresolved Logical Plan to Resolved Logical Plan a.k.a Logical Plan.
    1. The catalog contains the column names and data types, during this step, it validates the columns mentioned in a query with catalog.
  3. Optimization: Converts Logical Plan into Optimized Logical Plan.
  4. Planner: Now it creates One or More Physical Plans from an optimized Logical plan.
  5. Cost Model: In this phase, calculates the cost for each Physical plan and select the Best Physical Plan.
  6. RDD Generation: RDD’s are generated, this is the final phase of query optimization which generates RDD in Java bytecode.

Once RDD’s are generated in Byte code, the Spark execution engine executes the transformations and action.

What is Adaptive Query Execution

Adaptive Query Optimization in Spark 3.0, reoptimizes and adjusts query plans based on runtime metrics collected during the execution of the query, this re-optimization of the execution plan happens after each stage of the query as stage gives the right place to do re-optimization.

Note: In a Spark job, Stage is created with each wider transformation where data shuffle happens.

How it Evolved?

With each major release of Spark, it’s been introducing a new optimization features in order to better execute the query to achieve the greater performance.

  • Spark 1.x – Introduced Catalyst Optimizer and Tungsten Execution Engine
  • Spark 2.x – Added Cost-Based Optimizer 
  • Spark 3.0 – Now added Adaptive Query Execution

Enabling Adaptive Query Execution

Adaptive Query Execution is disabled by default. In order to enable set spark.sql.adaptive.enabled configuration property to true. Besides this property, you also need to enable the AQE feature you going to use that are explained later in the section.


spark.conf.set("spark.sql.adaptive.enabled",true)

After enabling Adaptive Query Execution, Spark performs Logical Optimization, Physical Planning, and Cost model to pick the best physical. By doing the re-plan with each Stage, Spark 3.0 performs 2x improvement on TPC-DS over Spark 2.4. 

Spark Adaptive Query Execution Performance
ADQ performance comparison (Source: Databricks)

Spark SQL UI

Since the execution plan may change at the runtime after finishing the stage and before executing a new stage, the SQL UI should also reflect the changes.

After you enabled the AQE mode, and if the operations have Aggregation, Joins, Subqueries (wider transformations) the Spark Web UI shows the original execution plan at the beginning. When adaptive execution starts, each Query Stage submits the child stages and probably changes the execution plan in it.

Adaptive Query Execution Features

Spark 3.0 comes with three major features in AQE.

  • Reducing Post-shuffle Partitions.
  • Switching Join Strategies to broadcast Join
  • Optimizing Skew Join

Reducing Post-shuffle Partitions.

Prior to 3.0, the developer needs to know the data as Spark doesn’t provide the optimal partitions after each shuffle operation and the developer needs to re-partition to increase or coalesce to decrease the partitions based on the total number of records.

With Spark 3.0, after every stage of the job, Spark dynamically determines the optimal number of partitions by looking at the metrics of the completed stage. In order to use this, you need to enable the below configuration.


spark.conf.set("spark.sql.adaptive.enabled",true)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled",true)

Now, let’s see this in action, first without enabling AQE.


  import spark.implicits._
  val simpleData = Seq(("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  )
  val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
  
  val df1=df.groupBy("department").count()
  println(df1.rdd.getNumPartitions)

Since groupBy() triggers the wider transformation or shuffle, statement df1.rdd.getNumPartitions results in 200 partitions; This is because spark by default creates 200 partitions for shuffle operations.

Now let’s run the same example after enabling AQE


  spark.conf.set("spark.sql.adaptive.enabled",true)
  val df2=df.groupBy("department").count()
  println(df2.rdd.getNumPartitions)

This results in 7 Partitions on my system, you might see this number differently due to resource differences between mine and your system.

With this feature, developers don’t have to know the size of the data and do the re-partition post shuffle operations base on the data. Spark takes care of this hereafter.

Switching Join Strategies to Broadcast Join

Among all different Join strategies available in Spark, broadcast hash join gives a greater performance. This strategy can be used only when one of the joins tables small enough to fit in memory within the broadcast threshold.

When one of the join tables could fit in memory before or after filtering data, AQE replans the join strategy at runtime and uses broadcast hash join.

Optimizing Skew Join

Sometimes we may come across data in partitions that are not evenly distributed, this is called Data Skew. Operations such as join perform very slow on this partitions. By enabling the AQE, Spark checks the stage statistics and determines if there are any Skew joins and optimizes it by splitting the bigger partitions into smaller (matching partition size on other table/dataframe).


spark.conf.set("spark.sql.adaptive.enabled",true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled",true)

Make sure, you set both configuration property to true.

Reference

Spark Adaptive Query Execution

NNK

SparkByExamples.com is a Big Data and Spark examples community page, all examples are simple and easy to understand and well tested in our development environment Read more ..

Leave a Reply

This Post Has One Comment

  1. Anonymous

    very well explained.

You are currently viewing Spark 3.0 – Adaptive Query Execution with Example