Spark provides many configurations for improving and tuning the performance of Spark SQL workloads; these can be set programmatically or applied globally using spark-submit.
Related: Improve the performance using programming best practices
In my last article on performance tuning, I explained some guidelines for improving performance through programming best practices. In this article, I will explain some of the configurations that I have used, or read about in several blogs, to improve or tune the performance of Spark SQL queries and applications.
You can also set all the configurations explained here with the --conf option of the spark-submit command.
spark-submit --conf "key=value" \
--conf "key=value"
Use Columnar format when Caching
When you are caching data from DataFrame/SQL, use the in-memory columnar format. When you perform DataFrame/SQL operations on columns, Spark retrieves only the required columns, which results in less data retrieval and lower memory usage.
You can enable compression of this in-memory columnar storage by setting the spark.sql.inMemoryColumnarStorage.compressed configuration property to true; Spark will then automatically choose a compression codec for each column based on statistics of the data.
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)
Spark Cost-Based Optimizer
When you are working with multiple joins, use the Cost-Based Optimizer, as it improves the query plan based on table and column statistics.
This is disabled by default in Spark; you can enable it by setting spark.sql.cbo.enabled to true.
spark.conf.set("spark.sql.cbo.enabled", true)
Note: Prior to your join query, you need to run the ANALYZE TABLE command, listing all the columns you are joining on. This command collects the statistics for the tables and columns so the cost-based optimizer can find the best query plan.
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS col1,col2
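Putting it together, here is a minimal sketch assuming two hypothetical catalog tables EMP and DEPT joined on dept_id: collect the column statistics first, then run the join so the CBO can use them.
spark.conf.set("spark.sql.cbo.enabled", true)
spark.sql("ANALYZE TABLE EMP COMPUTE STATISTICS FOR COLUMNS dept_id")
spark.sql("ANALYZE TABLE DEPT COMPUTE STATISTICS FOR COLUMNS dept_id")
val joinedDF = spark.sql("SELECT e.emp_name, d.dept_name FROM EMP e JOIN DEPT d ON e.dept_id = d.dept_id")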
Use Optimal value for Shuffle Partitions
When you perform an operation that triggers a data shuffle (like aggregations and joins), Spark by default creates 200 partitions. This is because the spark.sql.shuffle.partitions configuration property is set to 200.
This default of 200 exists because Spark doesn't know the optimal partition size to use after a shuffle operation. Most of the time this value causes performance issues, so change it based on your data size: if you have huge data, use a higher number; if you have a smaller dataset, use a lower number.
spark.conf.set("spark.sql.shuffle.partitions",30) //Default value is 200
You need to tune this value along with others until you reach your performance baseline.
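A minimal sketch of where this configuration matters, using a hypothetical emp dataset: the groupBy below triggers a shuffle, and the aggregated result ends up with the configured number of shuffle partitions (assuming AQE coalescing is not kicking in).
spark.conf.set("spark.sql.shuffle.partitions", 30)
val empDF = empSpark.read.parquet("/tmp/emp")              // hypothetical input path; empSpark is your SparkSession
val salaryByDept = empDF.groupBy("dept_id").sum("salary")  // aggregation triggers a shuffle
println(salaryByDept.rdd.getNumPartitions)                 // 30, the configured shuffle partition count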
Use Broadcast Join when your Join data can fit in memory
Among all the different join strategies available in Spark, broadcast hash join gives the best performance. This strategy can be used only when one of the join tables is small enough to fit in memory, within the broadcast threshold.
You can increase the threshold size if your data is big, using the below configuration.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",10485760) //100 MB by default
Spark 3.0 – Using coalesce & repartition in SQL
While working with Spark SQL queries, you can use the COALESCE, REPARTITION and REPARTITION_BY_RANGE hints within the query to increase or decrease the number of partitions based on your data size.
SELECT /*+ COALESCE(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3, dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(3, dept_col) */ * FROM EMP_TABLE
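The same partitioning controls are available on the DataFrame API; here is a minimal sketch on a hypothetical df with a dept_col column: coalesce() only reduces the partition count without a shuffle, while repartition() and repartitionByRange() redistribute the data with a shuffle.
import org.apache.spark.sql.functions.col

val coalescedDF     = df.coalesce(3)
val repartitionedDF = df.repartition(3, col("dept_col"))
val rangePartedDF   = df.repartitionByRange(3, col("dept_col"))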
Spark 3.0 – Enable Adaptive Query Execution
Adaptive Query Execution (AQE) is a feature introduced in Spark 3.0 that improves query performance by re-optimizing the query plan at runtime, using the statistics it collects after each stage completes.
You can enable this by setting the spark.sql.adaptive.enabled configuration property to true.
spark.conf.set("spark.sql.adaptive.enabled",true)
Spark 3.0 – Coalescing Post Shuffle Partitions
With Spark 3.0, after every stage of the job, Spark dynamically determines the optimal number of partitions by looking at the metrics of the completed stage. To use this, you need to enable Adaptive Query Execution (spark.sql.adaptive.enabled) along with the below configuration.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled",true)
Spark 3.0 – Optimizing Skew Join
Sometimes we may come across data in partitions that is not evenly distributed; this is called data skew. Operations such as joins perform very slowly on these partitions. With AQE enabled, Spark checks the stage statistics, determines whether there are any skew joins, and optimizes them by splitting the bigger partitions into smaller ones (matching the partition size of the other table/DataFrame).
spark.conf.set("spark.sql.adaptive.skewJoin.enabled",true)
Conclusion
In this Spark SQL performance tuning and optimization article, you have learned different configurations to improve the performance of Spark SQL queries and applications. Where applicable, you need to tune the values of these configurations along with executor CPU cores and executor memory until you meet your needs.
Happy Learning !!