In this tutorial, you will learn the syntax and usage of the Spark RDD fold()
function, and how to use it to calculate the min, max, and total of the elements of an RDD, with a Scala example. The same approach can be used for Java and PySpark (Python).
Syntax
def fold(zeroValue: T)(op: (T, T) => T): T
Usage
Since RDDs are partitioned, the fold()
function takes full advantage of this by first aggregating the elements within each partition and then aggregating the results of all partitions to get the final result. The return type of fold() is the same as the element type of the RDD.
This function takes the following arguments:
- zeroValue – the initial value used for each partition of the fold; it initializes the accumulator for every partition. We mostly use 0 for numbers and Nil for collections. Note that Spark applies it once per partition and once more when combining the partition results.
- op – an operator used both to accumulate results within a partition and to combine the results from all partitions.
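To make the role of the zero value concrete, the behaviour can be sketched with plain Scala collections, no Spark needed. The three inner lists below stand in for three partitions; how Spark actually splits the data is an assumption for illustration:

```scala
// Simulate Spark's fold(): the zero value is applied once per "partition"
// and once more when the partition results are combined.
val partitions = List(List(1, 2), List(3, 4), List(5, 3, 2)) // 3 simulated partitions
val zero = 2
val perPartition = partitions.map(_.fold(zero)(_ + _)) // List(5, 9, 12)
val total = perPartition.fold(zero)(_ + _)             // 5 + 9 + 12 + 2 = 28
```

This is why fold(2) over elements that sum to 20 prints 28 in the example below: with 3 partitions, the zero value is added 3 + 1 = 4 times, contributing an extra 8.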
Using fold on RDD(Int) type
This example demonstrates the Spark RDD fold() function with elements of type Int.
//fold example
val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
println("Partitions : "+listRdd.getNumPartitions)
println("Total : "+listRdd.fold(0)((acc,ele) => {acc + ele}))
println("Total with init value 2 : "+listRdd.fold(2)((acc,ele) => {acc + ele}))
println("Min : "+listRdd.fold(0)((acc,ele) => {acc min ele}))
println("Max : "+listRdd.fold(0)((acc,ele) => {acc max ele}))
Yields below output
Partitions : 3
Total : 20
Total with init value 2 : 28
Min : 0
Max : 5
In this example, the RDD element type is Int,
so fold() returns a result of Int type. Note that Min prints 0 rather than 1, because the zero value 0 itself participates in the fold.
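The Min result above shows a common pitfall: the zero value takes part in the fold, so fold(0) masks the real minimum. The effect, and a safer choice of zero value (the identity element for the operation), can be shown with plain Scala collections:

```scala
val data = List(1, 2, 3, 4, 5, 3, 2)
// 0 participates in the fold, so it hides the real minimum:
val minWithZero = data.fold(0)(_ min _)        // 0
// Int.MaxValue is the identity for min, Int.MinValue for max:
val minSafe = data.fold(Int.MaxValue)(_ min _) // 1
val maxSafe = data.fold(Int.MinValue)(_ max _) // 5
```

On the RDD, listRdd.fold(Int.MaxValue)((acc, ele) => acc min ele) would print 1, the true minimum.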
Using fold on RDD(String,Int) type
This example demonstrates the Spark RDD fold() function with elements of type (String,Int).
val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),
("C", 40),("B", 30),("B", 60)))
println("Total : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Total", acc._2 + ele._2) }))
println("Min : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Min", acc._2 min ele._2) }))
println("Max : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Max", acc._2 max ele._2) }))
In this example, the RDD element type is (String,Int),
and the fold function returns the same (String,Int) type. Yields below output
Total : (Total,181)
Min : (Min,0)
Max : (Max,60)
Points to Note
- fold() is similar to reduce(), except that it takes a ‘zero value’ as an initial value for each partition. Unlike reduce(), fold() does not fail on an empty RDD; it simply returns the zero value.
- fold() is similar to aggregate(), with one difference: the return type of fold() must be the same as the RDD element type, whereas aggregate() can return any type.
- fold() is also similar to foldByKey(), except that foldByKey() operates on a Pair RDD and folds the values of each key separately.
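What foldByKey() computes can be sketched with plain Scala collections. The real API lives on pair RDDs, e.g. inputRDD.foldByKey(0)(_ + _); the snippet below is only a local illustration of the per-key result:

```scala
// Plain-Scala sketch of what foldByKey(0)(_ + _) computes for each key
val pairs = List(("Z", 1), ("A", 20), ("B", 30), ("C", 40), ("B", 30), ("B", 60))
val byKey = pairs.groupBy(_._1).map { case (k, vs) =>
  k -> vs.map(_._2).fold(0)(_ + _) // fold each key's values with zero value 0
}
// e.g. key "B" folds 30 + 30 + 60 = 120
```

Unlike fold(), the result keeps one entry per key rather than collapsing the whole RDD to a single value.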
The complete example is available at the GitHub project for reference.
Complete example
package com.sparkbyexamples.spark.rdd.functions
import org.apache.spark.sql.SparkSession
object foldExample extends App {
val spark = SparkSession.builder()
.appName("SparkByExamples.com")
.master("local[3]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
//fold example
val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
println("Partitions : "+listRdd.getNumPartitions)
println("Total : "+listRdd.fold(0)((acc,ele) => {acc + ele}))
println("Total with init value 2 : "+listRdd.fold(2)((acc,ele) => {acc + ele}))
println("Min : "+listRdd.fold(0)((acc,ele) => {acc min ele}))
println("Max : "+listRdd.fold(0)((acc,ele) => {acc max ele}))
val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),
("C", 40),("B", 30),("B", 60)))
println("Total : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Total", acc._2 + ele._2) }))
println("Min : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Min", acc._2 min ele._2) }))
println("Max : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Max", acc._2 max ele._2) }))
}
Happy learning !!