Spark with Scala provides several built-in SQL standard array functions, also known as collection functions in the DataFrame API. These come in handy when we need to perform operations on an array (ArrayType) column. All these functions accept an array column as input and, depending on the function, several additional arguments.

Why use SQL Array Functions?

When possible, leverage the Spark SQL standard library functions, as they are slightly more compile-time safe, handle nulls gracefully, and perform better than UDFs. If your application is performance-critical, avoid custom UDFs at all costs and use these standard functions instead.
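
For illustration, here is a hypothetical UDF that mimics array_contains() on the df created later in this article. Note that the UDF must guard against null arrays itself and is opaque to the Catalyst optimizer, while the built-in handles nulls for you and remains visible to the optimizer.


// For illustration only: a hand-rolled UDF equivalent of array_contains()
import org.apache.spark.sql.functions.{array_contains, col, udf}

// The UDF must handle null input explicitly
val containsJava = udf((langs: Seq[String]) => langs != null && langs.contains("Java"))

// Prefer the built-in:
df.where(array_contains(col("languages_school"), "Java"))
// over the UDF:
df.where(containsJava(col("languages_school")))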

These array functions are grouped under collection functions (“collection_funcs”) in Spark SQL, along with several map functions. They come in handy when we want to perform operations and transformations on array columns.

SQL Array Functions in Spark

Following are some of the most used array functions available in Spark SQL. These functions enable various operations on arrays within Spark SQL DataFrame columns, facilitating array manipulation and analysis.

Note: Some of the following functions are supported in Apache Spark 3.5.0; if any function doesn’t work for you, you are probably using a lower Spark version.

  • array() - Creates a new array from the given input columns.
  • array_contains() - Returns true if the array contains the given value.
  • array_append() - Appends the element to the end of the source array and returns an array containing all elements.
  • array_insert() - Returns an array after adding the element at the specified (1-based) position.
  • arrays_overlap() - Returns true if the two arrays have at least one non-null element in common.
  • array_distinct() - Removes duplicate values from the array.
  • array_except() - Computes the set difference of the two arrays.
  • array_intersect() - Computes the set intersection of the two arrays.
  • array_join() - Concatenates the elements of the given array using a delimiter.
  • array_max() - Returns the maximum value in the array.
  • array_min() - Returns the minimum value in the array.
  • array_size() - Returns the size of the array.
  • array_position() - Returns the (1-based) index of the first occurrence of the given value in the array, or 0 if the value is not found.
  • array_remove() - Removes all occurrences of the given value from the array.
  • array_compact() - Removes all null elements from the array.
  • array_prepend() - Returns an array with the given element added at the beginning.
  • array_repeat() - Returns an array containing the given element repeated n times.
  • array_sort() - Sorts the input array in ascending order according to the natural ordering of the array elements.
  • array_union() - Computes the union of the two arrays, without duplicates.
  • element_at() - Returns the element of the array at the given (1-based) index.
  • slice() - Returns a subset of the array starting at the given index, with the specified length.

Spark SQL Array Function Scala Examples

Before we use these functions, let’s create a DataFrame with a few array columns. I will use this DataFrame for all my examples below.


// Import
import org.apache.spark.sql.SparkSession
  
// Create SparkSession
val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

// Create dataframe
val data = Seq(
    ("James,,Smith",List("Java","Scala","C++"),List("Python","PHP"),"CA"),
    ("Michael,Rose,",List("Spark","Java","C++"),List("AWS","Scala","Scala"),"NJ"),
    ("Robert,,Williams",List("AWS","Scala"),null,"NV")
  )
val columns = Seq("name","languages_school","additional_language","state")
val df = spark.createDataFrame(data).toDF(columns:_*)
df.show(false)

Yields the output below. Note that the columns languages_school and additional_language are of array type.


// Output:
+----------------+------------------+-------------------+-----+
|name            |languages_school  |additional_language|state|
+----------------+------------------+-------------------+-----+
|James,,Smith    |[Java, Scala, C++]|[Python, PHP]      |CA   |
|Michael,Rose,   |[Spark, Java, C++]|[AWS, Scala, Scala]|NJ   |
|Robert,,Williams|[AWS, Scala]      |NULL               |NV   |
+----------------+------------------+-------------------+-----+

array_contains()

The array_contains() function returns true if the array contains the specified value, false if it does not, and null if the array itself is null. It is primarily used to filter rows from a DataFrame.

Syntax


// Syntax
array_contains(column: Column, value: Any): Column

The following example returns the DataFrame df3 by including only rows where the list column “languages_school” contains the value “Java”.


// Import
import org.apache.spark.sql.functions.{array_contains, col}

// Using array_contains
val df3 = df.where(array_contains(col("languages_school"),"Java"))
df3.show(false)

// Output:
//+-------------+------------------+-------------------+-----+
//|name         |languages_school  |additional_language|state|
//+-------------+------------------+-------------------+-----+
//|James,,Smith |[Java, Scala, C++]|[Python, PHP]      |CA   |
//|Michael,Rose,|[Spark, Java, C++]|[AWS, Scala, Scala]|NJ   |
//+-------------+------------------+-------------------+-----+

array_sort()

The array_sort() function arranges the input array in ascending order. The elements within the array must be sortable. For arrays containing NaN or null values, the following applies:

  • For double/float type, NaN is considered greater than any non-NaN elements.
  • Null elements are positioned at the end of the resulting array.

Syntax


// Syntax
array_sort(e: Column): Column
array_sort(e: Column, comparator: (Column, Column) => Column): Column

Example


// Import
import org.apache.spark.sql.functions.array_sort

// Using array_sort
val df2 = df.withColumn("array_sort",
    array_sort(col("languages_school")) )
df2.show(false)

// Output:
//+----------------+------------------+-------------------+-----+------------------+
//|name            |languages_school  |additional_language|state|array_sort        |
//+----------------+------------------+-------------------+-----+------------------+
//|James,,Smith    |[Java, Scala, C++]|[Python, PHP]      |CA   |[C++, Java, Scala]|
//|Michael,Rose,   |[Spark, Java, C++]|[AWS, Scala, Scala]|NJ   |[C++, Java, Spark]|
//|Robert,,Williams|[AWS, Scala]      |NULL               |NV   |[AWS, Scala]      |
//+----------------+------------------+-------------------+-----+------------------+

The code val df2 = df.withColumn("array_sort", array_sort(col("languages_school"))) creates a new DataFrame df2 based on df, with a new column named “array_sort” added. This column contains the values of the “languages_school” arrays sorted in ascending order.

array_join()

This function concatenates all elements of the array column into a single string using the given delimiter. When the optional nullReplacement parameter is provided, null elements are replaced with that string; otherwise, null elements are skipped.

Syntax


// Syntax
array_join(column: Column, delimiter: String): Column
array_join(column: Column, delimiter: String, nullReplacement: String): Column 

Example


// Import
import org.apache.spark.sql.functions.array_join

// Using array_join
val df4 = df.withColumn("array_join",
    array_join(col("languages_school"),"|") )
df4.show(false)

// Output:
//+----------------+------------------+-------------------+-----+--------------+
//|name            |languages_school  |additional_language|state|array_join    |
//+----------------+------------------+-------------------+-----+--------------+
//|James,,Smith    |[Java, Scala, C++]|[Python, PHP]      |CA   |Java|Scala|C++|
//|Michael,Rose,   |[Spark, Java, C++]|[AWS, Scala, Scala]|NJ   |Spark|Java|C++|
//|Robert,,Williams|[AWS, Scala]      |NULL               |NV   |AWS|Scala     |
//+----------------+------------------+-------------------+-----+--------------+

This example creates a new DataFrame df4 based on the DataFrame df. In this new DataFrame, a new column named “array_join” is added. This column contains the values of the “languages_school” column joined together into a single string, with each value separated by the “|” delimiter character.

array_append()

array_append() function returns an array that includes all elements from the original array along with the new element. The new element or column is positioned at the end of the array.

Syntax


// Syntax
array_append(column: Column, element: Any): Column

Example


// Import
import org.apache.spark.sql.functions.array_append

// Using array_append
val df5 = df.withColumn("array_append",
    array_append(col("languages_school"),"PHP") )
df5.show(false)

// Output:
//+----------------+------------------+-------------------+-----+-----------------------+
//|name            |languages_school  |additional_language|state|array_append           |
//+----------------+------------------+-------------------+-----+-----------------------+
//|James,,Smith    |[Java, Scala, C++]|[Python, PHP]      |CA   |[Java, Scala, C++, PHP]|
//|Michael,Rose,   |[Spark, Java, C++]|[AWS, Scala, Scala]|NJ   |[Spark, Java, C++, PHP]|
//|Robert,,Williams|[AWS, Scala]      |NULL               |NV   |[AWS, Scala, PHP]      |
//+----------------+------------------+-------------------+-----+-----------------------+

It returns a new DataFrame by adding a new column named “array_append”. This column contains arrays that include all the elements from the original “languages_school” column, with the additional element “PHP” appended to the end of each array.

array_union()

Similarly, the array_union() function combines the elements from both columns and returns an array containing all unique elements from the two input arrays, with duplicates removed. If either input array is null, the result is null, as the last row of the output below shows.

Syntax


// Syntax
array_union(col1: Column, col2: Column): Column

Example


// Import
import org.apache.spark.sql.functions.array_union

// Using array_union
val df6 = df.withColumn("array_union",
    array_union(col("languages_school"),col("additional_language")) )
df6.show(false)

// Output:
//+----------------+------------------+-------------------+-----+-------------------------------+
//|name            |languages_school  |additional_language|state|array_union                    |
//+----------------+------------------+-------------------+-----+-------------------------------+
//|James,,Smith    |[Java, Scala, C++]|[Python, PHP]      |CA   |[Java, Scala, C++, Python, PHP]|
//|Michael,Rose,   |[Spark, Java, C++]|[AWS, Scala, Scala]|NJ   |[Spark, Java, C++, AWS, Scala] |
//|Robert,,Williams|[AWS, Scala]      |NULL               |NV   |NULL                           |
//+----------------+------------------+-------------------+-----+-------------------------------+

In this new DataFrame, a new column named “array_union” is added. This column contains arrays that represent the union of elements from the “languages_school” column and the “additional_language” column.

array_size()

The array_size() returns the total number of elements in the array column. If your input array column is null, it returns null.

Syntax


// Syntax
array_size(e: Column): Column

Example


// Import
import org.apache.spark.sql.functions.array_size

// Using array_size
val df7 = df.withColumn("array_size", array_size(col("languages_school")))

// Output:
//+----------------+------------------+-------------------+-----+----------+
//|name            |languages_school  |additional_language|state|array_size|
//+----------------+------------------+-------------------+-----+----------+
//|James,,Smith    |[Java, Scala, C++]|[Python, PHP]      |CA   |3         |
//|Michael,Rose,   |[Spark, Java, C++]|[AWS, Scala, Scala]|NJ   |3         |
//|Robert,,Williams|[AWS, Scala]      |NULL               |NV   |2         |
//+----------------+------------------+-------------------+-----+----------+

This returns a new DataFrame with an additional array_size column containing the number of elements in each languages_school array.

element_at()

element_at() returns the element of the array at the specified (1-based) index.

Syntax


// Syntax
element_at(column: Column, value: Any): Column 

Example


// Import
import org.apache.spark.sql.functions.element_at

// Using element_at
val df8 = df.withColumn("element_at", element_at(col("languages_school"),2))
df8.show(false)

// Output:
//+----------------+------------------+-------------------+-----+----------+
//|name            |languages_school  |additional_language|state|element_at|
//+----------------+------------------+-------------------+-----+----------+
//|James,,Smith    |[Java, Scala, C++]|[Python, PHP]      |CA   |Scala     |
//|Michael,Rose,   |[Spark, Java, C++]|[AWS, Scala, Scala]|NJ   |Java      |
//|Robert,,Williams|[AWS, Scala]      |NULL               |NV   |Scala     |
//+----------------+------------------+-------------------+-----+----------+

array_position()

Use array_position() to find the position of the first occurrence of a value in the given array. It returns null if either of the arguments is null.

Note that the position is 1-based, not 0-based. It returns 0 if the value is not found in the array.

Syntax


// Syntax
array_position(column: Column, value: Any): Column

Example


// Import
import org.apache.spark.sql.functions.array_position

// Using array_position
val df9 = df.withColumn("array_position", 
       array_position(col("languages_school"),"Scala"))
df9.show(false)

// Output:
//+----------------+------------------+-------------------+-----+--------------+
//|name            |languages_school  |additional_language|state|array_position|
//+----------------+------------------+-------------------+-----+--------------+
//|James,,Smith    |[Java, Scala, C++]|[Python, PHP]      |CA   |2             |
//|Michael,Rose,   |[Spark, Java, C++]|[AWS, Scala, Scala]|NJ   |0             |
//|Robert,,Williams|[AWS, Scala]      |NULL               |NV   |2             |
//+----------------+------------------+-------------------+-----+--------------+

array_insert()

In Spark, array_insert() is used to insert an element into an array at the specified (1-based) position. You can use array_insert() in various scenarios where you need to modify arrays dynamically.

Syntax


// Syntax
array_insert(arr: Column, pos: Column, value: Column): Column
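
The following is a minimal sketch against the df created earlier (array_insert() requires Spark 3.4.0 or later; the value "Go" and the name dfInsert are illustrative). The position is 1-based, so this inserts "Go" as the second element; the first row would be expected to yield [Java, Go, Scala, C++].


// Import
import org.apache.spark.sql.functions.{array_insert, col, lit}

// Using array_insert: insert "Go" at (1-based) position 2
val dfInsert = df.withColumn("array_insert",
    array_insert(col("languages_school"), lit(2), lit("Go")))
dfInsert.show(false)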

arrays_overlap()

arrays_overlap() returns true when there is at least one non-null element common to both arrays. If the arrays have no common element but both are non-empty and either of them contains a null, it returns null; otherwise, it returns false.

Syntax


// Syntax
arrays_overlap(a1: Column, a2: Column): Column
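
Here is a minimal sketch comparing the two array columns of the same df (dfOverlap is an illustrative name). With this data, no row shares an element across the two columns, so the expected result is false for the first two rows and null for the last row, where additional_language is null.


// Import
import org.apache.spark.sql.functions.{arrays_overlap, col}

// Using arrays_overlap
val dfOverlap = df.withColumn("arrays_overlap",
    arrays_overlap(col("languages_school"), col("additional_language")))
dfOverlap.show(false)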

array_distinct()

In Spark, the array_distinct() function is used to return an array with distinct elements from the input array. It removes duplicate elements and returns only unique elements in the resulting array.

Syntax


// Syntax
array_distinct(e: Column): Column

The function returns a new array containing only distinct elements from the input array, preserving the original order of elements.
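
A minimal sketch on the additional_language column, which contains a duplicate (“Scala” appears twice in the second row); the duplicate should be dropped, and the null array in the last row stays null.


// Import
import org.apache.spark.sql.functions.{array_distinct, col}

// Using array_distinct
val dfDistinct = df.withColumn("array_distinct",
    array_distinct(col("additional_language")))
dfDistinct.show(false)

// Expected: [Python, PHP], [AWS, Scala], NULL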

array_intersect()

You can use array_intersect() when you need to find the common elements between arrays in your Spark SQL queries. It’s particularly useful when you want to perform set operations on array data, such as finding intersections.

Syntax


// Syntax
array_intersect(col1: Column, col2: Column): Column
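
As a minimal sketch, the example below intersects languages_school with a literal array (the literal values are illustrative); the first row would be expected to yield [Java, Scala], the second [Java], and the third [Scala].


// Import
import org.apache.spark.sql.functions.{array, array_intersect, col, lit}

// Using array_intersect with a literal array
val dfIntersect = df.withColumn("array_intersect",
    array_intersect(col("languages_school"), array(lit("Scala"), lit("Java"))))
dfIntersect.show(false)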

array_remove()

array_remove() is used to remove all occurrences of a specified value from an array. It returns a new array with every occurrence of that value removed.

Syntax


// Syntax
array_remove(column: Column, element: Any): Column

You can use array_remove() when you need to eliminate specific elements from arrays in your Spark SQL queries. It’s particularly useful when you want to filter out certain values from array data.
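
A minimal sketch removing every "Scala" from languages_school; the second row contains no "Scala" and is expected to come back unchanged.


// Import
import org.apache.spark.sql.functions.{array_remove, col}

// Using array_remove
val dfRemove = df.withColumn("array_remove",
    array_remove(col("languages_school"), "Scala"))
dfRemove.show(false)

// Expected: [Java, C++], [Spark, Java, C++], [AWS]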

array_compact()

The array_compact() function removes all null elements from the given array.

Syntax


// Syntax
array_compact(column: Column): Column
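
None of the arrays in df contain null elements, so this sketch first builds a column with a null inside (the with_nulls name is illustrative) and then compacts it; array_compact() requires Spark 3.4.0 or later.


// Import
import org.apache.spark.sql.functions.{array, array_compact, col, lit}

// Build an array column containing a null element, then remove the nulls
val dfCompact = df
    .withColumn("with_nulls", array(lit("Java"), lit(null).cast("string"), lit("Scala")))
    .withColumn("array_compact", array_compact(col("with_nulls")))
dfCompact.show(false)

// Expected: with_nulls = [Java, NULL, Scala], array_compact = [Java, Scala]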

array_prepend()

Introduced in Spark 3.5.0, array_prepend() adds the specified element at the beginning of the array column. It returns an array containing the new element followed by all elements of the original array.

Syntax


// Syntax
array_prepend(column: Column, element: Any): Column
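
A minimal sketch prepending "SQL" to languages_school (requires Spark 3.5.0; dfPrepend is an illustrative name); the first row would be expected to yield [SQL, Java, Scala, C++].


// Import
import org.apache.spark.sql.functions.{array_prepend, col}

// Using array_prepend
val dfPrepend = df.withColumn("array_prepend",
    array_prepend(col("languages_school"), "SQL"))
dfPrepend.show(false)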

array_repeat()

In Spark, array_repeat() is used to generate an array by repeating a specified value a given number of times. It is useful when you need to create arrays with repeated values or patterns in your Spark SQL queries.

Syntax


// Syntax
array_repeat(e: Column, count: Int): Column
array_repeat(left: Column, right: Column): Column
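
A minimal sketch repeating the state column three times using the Int overload; the first row would be expected to yield [CA, CA, CA].


// Import
import org.apache.spark.sql.functions.{array_repeat, col}

// Using array_repeat
val dfRepeat = df.withColumn("array_repeat", array_repeat(col("state"), 3))
dfRepeat.show(false)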

Conclusion

In this article, you learned the benefits of using array functions over UDFs and how to use some of the most common array functions available in Spark SQL with Scala. In Apache Spark SQL, array functions are used to manipulate and operate on arrays within DataFrame columns. Refer to the official Apache Spark documentation for the complete list of functions and detailed descriptions.