PySpark map() Transformation


PySpark map() is an RDD transformation that applies a transformation function (a lambda or a named function) to every element of an RDD and returns a new RDD. In this article, you will learn the syntax and usage of the RDD map() transformation with examples and how to use it with a DataFrame.

The RDD map() transformation is used to apply complex operations like adding a column, updating a column, or transforming the data; the output of a map() transformation always has the same number of records as the input.

  • Note 1: DataFrame doesn’t have a map() transformation; to use map() with a DataFrame, you need to convert the DataFrame to an RDD first.
  • Note 2: If you have heavy initialization (for example, a database connection), use the PySpark mapPartitions() transformation instead of map(); with mapPartitions(), the heavy initialization executes only once per partition instead of once per record, as the sketch below illustrates.
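
A minimal sketch of the mapPartitions() approach, assuming a hypothetical expensive setup such as a database connection (open_connection() and enrich() below are placeholders, not real APIs): with map(), the setup would run once per record, whereas with mapPartitions() it runs once per partition and is reused for every record in that partition.


# mapPartitions(): heavy initialization runs once per partition (sketch, hypothetical helpers)
def process_partition(records):
    conn = open_connection()         # hypothetical expensive initialization
    for record in records:
        yield enrich(record, conn)   # hypothetical per-record transformation

rdd2 = rdd.mapPartitions(process_partition)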

Related: Spark map() vs mapPartitions() Explained with Examples

First, let’s create an RDD from a list of strings.


# Imports
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
    .appName("SparkByExamples.com").getOrCreate()

data = ["Project","Gutenberg’s","Alice’s","Adventures",
"in","Wonderland","Project","Gutenberg’s","Adventures",
"in","Wonderland","Project","Gutenberg’s"]

rdd=spark.sparkContext.parallelize(data)

map() Syntax


# Syntax
map(f, preservesPartitioning=False)
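
Here, f is the function applied to each element, and preservesPartitioning indicates whether the output should keep the parent RDD’s partitioner; it only matters for pair RDDs that already have a partitioner. A minimal sketch (not from the original article):


# preservesPartitioning is useful when keys are unchanged on a partitioned pair RDD
pairs = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)]).partitionBy(2)

# Keys stay the same, so declaring the partitioning preserved avoids an
# unnecessary shuffle in downstream key-based operations.
doubled = pairs.map(lambda kv: (kv[0], kv[1] * 2), preservesPartitioning=True)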

PySpark map() Example with RDD

In this PySpark map() example, we pair each element with the value 1. The result is a pair RDD of key-value pairs, with the word (a string) as the key and 1 (an int) as the value.


# map() with rdd
rdd2=rdd.map(lambda x: (x,1))
for element in rdd2.collect():
    print(element)

This yields the below output.

('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
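
Because the result is a pair RDD, pair RDD operations such as reduceByKey() can be applied directly. As a small follow-up sketch (not part of the original example), the word occurrences can be counted by summing the 1s per key:


# Count occurrences of each word by summing the 1s per key
wordCounts = rdd2.reduceByKey(lambda a, b: a + b)
for element in wordCounts.collect():
    print(element)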

PySpark map() Example with DataFrame

PySpark DataFrame doesn’t have a map() transformation to apply a lambda function; when you want to apply a custom transformation, you need to convert the DataFrame to an RDD and apply map() on it. Let’s use another dataset to explain this.


data = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()
+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+

# Referring to columns by index
rdd2=df.rdd.map(lambda x: 
    (x[0]+","+x[1],x[2],x[3]*2)
    )
df2=rdd2.toDF(["name","gender","new_salary"])
df2.show()
+---------------+------+----------+
|           name|gender|new_salary|
+---------------+------+----------+
|    James,Smith|     M|        60|
|      Anna,Rose|     F|        82|
|Robert,Williams|     M|       124|
+---------------+------+----------+

Note that above I have used the index to get the column values. Each record passed to the lambda is a Row object, so alternatively, you can also refer to the DataFrame column names while iterating.


# Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x["firstname"]+","+x["lastname"],x["gender"],x["salary"]*2)
    ) 

Another alternative is to access the columns as Row attributes.


# Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x.firstname+","+x.lastname,x.gender,x.salary*2)
    ) 

You can also create a custom function to perform the operation. The below func1() function executes for every DataFrame row, called from the lambda function.


# By Calling function
def func1(x):
    firstName=x.firstname
    lastName=x.lastname
    name=firstName+","+lastName
    gender=x.gender.lower()
    salary=x.salary*2
    return (name,gender,salary)

rdd2=df.rdd.map(lambda x: func1(x))
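
You can also pass the function directly, as in df.rdd.map(func1). As in the earlier example, the result can be converted back to a DataFrame with toDF() when you need DataFrame operations afterwards (a small sketch):


# Convert the mapped RDD back to a DataFrame
df3 = rdd2.toDF(["name","gender","new_salary"])
df3.show()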

Complete PySpark map() example

Below is a complete example of PySpark map() transformation.


from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = ["Project",
"Gutenberg’s",
"Alice’s",
"Adventures",
"in",
"Wonderland",
"Project",
"Gutenberg’s",
"Adventures",
"in",
"Wonderland",
"Project",
"Gutenberg’s"]

rdd=spark.sparkContext.parallelize(data)

rdd2=rdd.map(lambda x: (x,1))
for element in rdd2.collect():
    print(element)
    
data = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()

rdd2=df.rdd.map(lambda x: 
    (x[0]+","+x[1],x[2],x[3]*2)
    )  
df2=rdd2.toDF(["name","gender","new_salary"]   )
df2.show()

# Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x["firstname"]+","+x["lastname"],x["gender"],x["salary"]*2)
    ) 

# Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x.firstname+","+x.lastname,x.gender,x.salary*2)
    ) 

def func1(x):
    firstName=x.firstname
    lastName=x.lastname
    name=firstName+","+lastName
    gender=x.gender.lower()
    salary=x.salary*2
    return (name,gender,salary)

rdd2=df.rdd.map(lambda x: func1(x))

Frequently Asked Questions on map()

How does the map() transformation differ from other transformations, like flatMap() in PySpark?

The map() transformation applies a function to each element of the RDD independently, resulting in a new RDD with the same number of elements. In contrast, flatMap() can produce zero or more output elements for each input element, so the result may have a different number of elements.
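
A small sketch of the difference:

rdd = spark.sparkContext.parallelize(["hello world", "hi"])
print(rdd.map(lambda x: x.split(" ")).collect())      # [['hello', 'world'], ['hi']] (2 elements)
print(rdd.flatMap(lambda x: x.split(" ")).collect())  # ['hello', 'world', 'hi'] (3 elements)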

Can we apply Python lambda functions with the map() transformation in PySpark?

We can use Python lambda functions or regular named functions with the map() transformation.
For example:
rdd = spark.sparkContext.parallelize([2,4,6])
even_square = rdd.map(lambda x: x**2)

How does the map() transformation handle null or missing values?

The map() transformation in PySpark processes each element independently and, by default, it does not handle null or missing values. We need to handle these cases explicitly within the mapping function.
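
For example, a minimal sketch that guards against None inside the mapping function:

rdd = spark.sparkContext.parallelize(["a", None, "c"])
rdd2 = rdd.map(lambda x: x.upper() if x is not None else "UNKNOWN")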

How to use the map() transformation with key-value pairs in PySpark?

For key-value pairs, the function passed to map() receives the whole (key, value) tuple, so to preserve the keys you simply return them unchanged while transforming the values; alternatively, mapValues() transforms only the values and keeps the keys (and the partitioning) automatically.
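
A minimal sketch:

pairs = spark.sparkContext.parallelize([("a", 1), ("b", 2)])
pairs.map(lambda kv: (kv[0], kv[1] * 10)).collect()   # map() receives the whole (key, value) tuple
pairs.mapValues(lambda v: v * 10).collect()           # mapValues() keeps the keys and transforms the values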

Conclusion

In conclusion, you have learned how to apply a map() transformation to every element of a PySpark RDD and that it returns the same number of elements as the input RDD. This is one of the differences between the map() and flatMap() transformations. You have also learned how to use map() with a DataFrame by converting the DataFrame to an RDD first.

Happy Learning !!

