Spark 3.0 was released with a list of new features that includes performance improvements using AQE (Adaptive Query Execution), a data source for reading binary files, improved SQL and Python support, Python 3, Hadoop 3 compatibility, and ACID support, to name a few.
In this article, I will try to cover a few of these features along with Spark examples where possible. This is my first article on Spark 3.0 features, and I will be publishing more in this series.
1. Spark 3.0 Features Covered in this Article
- Adaptive Query Execution, also called AQE
- Language Version Upgrades
- New UI for Structured Streaming
- Data Source to Read Binary Files
- Support for Recursive Folders
- Support for Multi-character Delimiters (||)
- New Spark Built-in Functions
- Switch to Proleptic Gregorian Calendar
- DataFrame Tail
- REPARTITION Added to SQL Queries
- Better ANSI SQL Compatibility
2. Adaptive Query Execution
Adaptive Query Execution (AQE) is one of the greatest features of Spark 3.0 which reoptimizes and adjusts query plans based on runtime statistics collected during the execution of the query.
Prior to 3.0, Spark did its optimization by creating an execution plan before the query started executing; once execution started, Spark did no further optimization based on metrics collected at runtime. AQE bridges this gap by applying a second level of optimization based on the metrics it sees at each stage.
Adaptive Query Execution is disabled by default. To enable it, set the spark.sql.adaptive.enabled configuration property to true.
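With a running SparkSession named spark, enabling AQE is a one-line configuration change; the related sub-feature properties shown here are assumptions based on the Spark 3.0 configuration documentation.

```scala
// Enable Adaptive Query Execution (disabled by default in Spark 3.0)
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Optional AQE sub-features (property names assumed from the Spark 3.0 docs)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") // coalesce post-shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")           // skew join optimization
```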
After enabling Adaptive Query Execution, Spark has shown roughly a 2x improvement on TPC-DS queries over Spark 2.4.
Spark 3.0 comes with three major features in AQE.
- Coalescing Post-shuffle Partitions that dynamically determine the optimal number of partitions
- Converting sort-merge join to Broadcast join, and
- Skew Join Optimization
Adaptive Query Execution needs its own topic, hence I’ve created another article explaining AQE and its features in detail.
3. Languages Version Upgrades
Spark 3.0 was released with upgrades to the languages it supports. Spark now supports the following language versions.
- Python 3 (Python 2.x deprecated)
- Scala 2.12
- JDK 11
Besides the language upgrades, it has upgraded Hadoop to version 3, Kafka to 2.4.1, and many more dependencies.
4. New UI for Structured Streaming
Spark 3.0 provides a new Structured Streaming tab in the Spark Web UI to monitor Structured Streaming applications. This tab provides the Run ID, Status, Start Time, and Duration for each micro-batch along with runtime statistics. This helps the developer debug and understand what’s happening with the streaming queries. It has two sections.
- Active Streaming Queries
- Completed Streaming Queries
Active Streaming Queries lists all RUNNING queries, and Completed Streaming Queries lists all FAILED and FINISHED queries.
Selecting a Run ID provides detailed statistics of a streaming query, such as Input Rate, Process Rate, Input Rows, Batch Duration, and Operation Duration. You can find examples of how to troubleshoot Structured Streaming at Databricks.
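As a minimal sketch (assuming a SparkSession named spark), the built-in rate test source can be used to start a query that shows up in the new tab:

```scala
import org.apache.spark.sql.streaming.Trigger

// Minimal streaming query using the built-in "rate" test source.
// While it runs, it is listed under "Active Streaming Queries";
// after query.stop() it moves to "Completed Streaming Queries".
val query = spark.readStream
  .format("rate")                  // emits (timestamp, value) rows
  .option("rowsPerSecond", 5)
  .load()
  .writeStream
  .queryName("rate-demo")          // name shown in the Structured Streaming tab
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
```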
5. Datasource to Read Binary Files
Spark 3.0 supports a new data source format, “binaryFile”, to read binary files. Using the binaryFile
data source, DataFrameReader reads files such as image, PDF, ZIP, GZIP, TAR, and many other binary files into a DataFrame; each file is read as a single record along with the metadata of the file.
val df = spark.read.format("binaryFile").load("/tmp/binary/spark.png")
df.printSchema()
df.show()
This yields the output below.
// Output:
root
|-- path: string (nullable = true)
|-- modificationTime: timestamp (nullable = true)
|-- length: long (nullable = true)
|-- content: binary (nullable = true)
+--------------------+--------------------+------+--------------------+
| path| modificationTime|length| content|
+--------------------+--------------------+------+--------------------+
|file:/C:/tmp/bina...|2020-07-25 10:11:...| 74675|[89 50 4E 47 0D 0...|
+--------------------+--------------------+------+--------------------+
6. Feature to Read Recursive folders
Spark 3.0 added a recursiveFileLookup option to load files from recursive subfolders. When this option is set to true, DataFrameReader recursively loads files by looping through all folders and subfolders under the specified path.
// Feature to Read Recursive folders
spark.read.option("recursiveFileLookup", "true").csv("/path/to/folder")
7. Multiple Character Delimiter Support
Now with 3.0, Spark supports multi-character delimiters (for example, ||) while reading and writing CSV files. For example, suppose you have a CSV file with the following content.
// Multiple Character Delimiter Support
col1||col2||col3||col4
val1||val2||val3||val4
val1||val2||val3||val4
This can be read using the following example.
val df = spark.read
.option("delimiter","||")
.option("header","true")
.csv("/tmp/data/doublepipedata.csv")
Spark 2.x doesn’t support multi-character delimiters; hence, we used to convert the double delimiter to a single delimiter using a pre-processor. Trying to read such a file directly without pre-processing results in the following error.
throws java.lang.IllegalArgumentException: Delimiter cannot be more than one character: ||
8. New Spark Built-in Functions
Spark SQL already has hundreds of built-in functions, and Spark 3.0 adds more to the growing list. Soon, I will write a separate article explaining all these functions.
// New Spark Built-in Functions
sinh, cosh, tanh, asinh, acosh, atanh, any, bit_and, bit_or, bit_count, bit_xor,
bool_and, bool_or, count_if, date_part, extract, forall, from_csv,
make_date, make_interval, make_timestamp, map_entries,
map_filter, map_zip_with, max_by, min_by, schema_of_csv, to_csv,
transform_keys, transform_values, typeof, version,
xxhash64
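As a quick taste of a few of these, here is a sketch (assuming a SparkSession named spark) run through spark.sql():

```scala
// count_if: count rows matching a predicate
spark.sql("SELECT count_if(x % 2 = 0) AS evens FROM VALUES (1),(2),(3),(4) AS t(x)").show()
// evens = 2

// max_by: return the x associated with the maximum y
spark.sql("SELECT max_by(x, y) AS best FROM VALUES ('a',10),('b',50),('c',20) AS t(x,y)").show()
// best = 'b' (it has the largest y, 50)

// typeof: return the data type of the argument as a string
spark.sql("SELECT typeof(1) AS a, typeof('s') AS b, typeof(array(1,2)) AS c").show()
```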
9. Switch to Proleptic Gregorian calendar
Earlier versions of Spark support dates in a hybrid Julian/Gregorian calendar: for dates before 1582, the Julian calendar was used; for dates after 1582, the Gregorian calendar was used. This is similar to dates in JDK 7 and earlier, which use the java.sql.Date
API. From JDK 8, a new Proleptic Gregorian calendar was introduced with the java.time.LocalDate
API.
Spark 3.0 adopts the Proleptic Gregorian calendar, which is already used by other data systems like pandas, R, and Apache Arrow. All Date & Timestamp functions Spark provided prior to 3.0 behave the same way if your dates are after October 15, 1582, but your results might differ for dates prior to October 15, 1582.
Along with Proleptic Gregorian support, Spark 3.0 also provides new Date & Timestamp functions: make_date(), make_timestamp(), and make_interval().
make_date(year, month, day)
– Returns a Date from the year, month, and day fields.
make_date(2014, 8, 13)
// Returns 2014-08-13
make_timestamp(year, month, day, hour, min, sec[, timezone])
– Returns a Timestamp from the year, month, day, hour, min, sec, and timezone fields.
make_timestamp(2014, 8, 13, 1, 10, 40.147)
// Returns Timestamp 2014-08-13 01:10:40.147
make_timestamp(2014, 8, 13, 1, 10, 40.147, 'CET')
make_interval(years, months, weeks, days, hours, mins, secs)
– Makes an interval from the years, months, weeks, days, hours, mins, and secs fields.
Sending invalid inputs to make_date()
and make_timestamp()
returns null.
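These constructors can also be used from the DataFrame API; the following is a sketch (assuming a SparkSession named spark) using selectExpr:

```scala
// Build date, timestamp, and interval values with the new constructors
spark.range(1).selectExpr(
  "make_date(2014, 8, 13) AS d",                      // 2014-08-13
  "make_timestamp(2014, 8, 13, 1, 10, 40.147) AS ts", // 2014-08-13 01:10:40.147
  "make_interval(0, 0, 1, 2, 3, 4, 5.001) AS iv",     // 1 week, 2 days, 3 hours, ...
  "make_date(2014, 13, 1) AS bad"                     // invalid month -> null
).show(false)
```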
10. DataFrame.tail() feature added
Spark has a head()
action that returns elements from the start or top of a DataFrame, but it had no tail()
action; pandas in Python already supports tail(), but it was lacking in Spark. Spark 3.0 introduces the tail() action on DataFrame, which returns the specified number of elements from the end of the DataFrame. In Scala, the tail() action returns scala.Array[T]
.
val data=spark.range(1,100).toDF("num").tail(5)
data.foreach(print)
// Returns
// [95][96][97][98][99]
11. Added REPARTITION to SQL queries
Spark SQL queries were lacking some of the operations present in the Dataset/DataFrame API; for example, the repartition() support in the Dataset API was missing in Spark SQL. Spark 3.0 introduces a REPARTITION hint in Spark SQL expressions. The repartition transformation is used to increase or decrease the number of partitions.
val df=spark.range(1,10000).toDF("num")
println("Before re-partition :"+df.rdd.getNumPartitions)
df.createOrReplaceTempView("RANGE_TABLE")
val df2=spark.sql("SELECT /*+ REPARTITION(20) */ * FROM RANGE_TABLE")
println("After re-partition :"+df2.rdd.getNumPartitions)
// Returns
// Before re-partition :1
// After re-partition :20
12. Better ANSI SQL Compatibility
Since Spark is used by many data engineers who may already be familiar with ANSI SQL, Spark 3.0 is enhanced for better ANSI SQL compatibility. You can enable it by setting the spark.sql.ansi.enabled
(named spark.sql.parser.ansi.enabled in the 3.0 preview releases)
Spark configuration property to true.
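Programmatically, this is a one-line change; note that in the Spark 3.0 GA release the property is named spark.sql.ansi.enabled (the preview releases used spark.sql.parser.ansi.enabled):

```scala
// Enable ANSI SQL compliance mode (assumes a SparkSession named spark)
spark.conf.set("spark.sql.ansi.enabled", "true")
```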
13. More Spark 3.0 Features
Here we have covered only a fraction of the Spark 3.0 features; the Spark community has resolved more than 3,400 tickets in this release. In my future articles, I will cover some more of these. Stay tuned!!
Related Articles
- Spark Internal Execution plan
- Spark Types of Tables and Views
- Spark Drop, Delete, Truncate Differences
- Spark Convert a Row into Case Class
- Spark – Extract DataFrame Column as List
- Spark Partitioning & Partition Understanding