Spark map() and mapValues()

Spark map() and mapValues() are two commonly used functions for transforming data in Spark RDDs (Resilient Distributed Datasets). In this article, I will explain each function separately and then describe the differences between map() and mapValues(), comparing one with the other.


1. Spark map()

In Spark, the map() function is used to transform each element of an RDD (Resilient Distributed Datasets) into another element. It returns a new RDD that contains the transformed elements.

The map() function takes a function as its argument, which defines how the transformation should be done. This function should take one input parameter of the type of the RDD elements and return the transformed output.

Here’s an example of using map() in Spark Scala:


//Imports
import org.apache.spark.sql.SparkSession

//Create Spark Session
val spark:SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()   

//Create sample RDD
val inputRDD = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5))
inputRDD.collect

//Output
res0: Array[Int] = Array(1, 2, 3, 4, 5)

//Applying Mapping for each record in RDD.
val outputRDD = inputRDD.map(x => x * 2)

//Grab the result after mapping
outputRDD.collect

//Output
res1: Array[Int] = Array(2, 4, 6, 8, 10)

In this example, we first create an input RDD containing the integers 1 to 5. Then we apply the map() function to the input RDD to transform each element by multiplying it by 2. The resulting output RDD will contain the transformed elements [2, 4, 6, 8, 10].
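Because the function passed to map() decides the output type, the resulting RDD does not have to hold the same element type as the input. The following is a minimal sketch of that idea, assuming a local Spark session; the app name and the `item-` label format are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; the app name is arbitrary
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("MapTypeChangeSketch")
  .getOrCreate()

val numbers = spark.sparkContext.parallelize(Seq(1, 2, 3))

// Int => String: the result is an RDD[String], not an RDD[Int]
val labels = numbers.map(n => s"item-$n")

// Array(item-1, item-2, item-3)
val labelArray: Array[String] = labels.collect()
```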

Note that map() is a transformation operation, which means it is lazy and does not execute immediately. Instead, it builds a plan of transformations to be executed when an action operation is called, such as collect() or saveAsTextFile().
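One way to observe this laziness is to count how often the mapping function actually runs. The sketch below uses a LongAccumulator purely as a counter (the name `mapCalls` is an arbitrary choice) and assumes a local run with no task retries; on a real cluster, accumulator updates made inside transformations can be applied more than once if a task is re-executed.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("LazyMapSketch")
  .getOrCreate()

// Accumulator used only to count invocations of the mapping function
val calls = spark.sparkContext.longAccumulator("mapCalls")

val doubled = spark.sparkContext
  .parallelize(Seq(1, 2, 3, 4, 5))
  .map { x => calls.add(1); x * 2 }

// No action has run yet, so the function has not been invoked
val callsBeforeAction: Long = calls.sum

// collect() is an action, so it triggers the job
val result = doubled.collect()
val callsAfterAction: Long = calls.sum
```

In a simple local run, `callsBeforeAction` stays at 0 and `callsAfterAction` equals the number of elements, which shows that the work happens only when the action is called.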

2. Spark mapValues()

In Spark, mapValues() is a transformation operation on RDDs (Resilient Distributed Datasets) that transforms the values of a key-value pair RDD without changing the keys. It applies a specified function to the values of each key-value pair in the RDD, returning a new RDD with the same keys and the transformed values.

The mapValues() function takes a function as its argument, which is applied to each value in the RDD. This function should take one input parameter of the type of the RDD values and return the transformed output.

Here’s an example of using mapValues() in Spark.


//Imports
import org.apache.spark.sql.SparkSession

//Create Spark Session
val spark:SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()   

//Create an RDD with Key-Value Pair
val inputRDD = spark.sparkContext.parallelize(Seq(("apple", 3), ("banana", 2), ("orange", 4)))
inputRDD.collect

//Output of inputRDD collect
res0: Array[(String, Int)] = Array((apple,3), (banana,2), (orange,4))

//Apply mapValues() to each value of the key-value pairs
val outputRDD = inputRDD.mapValues(x => x * 2)
outputRDD.collect

//output
res1: Array[(String, Int)] = Array((apple,6), (banana,4), (orange,8))

In this example,

  • We first create an input RDD containing the key-value pairs of fruits and their corresponding quantities.
  • Then we apply the mapValues() function to the input RDD to transform the values of each key-value pair by multiplying them by 2.
  • The resulting output RDD will contain the same keys as the input RDD, but with the transformed values [("apple", 6), ("banana", 4), ("orange", 8)].

Note that mapValues() is a transformation operation, which means it is lazy and does not execute immediately. Instead, it builds a plan of transformations to be executed when an action operation is called, such as collect() or saveAsTextFile().
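In terms of the resulting data, mapValues(f) produces the same pairs as a map() that passes the key through unchanged. The following sketch compares the two forms on the fruit data from the example above, assuming a local Spark session.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("MapValuesEquivalenceSketch")
  .getOrCreate()

val fruits = spark.sparkContext.parallelize(
  Seq(("apple", 3), ("banana", 2), ("orange", 4)))

// mapValues() touches only the value; the key is carried over automatically
val viaMapValues = fruits.mapValues(_ * 2).collect()

// The same result with map(): the key has to be passed through by hand
val viaMap = fruits.map { case (key, value) => (key, value * 2) }.collect()
```

Both arrays contain (apple,6), (banana,4), (orange,8). The two forms differ in what Spark knows about the partitioning of the result, not in the data they produce.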

3. Comparing the Properties of Spark map() and mapValues()

Spark map() and mapValues() are both useful transformation operations in Spark, but they have different properties that make them better suited for different use cases.

Here are some properties to compare map() and mapValues() in Spark:

| Property | map() | mapValues() |
| --- | --- | --- |
| Input and output types | Takes a function that operates on each element of an RDD and returns a new RDD of a potentially different element type. | Takes a function that operates only on the values of a key-value pair RDD and returns a new RDD with the same key type and a potentially different value type. |
| Performance | More flexible, but because the function may change the keys, Spark discards the parent RDD's partitioner. A later key-based operation such as reduceByKey() may then need a shuffle. | The per-element cost is similar to map(), and neither operation shuffles data by itself. The advantage is that mapValues() preserves the parent RDD's partitioner, so later key-based operations on an already partitioned RDD can avoid a shuffle. |
| Combinability | Can transform an RDD in many ways, but needs extra code to achieve the same effect as mapValues(). | Designed specifically for transforming only the values of a key-value pair RDD, and combines easily with other key-value transformations such as reduceByKey() and groupByKey(). |
| Ease of use | You must explicitly carry the key through the function if you want to preserve it. | Easier to use with key-value pair RDDs because it automatically preserves the original keys. |
| Output ordering | Preserves the order of the elements within each partition of the input RDD. | Also preserves the order of the elements within each partition, since it only rewrites the values and leaves the keys and partitioning untouched. |

Table: Spark map() vs mapValues()
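The practical performance difference between the two operations comes from partitioner handling. The sketch below, which assumes a local Spark session and an arbitrary HashPartitioner with 2 partitions, checks the `partitioner` field of the RDD produced by each operation.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("PartitionerSketch")
  .getOrCreate()

// Give the pair RDD an explicit partitioner (2 partitions is an arbitrary choice)
val partitioned = spark.sparkContext
  .parallelize(Seq(("apple", 3), ("banana", 2), ("orange", 4)))
  .partitionBy(new HashPartitioner(2))

val afterMapValues = partitioned.mapValues(_ * 2)
val afterMap = partitioned.map { case (key, value) => (key, value * 2) }

// mapValues() cannot change keys, so the partitioner is kept
val keptByMapValues: Boolean = afterMapValues.partitioner.isDefined

// map() could change keys, so Spark drops the partitioner
val droppedByMap: Boolean = afterMap.partitioner.isEmpty
```

Because `afterMapValues` still carries the partitioner, a following reduceByKey() or join on the same partitioning can reuse it, whereas `afterMap` has to be shuffled again.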

4. Example of Spark map() vs mapValues()

Both map() and mapValues() are transformation operations in Spark that allow you to apply a function to each element of an RDD. However, they differ in how they operate on key-value pair RDDs.

The main difference between map() and mapValues() is that

  • map() operates on the entire key-value pair
  • mapValues() operates only on the values of the key-value pair.
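A short sketch of this difference, assuming a local Spark session and made-up fruit data: with map() the function receives the whole pair and may return a different key, while the function given to mapValues() never sees the key at all.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("WholePairVsValueSketch")
  .getOrCreate()

val pairs = spark.sparkContext.parallelize(Seq(("apple", 3), ("banana", 2)))

// map() sees the whole pair, so it can swap key and value: RDD[(Int, String)]
val swapped = pairs.map { case (key, value) => (value, key) }.collect()

// map() can also rewrite the key while keeping the value
val upperKeys = pairs.map { case (key, value) => (key.toUpperCase, value) }.collect()

// mapValues() only receives the value; the keys stay as they were
val doubledValues = pairs.mapValues(_ * 2).collect()
```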

Before comparing Spark map() and mapValues() let us create an RDD on which we can apply these map operations and compare the differences.


//Imports
import org.apache.spark.{SparkConf, SparkContext}

// Create a Spark context
val conf = new SparkConf().setAppName("ordersPurchasesRDD").setMaster("local[2]")
val sc = new SparkContext(conf)

// Create a sample RDD with orders and purchases data
val ordersPurchasesData = sc.parallelize(Seq(
  ("order1", ("product1", 100)),
  ("order1", ("product2", 200)),
  ("order2", ("product3", 300)),
  ("order2", ("product4", 400)),
  ("order3", ("product5", 500)),
  ("order3", ("product6", 600))
))
ordersPurchasesData.collect()

This yields the output below.


//Output:
Array[(String, (String, Int))] = Array((order1,(product1,100)), 
                                       (order1,(product2,200)), 
                                       (order2,(product3,300)), 
                                       (order2,(product4,400)), 
                                       (order3,(product5,500)), 
                                       (order3,(product6,600)))

In this example, we created a sample RDD with orders and purchases data, where each key represents an orderID and each value represents a tuple of product name and price.

Here are examples of how map() and mapValues() work on the RDD created:

Example of map():

Using map() transformation let’s add 1 to the price and concatenate the orderID, product name, and price with a hyphen for each orderID.


// Apply map() to add 1 to the purchase price and concatenate the orderId, product name and price
val updatedPurchases = ordersPurchasesData
      .map(item => (item._1, item._1 + "-" 
       + item._2._1 + "-" 
       + (item._2._2 + 1)))
updatedPurchases.collect()


This yields the output below.


// Output :
Array[(String, String)] = Array((order1,order1-product1-101), 
                                (order1,order1-product2-201), 
                                (order2,order2-product3-301), 
                                (order2,order2-product4-401), 
                                (order3,order3-product5-501), 
                                (order3,order3-product6-601))

The resulting RDD updatedPurchases contains the same keys as the input RDD, because the function explicitly returns item._1 as the key, but each value is now a single concatenated string.

Example of mapValues():

Suppose that, for the same RDD, we want to add 1 to the price and convert the product name to uppercase while leaving the keys untouched. We can use mapValues() to transform only the values of the key-value pairs:


// Apply mapValues() to upper-case the product name and add 1 to the purchase price
val updatedPurchasesMap = ordersPurchasesData.mapValues(
      value => (value._1.toUpperCase, value._2 + 1)
  )
updatedPurchasesMap.collect

This yields the output below.


// Output:
Array[(String, (String, Int))] = Array((order1,(PRODUCT1,101)), 
                                       (order1,(PRODUCT2,201)), 
                                       (order2,(PRODUCT3,301)), 
                                       (order2,(PRODUCT4,401)), 
                                       (order3,(PRODUCT5,501)), 
                                       (order3,(PRODUCT6,601)))

Here, the mapValues() transformation is applied only to the value of each key-value pair. Each value in the resulting RDD is still a (product name, price) tuple, with the product name converted to uppercase by toUpperCase and the price increased by 1. The keys are unchanged.
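Since mapValues() keeps the keys in place, it chains naturally with key-based aggregations. As a sketch that goes one step beyond the article's example, the code below reuses the same sample orders, keeps only the adjusted price with mapValues(), and then sums the prices per order with reduceByKey(). The per-order total is a hypothetical follow-up step, not part of the original example.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("MapValuesThenReduceSketch")
  .getOrCreate()

val orders = spark.sparkContext.parallelize(Seq(
  ("order1", ("product1", 100)),
  ("order1", ("product2", 200)),
  ("order2", ("product3", 300)),
  ("order2", ("product4", 400)),
  ("order3", ("product5", 500)),
  ("order3", ("product6", 600))
))

// Keep only the price (plus 1, as in the example above), then sum per order
val totalsPerOrder = orders
  .mapValues { case (_, price) => price + 1 }
  .reduceByKey(_ + _)
  .collectAsMap()
```

With the sample data, the totals are 302 for order1, 702 for order2, and 1102 for order3.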

5. Conclusion

In conclusion, Spark map() and mapValues() are both powerful RDD transformation operations, but they have different properties that make them better suited for different use cases. map() is more flexible and can be used to transform RDDs in many ways, whereas mapValues() is easier to use when working with key-value pair RDDs and preserves the RDD's partitioner, which can save a shuffle in later key-based operations.

Overall, the choice between map() and mapValues() depends on the specific requirements of your application. If you are working with key-value pair RDDs and need to transform only the values, mapValues() is likely the better choice because it is simpler and keeps the existing partitioning. If you need to change the keys or perform more complex transformations on an RDD, then map() may be the more appropriate choice.

Complete Code


//Imports
import org.apache.spark.{SparkConf, SparkContext}

// Create a Spark context
val conf = new SparkConf().setAppName("ordersPurchasesRDD").setMaster("local[2]")
val sc = new SparkContext(conf)


// Create a sample RDD with orders and purchases data
val ordersPurchasesData = sc.parallelize(Seq(
  ("order1", ("product1", 100)),
  ("order1", ("product2", 200)),
  ("order2", ("product3", 300)),
  ("order2", ("product4", 400)),
  ("order3", ("product5", 500)),
  ("order3", ("product6", 600))
))
ordersPurchasesData.collect()

//Output of ordersPurchasesData
res0: Array[(String, (String, Int))] = Array((order1,(product1,100)), (order1,(product2,200)), (order2,(product3,300)), (order2,(product4,400)), (order3,(product5,500)), (order3,(product6,600)))


// Apply map() to add 1 to the purchase price and concatenate the orderId, product name and price
val updatedPurchases = ordersPurchasesData.map(item => (item._1, item._1 + "-" + item._2._1 + "-" + (item._2._2 + 1)))
updatedPurchases.collect()

//Output of updatedPurchases
res1: Array[(String, String)] = Array((order1,order1-product1-101), (order1,order1-product2-201), (order2,order2-product3-301), (order2,order2-product4-401), (order3,order3-product5-501), (order3,order3-product6-601))

// Apply mapValues() to upper-case the product name and add 1 to the purchase price
val updatedPurchasesMap = ordersPurchasesData.mapValues(value => (value._1.toUpperCase, value._2 + 1))
updatedPurchasesMap.collect

//Output of updatedPurchasesMap
res2: Array[(String, (String, Int))] = Array((order1,(PRODUCT1,101)), (order1,(PRODUCT2,201)), (order2,(PRODUCT3,301)), (order2,(PRODUCT4,401)), (order3,(PRODUCT5,501)), (order3,(PRODUCT6,601)))
