PySpark Pandas UDF (pandas_udf) Example

  • Post last modified:March 2, 2021

By using the pyspark.sql.functions.pandas_udf() function, you can create a Pandas UDF (User Defined Function) that PySpark executes with Apache Arrow to transform a DataFrame. PySpark provides hundreds of built-in functions, so before you write your own, I recommend checking whether the function you need is already available in pyspark.sql.functions.

Related: Create PySpark UDF Function

In this article, I will explain the pandas_udf() function, its syntax, and how to use it with examples.

1. Syntax of pandas_udf()

Following is the syntax of the pandas_udf() function.


# Syntax
pandas_udf(f=None, returnType=None, functionType=None)
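  • f – The user-defined Python function to wrap.
  • returnType – The return type of the UDF, given either as a DDL-formatted type string (for example "string") or as a pyspark.sql.types.DataType object. The signature defaults it to None, but in practice you need to supply a return type.
  • functionType – Optional; an enum value from pyspark.sql.functions.PandasUDFType (default SCALAR). It is kept mainly for compatibility; in Spark 3.0 and later, Python type hints are the preferred way to choose the UDF type.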
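
As a rough illustration of these parameters, the sketch below passes returnType once as a DDL-formatted string and once as a DataType object, calling pandas_udf() as a regular function instead of as a decorator. The helper name make_upper_udfs is hypothetical, and the Spark imports sit inside it only so that the plain pandas function can be checked without a Spark installation.

```python
import pandas as pd


def to_upper(s: pd.Series) -> pd.Series:
    """Plain pandas logic that the UDF will run on each batch of values."""
    return s.str.upper()


def make_upper_udfs():
    """Hypothetical helper: build the same UDF with two returnType styles."""
    # Imported here so to_upper() can be tested without Spark installed
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import StringType

    # returnType as a DDL-formatted type string
    upper_ddl = pandas_udf(to_upper, returnType="string")
    # returnType as a pyspark.sql.types.DataType object
    upper_dt = pandas_udf(to_upper, returnType=StringType())
    return upper_ddl, upper_dt
```

Both returned objects should behave the same way when applied to a string column of a DataFrame.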

2. PySpark pandas_udf() Usage with Examples

The pandas_udf() function from pyspark.sql.functions creates a Pandas user-defined function, which you can then apply to a column or to the entire DataFrame.

Note that at the time of writing this article, this function doesn’t support returning values of type pyspark.sql.types.ArrayType of pyspark.sql.types.TimestampType or nested pyspark.sql.types.StructType.

Following are the steps to create a PySpark Pandas UDF and use it on a DataFrame.

2.1 Create the DataFrame

First, let’s create the PySpark DataFrame on which I will apply the pandas UDF.


# Import
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com') \
                    .getOrCreate()
# Prepare Data
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

# Create DataFrame
df = spark.createDataFrame(data=data,schema=columns)
df.show()

This yields the output below.

[Image: pyspark pandas_udf()]

2.2 Import Related to Pandas UDF Function

You need the following imports to use the pandas_udf() function.


# Imports
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

2.3 Create PySpark Pandas UDF

Let’s use pandas_udf() to create the custom UDF. The following example relies on Python type hints and can be used in Spark 3.0 or later versions.


# Create pandas_udf()
@pandas_udf(StringType())
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()

If you are using a version of Spark earlier than 3.0, use the function below. In those versions, the pandas UDF used functionType to decide the execution type:



# Create pandas_udf()
from pyspark.sql.functions import PandasUDFType
@pandas_udf(StringType(), PandasUDFType.SCALAR)
def to_upper(s):
    return s.str.upper()
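
In Spark 3.0 and later, the type hints take over the role that functionType plays in the older style. As a hedged sketch, a function annotated as taking a pd.Series and returning a float is treated as an aggregate pandas UDF rather than a scalar one. The names mean_name_length and make_mean_udf are made up for illustration, and df is assumed to be the DataFrame with a Name column created in section 2.1.

```python
import pandas as pd


def mean_name_length(names: pd.Series) -> float:
    """Average number of characters per name (plain pandas logic)."""
    return float(names.str.len().mean())


def make_mean_udf():
    """Hypothetical helper: wrap the function as an aggregate pandas UDF."""
    # Imported here so mean_name_length() can be tested without Spark
    from pyspark.sql.functions import pandas_udf

    # The Series -> float type hints mark this as an aggregate pandas UDF
    return pandas_udf(mean_name_length, returnType="double")


# Possible use with an existing DataFrame `df` that has a "Name" column:
# df.select(make_mean_udf()("Name")).show()
```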

2.4 Use Pandas UDF on select() & withColumn()

Finally, let’s use the to_upper() Pandas UDF defined above with the PySpark select() and withColumn() functions.


# Using UDF with select()
df.select("Seqno","Name",to_upper("Name")).show()

# Using UDF with withColumn()
df.withColumn("upper_col",to_upper("Name")).show()

This yields the output below.

[Image: pyspark pandas udf]
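
Besides select() and withColumn(), a pandas UDF can also be registered for use in SQL queries. The following is a minimal sketch under a few assumptions: upper_names_with_sql is a hypothetical helper, "people" is an arbitrary temporary view name, and the caller passes in an active SparkSession plus a DataFrame that has Seqno and Name columns.

```python
import pandas as pd


def to_upper(s: pd.Series) -> pd.Series:
    """Plain pandas logic that upper-cases every value in the batch."""
    return s.str.upper()


def upper_names_with_sql(spark, df):
    """Register the pandas UDF and call it from a SQL statement.

    `spark` is an active SparkSession; `df` has "Seqno" and "Name" columns.
    """
    # Imported here so to_upper() can be tested without Spark installed
    from pyspark.sql.functions import pandas_udf

    to_upper_udf = pandas_udf(to_upper, returnType="string")
    # Make the UDF callable by name inside SQL statements
    spark.udf.register("to_upper", to_upper_udf)
    # Expose the DataFrame to SQL under a temporary view name
    df.createOrReplaceTempView("people")
    return spark.sql("SELECT Seqno, to_upper(Name) AS upper_name FROM people")
```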

3. PySpark with Pandas apply() Function

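If you just want to apply a custom function to a DataFrame, you can also use the approach below, which relies on the pandas API on Spark (pyspark.pandas, available in Spark 3.2 and later). Note that with this approach you don’t call the pandas_udf() function yourself.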


# Imports
import pyspark.pandas as ps
import numpy as np

# Prepare Data
technologies = {
    'Fee': [20000, 25000, 30000, 22000, np.nan],
    'Discount': [1000, 2500, 1500, 1200, 3000]
}

# Create a pandas-on-Spark DataFrame
psdf = ps.DataFrame(technologies)
print(psdf)

# Create Python function; each row arrives as a pandas Series,
# so read the values by column label
def add(data):
    return data['Fee'] + data['Discount']

# Apply the function to each row of the DataFrame
addDF = psdf.apply(add, axis=1)
print(addDF)
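
Because the function passed to apply() with axis=1 receives an ordinary pandas Series for each row, one way to sanity-check it is to run it on a small pandas DataFrame first. The sketch below assumes label-based access to the Fee and Discount columns. The names add_fee_discount and run_on_spark are illustrative, and pyspark.pandas is imported lazily so the local check needs only pandas.

```python
import numpy as np
import pandas as pd


def add_fee_discount(row):
    # Each row is a pandas Series indexed by column label
    return row["Fee"] + row["Discount"]


# Small local sample that mirrors the columns used above
pdf = pd.DataFrame({
    "Fee": [20000, 25000, np.nan],
    "Discount": [1000, 2500, 3000],
})

# Plain pandas check: a missing Fee propagates as NaN in the sum
local_result = pdf.apply(add_fee_discount, axis=1)


def run_on_spark(pandas_df):
    """Apply the same function through the pandas API on Spark."""
    # Imported lazily; requires a Spark version that ships pyspark.pandas
    import pyspark.pandas as ps

    return ps.from_pandas(pandas_df).apply(add_fee_discount, axis=1)
```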

4. Complete Example

Following is a complete example of the pandas_udf() function.


# Import
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com') \
                    .getOrCreate()
# Prepare Data
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

# Create DataFrame
df = spark.createDataFrame(data=data,schema=columns)
df.show()

# imports
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

# create pandas_udf
@pandas_udf(StringType())
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()

# Using UDF with select()
df.select("Seqno","Name",to_upper("Name")).show()

# Using UDF with withColumn()
df.withColumn("upper_col",to_upper("Name")).show()

5. Conclusion

In this article, you have learned what the PySpark pandas_udf() function is, its syntax, how to create a Pandas UDF, and how to use it with the select() and withColumn() functions. You have also learned how to create a simple custom function and apply it to a DataFrame.
