Spark 3.0 Features with Examples – Part I

Spark 3.0 was released with a list of new features that includes performance improvements using AQE (Adaptive Query Execution), a binary file data source, improved SQL and Python support, Python 3, Hadoop 3 compatibility, and ACID support, to name a few.

In this article, I will try to cover a few of these features along with Spark examples where possible. This is my first article on Spark 3.0 features, and I will be publishing more in this series.


1. Spark 3.0 Features list covered in this article.

2. Adaptive Query Execution

Adaptive Query Execution (AQE) is one of the greatest features of Spark 3.0; it reoptimizes and adjusts query plans based on runtime statistics collected during query execution.

Prior to 3.0, Spark did its optimization by creating an execution plan before the query started executing; once execution started, Spark did not perform any further optimization based on the metrics it collected at runtime. AQE bridges this gap by applying a second level of optimization based on the metrics it observes at each stage.

Adaptive Query Execution is disabled by default. To enable it, set the spark.sql.adaptive.enabled configuration property to true.
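As a minimal sketch (assuming a SparkSession variable named spark, as in the other examples here), AQE can be turned on either while building the session or at runtime.


import org.apache.spark.sql.SparkSession

// Enable AQE while building the SparkSession
val spark = SparkSession.builder()
  .appName("AQEExample")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

// Or turn it on for an existing session at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")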

After enabling Adaptive Query Execution, Spark showed a 2x performance improvement on the TPC-DS benchmark over Spark 2.4.

Spark 3.0 comes with three major features in AQE; the configuration switches behind them are sketched right after this list.

  • Coalescing post-shuffle partitions, which dynamically determines the optimal number of partitions
  • Converting sort-merge join to broadcast join, and
  • Skew join optimization
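As a rough sketch, each of these behaviors also has its own switch on top of spark.sql.adaptive.enabled; the property names below are as I understand the Spark 3.0 configuration, so verify them against your version's documentation.


// Coalescing post-shuffle partitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// Skew join optimization
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
// Sort-merge join is converted to broadcast join when a side's runtime size
// falls below the broadcast threshold (value in bytes, here 10 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")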

Adaptive Query Execution needs its own topic, hence I've created another article explaining AQE and its features in detail.

3. Language Version Upgrades

Spark 3.0 was released with upgrades to the languages it supports. Spark now supports the following language versions.

  • Python 3 (Python 2.x is deprecated)
  • Scala 2.12
  • JDK 11

Besides the language upgrades, Spark 3.0 also upgrades Hadoop to version 3 and Kafka to 2.4.1, among other dependencies.

4. New UI for Structured Streaming

Spark 3.0 adds a new Structured Streaming tab to the Spark Web UI to monitor Structured Streaming applications. The tab shows the Run ID, Status, Start Time, and Duration for each streaming query, along with runtime statistics. This helps developers debug and understand what's happening with their streaming queries. It has two sections.

  • Active Streaming Queries
  • Completed Streaming Queries
(Image: Structured Streaming tab – Source: Databricks)

Active Streaming Queries lists all RUNNING queries, and Completed Streaming Queries lists all FAILED and FINISHED queries.

Selecting a Run ID shows detailed statistics for a streaming query, such as Input Rate, Process Rate, Input Rows, Batch Duration, and Operation Duration. You can find examples of how to troubleshoot Structured Streaming at Databricks.
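For context, here is a minimal sketch (assuming a SparkSession named spark) of a toy streaming query whose name and run ID would appear under Active Streaming Queries while it runs; the built-in rate source and console sink are used purely for illustration.


// A toy query: the "rate" test source emits rows continuously and the
// console sink prints each micro-batch; the query name and run ID show
// up in the Structured Streaming tab while the query is active
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .queryName("rate_to_console")
  .format("console")
  .start()

// query.runId matches the Run ID column shown in the UI
println(query.runId)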

5. Datasource to Read Binary Files

Spark 3.0 supports a new data source format, “binaryFile”, to read binary files. Using the binaryFile data source, DataFrameReader reads files such as image, PDF, ZIP, GZIP, TAR, and other binary files into a DataFrame; each file is read as a single record along with the metadata of the file.


// Read a binary file into a DataFrame
val df = spark.read.format("binaryFile").load("/tmp/binary/spark.png")
df.printSchema()
df.show()

Yields below output.


// Output:
root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)

+--------------------+--------------------+------+--------------------+
|                path|    modificationTime|length|             content|
+--------------------+--------------------+------+--------------------+
|file:/C:/tmp/bina...|2020-07-25 10:11:...| 74675|[89 50 4E 47 0D 0...|
+--------------------+--------------------+------+--------------------+
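If you only need certain file types, the generic pathGlobFilter file-source option (also introduced in 3.0, as I understand it) can be combined with binaryFile. A rough one-liner, assuming PNG files under /tmp/binary:


// Read only .png files from the folder as binary records
val pngDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.png")
  .load("/tmp/binary")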

6. Feature to Read Recursive Folders

Spark 3.0 adds a recursiveFileLookup option to read or load files from nested subfolders. Setting this option to true makes DataFrameReader recursively load files by walking through all folders and subfolders under the specified path.


// Feature to Read Recursive folders
spark.read.option("recursiveFileLookup", "true").csv("/path/to/folder")

7. Multiple Character Delimiter Support

Now with 3.0, Spark supports multi-character delimiters (for example, ||) while reading and writing CSV files. For example, say you have a CSV file with the following content.


// Multiple Character Delimiter Support 
col1||col2||col3||col4
val1||val2||val3||val4
val1||val2||val3||val4

This can be read using the following example.


val df  = spark.read
      .option("delimiter","||")
      .option("header","true")
      .csv("/tmp/data/douplepipedata.csv")

Spark 2.x doesn't support multi-character delimiters, hence we used to convert files with a double delimiter to a single delimiter using a pre-processor. Trying to read such a file directly in Spark 2.x results in the following error.


throws java.lang.IllegalArgumentException: Delimiter cannot be more than one character: ||

8. New Spark Built-in Functions

Spark SQL already has hundreds of built-in functions, and Spark 3.0 adds more to the growing list; a short example using a few of them follows the list below. Soon, I will write a separate article explaining all of these functions.


// New Spark Built-in Functions
sinh, cosh, tanh, asinh, acosh, atanh, any, bit_and, bit_or, bit_count, bit_xor,
bool_and, bool_or, count_if, date_part, extract, forall, from_csv,
make_date, make_interval, make_timestamp, map_entries,
map_filter, map_zip_with, max_by, min_by, schema_of_csv, to_csv,
transform_keys, transform_values, typeof, version, xxhash64
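Below is a minimal sketch exercising a few of these through Spark SQL (assuming a SparkSession named spark; outputs are as I understand the Spark 3.0 behavior, so verify on your version).


// make_date builds a DATE from year, month, and day
spark.sql("SELECT make_date(2014, 8, 13) AS d").show()
// date_part extracts a field from a date or timestamp
spark.sql("SELECT date_part('YEAR', TIMESTAMP '2019-08-12 01:00:00') AS y").show()
// count_if counts only the rows that satisfy a predicate
spark.sql("SELECT count_if(x % 2 = 0) AS evens FROM VALUES (1), (2), (3), (4) AS t(x)").show()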

9. Switch to Proleptic Gregorian Calendar

Earlier versions of Spark supported dates in a hybrid Julian and Gregorian calendar: the Julian calendar was used for dates before 1582, and the Gregorian calendar for dates after 1582. This is similar to dates in JDK 7 and earlier, which use the java.sql.Date API. From JDK 8, the Proleptic Gregorian calendar was introduced with the java.time.LocalDate API.

Spark 3.0 adopts the Proleptic Gregorian calendar, which is already used by other data systems like pandas, R, and Apache Arrow. All Date & Timestamp functions Spark provided prior to 3.0 behave the same way if your dates are after October 15, 1582, but your results might differ for dates prior to October 15, 1582.

Along with the Proleptic Gregorian calendar support, Spark 3.0 also provides the new Date & Timestamp functions make_date(), make_timestamp(), and make_interval().

make_date(year, month, day) – Returns a Date from the input year, month, and day fields.


make_date(2014, 8, 13)
// Returns 2014-08-13.

make_timestamp(year, month, day, hour, min, sec[, timezone]) – Returns a Timestamp from the input year, month, day, hour, min, sec, and optional timezone fields.


make_timestamp(2014, 8, 13, 1, 10, 40.147)
// Returns Timestamp 2014-08-13 1:10:40.147
make_timestamp(2014, 8, 13, 1, 10, 40.147, 'CET')

make_interval(years, months, weeks, days, hours, mins, secs) – Makes an interval from the years, months, weeks, days, hours, mins, and secs fields.
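A rough one-liner in Spark SQL (the rendered interval format may vary by version).


// Build an interval of 1 week, 2 days, and 3 hours
spark.sql("SELECT make_interval(0, 0, 1, 2, 3, 0, 0) AS itv").show(false)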

Passing invalid dates to make_date() and make_timestamp() returns null.

10. DataFrame.tail() feature added

Spark has a head() action that returns elements from the start or top of a DataFrame, but it had no tail() action; pandas in Python already supports tail(), but it was lacking in Spark. Spark 3.0 introduces the tail() action on DataFrame, which returns the specified number of elements from the end of the DataFrame. In Scala, the tail() action returns scala.Array[T].


val data=spark.range(1,100).toDF("num").tail(5)
data.foreach(print)
// Returns
// [95][96][97][98][99]

11. Added REPARTITION to SQL queries

Spark SQL queries were lacking some of the operations available on Dataset/DataFrame; for example, repartition() is supported on Dataset but was missing in Spark SQL. Spark 3.0 introduces a REPARTITION hint in Spark SQL expressions. The repartition transformation is used to increase or decrease the number of partitions.


val df=spark.range(1,10000).toDF("num")
println("Before re-partition :"+df.rdd.getNumPartitions)
df.createOrReplaceTempView("RANGE_TABLE")
val df2=spark.sql("SELECT /*+ REPARTITION(20) */ * FROM RANGE_TABLE")
println("After re-partition :"+df2.rdd.getNumPartitions)
// Returns 
// Before re-partition :1
// After re-partition :20

12. Better ANSI SQL Compatibility

Since Spark is used by many data engineers who may already be familiar with ANSI SQL, Spark 3.0 improves its compatibility with the ANSI SQL standard. You can enable this by setting the spark.sql.ansi.enabled Spark configuration to true.
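A minimal sketch for enabling it on an existing session (the 3.0 preview builds exposed this property as spark.sql.parser.ansi.enabled, so verify the name on your version).


// Enable ANSI-compliant SQL behavior at runtime
spark.conf.set("spark.sql.ansi.enabled", "true")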

13. More Spark 3.0 Features

Here we have covered only a fraction of the Spark 3.0 features; the Spark community has resolved more than 3,400 tickets in this release. In future articles, I will cover more of these. Stay tuned!!


Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, he has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive, and Machine Learning. Naveen's journey in the field of data engineering has been one of continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with data as he comes across them. Follow Naveen @ LinkedIn and Medium.
