Spark SQL Performance Tuning by Configurations

Spark provides many configurations for improving and tuning the performance of Spark SQL workloads. You can set them programmatically or apply them at a global level using spark-submit.

Related: Improve the performance using programming best practices

In my last article on performance tuning, I explained some guidelines for improving performance through programming. In this article, I will explain some of the configurations that I have used, or read about in several blogs, to improve and tune the performance of Spark SQL queries and applications.

You can also set all configurations explained here with the --conf option of the spark-submit command.


spark-submit --conf "key=value" \
             --conf "key=value"
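
The same key/value pairs can also be supplied when the session is created. The following is a minimal sketch, assuming a Scala application with the spark-sql dependency on the classpath; the application name, the local master and the chosen values are placeholders for illustration only.

```scala
import org.apache.spark.sql.SparkSession

object ConfigAtStartup {
  def main(args: Array[String]): Unit = {
    // Hypothetical app name and local master, used only for this sketch
    val spark = SparkSession.builder()
      .appName("config-at-startup")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "30")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()

    // Runtime SQL configurations can be read back (and changed) through spark.conf
    println(spark.conf.get("spark.sql.shuffle.partitions"))

    spark.stop()
  }
}
```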

Use Columnar format when Caching

When you cache a DataFrame or table, Spark SQL stores it in an in-memory columnar format. Queries on the cached data then scan only the required columns, which results in less data being read and lower memory usage.

The spark.sql.inMemoryColumnarStorage.compressed configuration controls whether Spark automatically selects a compression codec for each column based on the statistics of the data. It is true by default; set it explicitly if it has been turned off.


spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)

Spark Cost-Based Optimizer

When you are working with multiple joins, use the cost-based optimizer (CBO), as it improves the query plan based on table and column statistics.

In Apache Spark the CBO is disabled by default (spark.sql.cbo.enabled is false). You can enable it by setting spark.sql.cbo.enabled to true.


spark.conf.set("spark.sql.cbo.enabled", true)

Note: Before running your join query, run the ANALYZE TABLE command and list all the columns you are joining on. This command collects the table and column statistics that the cost-based optimizer uses to find the best query plan.


ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS col1,col2
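
To see the statistics the optimizer works with, the steps above can be sketched end to end. This is an illustration under assumptions, not a definitive recipe: the emp and dept tables, their columns and the local session are hypothetical, and explain("cost") requires Spark 3.0 or later.

```scala
import org.apache.spark.sql.SparkSession

object CboExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cbo-example")
      .master("local[*]")
      .config("spark.sql.cbo.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical tables, created only so that there is something to analyze
    Seq((1, 10, "a"), (2, 20, "b")).toDF("emp_id", "dept_id", "name")
      .write.mode("overwrite").saveAsTable("emp")
    Seq((10, "Sales"), (20, "HR")).toDF("dept_id", "dept_name")
      .write.mode("overwrite").saveAsTable("dept")

    // Collect column statistics for the join keys
    spark.sql("ANALYZE TABLE emp COMPUTE STATISTICS FOR COLUMNS dept_id")
    spark.sql("ANALYZE TABLE dept COMPUTE STATISTICS FOR COLUMNS dept_id")

    // Print the optimized logical plan together with its statistics
    val joined = spark.table("emp").join(spark.table("dept"), "dept_id")
    joined.explain("cost")

    spark.stop()
  }
}
```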

Use Optimal value for Shuffle Partitions

When you perform an operation that triggers a data shuffle (such as aggregations and joins), Spark creates 200 partitions by default, because the spark.sql.shuffle.partitions configuration property defaults to 200.

The default of 200 is a fixed value; Spark does not know the optimal number of partitions to use after a shuffle. It is often a poor fit and can cause performance issues, so change it based on your data size: use a higher number for large datasets and a lower number for small ones.


spark.conf.set("spark.sql.shuffle.partitions",30) //Default value is 200

You need to tune this value along with others until you reach your performance baseline.
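
One common starting point (a rule of thumb, not something Spark prescribes) is to aim for a fixed amount of shuffled data per partition. The sketch below assumes a target of 128 MB per partition; the helper name, the target size and the minimum are all illustrative assumptions, and the result should be treated as a first guess to refine by measurement.

```scala
object ShufflePartitionEstimate {
  // Assumed target size per shuffle partition (rule of thumb; tune for your workload)
  val TargetPartitionBytes: Long = 128L * 1024 * 1024

  /** Estimates a partition count as ceil(shuffleBytes / target), never below minPartitions. */
  def estimate(shuffleBytes: Long, minPartitions: Int = 1): Int = {
    require(shuffleBytes >= 0, "shuffleBytes must be non-negative")
    val needed = (shuffleBytes + TargetPartitionBytes - 1) / TargetPartitionBytes
    math.max(minPartitions.toLong, needed).toInt
  }

  def main(args: Array[String]): Unit = {
    val tenGb = 10L * 1024 * 1024 * 1024
    println(estimate(tenGb))
    // The estimate could then be applied with:
    // spark.conf.set("spark.sql.shuffle.partitions", estimate(tenGb))
  }
}
```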

Use Broadcast Join when your Join data can fit in memory

Among the join strategies available in Spark, the broadcast hash join usually performs best because it avoids shuffling the larger table. This strategy can be used only when one of the joined tables is small enough to fit in memory and falls within the broadcast threshold.

You can increase the threshold size if your smaller table is larger than the default limit, using the configuration below.


spark.conf.set("spark.sql.autoBroadcastJoinThreshold",104857600) //100 MB; the default is 10485760 (10 MB)
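
Besides raising the threshold, a broadcast can be requested explicitly for a single join with the broadcast function from org.apache.spark.sql.functions. The sketch below is illustrative only; the emp and dept DataFrames and the local session are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("broadcast-join")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical data: emp stands in for the large side, dept for the small side
    val emp = Seq((1, 10), (2, 20), (3, 10)).toDF("emp_id", "dept_id")
    val dept = Seq((10, "Sales"), (20, "HR")).toDF("dept_id", "dept_name")

    // Mark the small side for broadcasting, regardless of the threshold
    val joined = emp.join(broadcast(dept), "dept_id")

    // Inspect the physical plan and look for a BroadcastHashJoin node
    joined.explain()

    spark.stop()
  }
}
```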

Spark 3.0 – Using coalesce & repartition on SQL

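When working with Spark SQL queries, you can use the COALESCE, REPARTITION and REPARTITION_BY_RANGE hints within the query to change the number of partitions based on your data size. COALESCE can only reduce the number of partitions, while REPARTITION and REPARTITION_BY_RANGE can increase or decrease it.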


SELECT /*+ COALESCE(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3, dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(3, dept_col) */ * FROM EMP_TABLE
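
For reference, the hints above have counterparts in the Dataset API. This is a sketch under assumptions: the emp DataFrame stands in for EMP_TABLE and the local session is only for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object RepartitionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("repartition-example")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for EMP_TABLE
    val emp = Seq((1, "Sales"), (2, "HR"), (3, "Sales")).toDF("emp_id", "dept_col")

    val coalesced = emp.coalesce(3)                            // COALESCE(3)
    val byNumber  = emp.repartition(3)                         // REPARTITION(3)
    val byColumn  = emp.repartition(col("dept_col"))           // REPARTITION(dept_col)
    val byBoth    = emp.repartition(3, col("dept_col"))        // REPARTITION(3, dept_col)
    val byRange   = emp.repartitionByRange(col("dept_col"))    // REPARTITION_BY_RANGE(dept_col)
    val byRangeN  = emp.repartitionByRange(3, col("dept_col")) // REPARTITION_BY_RANGE(3, dept_col)

    // Check the resulting number of partitions for one of the variants
    println(byNumber.rdd.getNumPartitions)

    spark.stop()
  }
}
```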

Spark 3.0 – Enable Adaptive Query Execution

Adaptive Query Execution (AQE) is a feature introduced in Spark 3.0 that improves query performance by re-optimizing the query plan at runtime, using the statistics it collects after each stage completes.

You can enable it by setting the spark.sql.adaptive.enabled configuration property to true. It is disabled by default in Spark 3.0 and 3.1 and enabled by default since Spark 3.2.0.


spark.conf.set("spark.sql.adaptive.enabled",true)

Spark 3.0 – Coalescing Post Shuffle Partitions

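With Spark 3.0, after each shuffle stage completes, AQE looks at the metrics of the completed stage and merges small post-shuffle partitions into fewer, reasonably sized ones, so you do not have to hand-pick an exact shuffle partition count. This takes effect only when both spark.sql.adaptive.enabled and the configuration below are set to true.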


spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled",true)

Spark 3.0 – Optimizing Skew Join

Sometimes data is not evenly distributed across partitions; this is called data skew. Operations such as joins run very slowly on the oversized partitions. With AQE enabled, Spark checks the stage statistics, detects skewed joins, and optimizes them by splitting the larger partitions into smaller ones (matching the partition size on the other table/DataFrame). This requires both spark.sql.adaptive.enabled and the configuration below to be set to true.


spark.conf.set("spark.sql.adaptive.skewJoin.enabled",true)
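
Because partition coalescing and skew-join handling both depend on the AQE switch, the three settings are usually applied together. The sketch below also sets a few related properties; the values shown are illustrative assumptions rather than recommendations, and the local session exists only to make the example self-contained.

```scala
import org.apache.spark.sql.SparkSession

object AqeSettings {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("aqe-settings")
      .master("local[*]")
      .getOrCreate()

    // Master switch: the two features below only take effect when this is true
    spark.conf.set("spark.sql.adaptive.enabled", true)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", true)
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)

    // Optional knobs; the values here are illustrative, tune them for your data
    spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

    spark.stop()
  }
}
```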

Conclusion

In this Spark SQL performance tuning and optimization article, you have learned different configurations to improve the performance of Spark SQL queries and applications. Where applicable, you need to tune the values of these configurations along with executor CPU cores and executor memory until you meet your needs.

Happy Learning !!

NNK
