Spark SQL UDF (User Defined Functions)

A Spark SQL UDF (User Defined Function) is one of the most useful features of Spark SQL & DataFrame, as it extends Spark's built-in capabilities. In this article, I will explain what a UDF is, why we need it, and how to create and use it on DataFrame and SQL with a Scala example.

Note: UDFs are among the most expensive operations, hence use them only when you have no other choice and they are essential.

What is Spark UDF?

UDF stands for User Defined Function. If you are coming from a SQL background, UDFs are nothing new to you, as most traditional RDBMS databases support User Defined Functions; Spark UDFs are similar to these.

In Spark, you create a UDF by writing a function in the language you use with Spark. For example, if you are using Spark with Scala, you create the UDF in Scala and either wrap it with the udf() function to use it on a DataFrame or register it to use it in SQL.

Why do we need a Spark UDF?

UDFs are used to extend the functionality of the framework and to re-use a function across several DataFrames. For example, if you wanted to convert the first letter of every word in a sentence to capital case, Spark's built-in features don't have this function, hence you can create it as a UDF and reuse it as needed on many DataFrames. Once created, UDFs can be re-used on several DataFrames and in SQL expressions.

Before you create any UDF, do your research to check if a similar function is already available among the Spark SQL Functions. Spark SQL provides many predefined common functions, and more are added with every release, hence it is best to check before reinventing the wheel.
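One quick way to explore what is already available in your session is the catalog API. The snippet below is just an exploratory sketch; the keyword "cap" is only an example filter.


// List functions visible in the current session whose name contains "cap"
// (e.g. initcap), to see if Spark already offers what you need.
import org.apache.spark.sql.functions.col
spark.catalog.listFunctions()
  .filter(col("name").contains("cap"))
  .show(false)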

When you create UDFs, you need to design them very carefully; otherwise you will run into performance issues.

Create a DataFrame

Before we jump into creating a UDF, let's first create a Spark DataFrame.


import spark.implicits._
val columns = Seq("Seqno","Quote")
val data = Seq(("1", "Be the change that you wish to see in the world"),
    ("2", "Everyone thinks of changing the world, but no one thinks of changing himself."),
    ("3", "The purpose of our lives is to be happy.")
  )
val df = data.toDF(columns:_*)
df.show(false)

This yields the output below.


+-----+-----------------------------------------------------------------------------+
|Seqno|Quote                                                                        |
+-----+-----------------------------------------------------------------------------+
|1    |Be the change that you wish to see in the world                              |
|2    |Everyone thinks of changing the world, but no one thinks of changing himself.|
|3    |The purpose of our lives is to be happy.                                     |
+-----+-----------------------------------------------------------------------------+

Create a Function

The first step in creating a UDF is creating a Scala function. The snippet below creates a function convertCase() which takes a string parameter and converts the first letter of every word to a capital letter. A UDF takes parameters of your choice and returns a value.


val convertCase =  (strQuote:String) => {
    val arr = strQuote.split(" ")
    arr.map(f=>  f.substring(0,1).toUpperCase + f.substring(1,f.length)).mkString(" ")
}

Create Spark UDF to use it on DataFrame

Now convert the function convertCase() to a UDF by passing it to the Spark SQL udf() function, which is available in the org.apache.spark.sql.functions package. Make sure you import this package before using it.


val convertUDF = udf(convertCase)

Now you can use convertUDF() on a DataFrame column. The udf() function returns an org.apache.spark.sql.expressions.UserDefinedFunction.


//Using with DataFrame
df.select(col("Seqno"), 
    convertUDF(col("Quote")).as("Quote") ).show(false)

This results in the output below.


+-----+-----------------------------------------------------------------------------+
|Seqno|Quote                                                                        |
+-----+-----------------------------------------------------------------------------+
|1    |Be The Change That You Wish To See In The World                              |
|2    |Everyone Thinks Of Changing The World, But No One Thinks Of Changing Himself.|
|3    |The Purpose Of Our Lives Is To Be Happy.                                     |
+-----+-----------------------------------------------------------------------------+

Registering Spark UDF to use it on SQL

In order to use the convertCase() function in Spark SQL, you need to register the function with Spark using spark.udf.register().


// Using it on SQL
spark.udf.register("convertUDF", convertCase)
df.createOrReplaceTempView("QUOTE_TABLE")
spark.sql("select Seqno, convertUDF(Quote) from QUOTE_TABLE").show(false)

This yields the same output as the previous example.
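
As a side note, once the UDF is registered it can also be referenced from SQL-style expression strings on the DataFrame API, for example via selectExpr(). This is just an illustrative sketch reusing the same DataFrame and registered name.


// A registered UDF can also be used inside SQL expression strings
// on the DataFrame API, e.g. with selectExpr().
df.selectExpr("Seqno", "convertUDF(Quote) as Quote").show(false)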

Null check

UDFs are error-prone when not designed carefully. For example, if a column contains the value null on some records and you don't handle null inside the UDF, the function returns the error below.


Exception in thread "main" org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (string) => string)
	at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1066)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:152)
	at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:92)
	at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24$$anonfun$applyOrElse$23.apply(Optimizer.scala:1364)
	at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24$$anonfun$applyOrElse$23.apply(Optimizer.scala:1364)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24.applyOrElse(Optimizer.scala:1364)
	at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24.applyOrElse(Optimizer.scala:1359)

It is always best practice to check for null inside the UDF rather than checking for it outside.
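
As a minimal sketch of that practice (assuming a null input should simply produce a null result), the UDF body can guard against null before splitting:


// Null-safe variant: return null for null input instead of failing
// with a NullPointerException inside the executor.
val convertCaseSafe = (strQuote: String) => {
  if (strQuote == null) null
  else strQuote.split(" ")
    .map(f => f.substring(0, 1).toUpperCase + f.substring(1, f.length))
    .mkString(" ")
}
val convertUDFSafe = udf(convertCaseSafe)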

Performance concern using UDF

UDFs are a black box to Spark, hence Spark can't apply optimizations to them, and you lose the optimizations Spark performs on DataFrame/Dataset operations. When possible, you should use Spark SQL built-in functions, as these functions benefit from optimization.
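
For instance, the capitalization used in this article is largely covered by the built-in initcap() function; note that initcap also lowercases the remaining characters of each word, so it is not an exact drop-in replacement for convertCase().


// Built-in alternative: initcap() capitalizes the first letter of
// each word (and lowercases the rest), without a UDF.
import org.apache.spark.sql.functions.initcap
df.select(col("Seqno"), initcap(col("Quote")).as("Quote")).show(false)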

Complete UDF Example

Below is a complete UDF example in Scala.


import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Row, SparkSession}

object SparkUDF extends App{

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  import spark.implicits._
  val columns = Seq("Seqno","Quote")
  val data = Seq(("1", "Be the change that you wish to see in the world"),
    ("2", "Everyone thinks of changing the world, but no one thinks of changing himself."),
    ("3", "The purpose of our lives is to be happy.")

  )
  val df = data.toDF(columns:_*)
  df.show(false)

  val convertCase =  (str:String) => {
    val arr = str.split(" ")
    arr.map(f=>  f.substring(0,1).toUpperCase + f.substring(1,f.length)).mkString(" ")
  }

  //Using with DataFrame
  val convertUDF = udf(convertCase)
  df.select(col("Seqno"),
    convertUDF(col("Quote")).as("Quote") ).show(false)

  // Using it on SQL
  spark.udf.register("convertUDF", convertCase)
  df.createOrReplaceTempView("QUOTE_TABLE")
  spark.sql("select Seqno, convertUDF(Quote) from QUOTE_TABLE").show(false)
  
}

This example is also available at the Spark GitHub project for reference.

Conclusion

In this article, you have learned that a Spark UDF is a User Defined Function used to create reusable logic that can be applied to multiple DataFrames. Once created, UDFs can be used on DataFrames and, after registering, in SQL.
