
In this tutorial, you will learn the fold() syntax and usage, and how to use the Spark RDD fold() function to calculate the minimum, maximum, and total of the elements, with a Scala example; the same approach can be used for Java and PySpark (Python).

Syntax


def fold(zeroValue: T)(op: (T, T) => T): T

Usage

Since RDDs are partitioned, the fold() function takes full advantage of this by first aggregating the elements within each partition and then aggregating the results of all partitions to produce the final result. The return type of this function is the same as the RDD's element type.

This function takes the following arguments:

zeroValue – the initial value used for each partition in the fold; it initializes the accumulator of every partition. Typically 0 is used for numeric types and Nil for collections.

op – an operator used both to accumulate results within a partition and to combine the results from all partitions.
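
Because the zeroValue is applied once per partition and once more when the partition results are combined, it should be a neutral element for the operation (0 for addition, 1 for multiplication, Nil for list concatenation). The following minimal sketch (assuming an active local SparkSession named spark, as in the examples below) fixes the number of partitions so the effect is easy to see:

  // A minimal sketch, assuming `spark` is an active SparkSession (local mode).
  // With 2 partitions, fold(10) applies the zeroValue once per partition and
  // once more when merging the partition results: 10 * (2 + 1) + (1+2+3+4) = 40.
  val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3, 4), 2)
  println(rdd.fold(0)(_ + _))   // 10 -> plain sum
  println(rdd.fold(10)(_ + _))  // 40 -> sum plus zeroValue applied 3 times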

Using fold on RDD(Int) type

This example demonstrates the Spark RDD fold() function on elements of type Int.


  //fold example
  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
  println("Partitions : "+listRdd.getNumPartitions)
  println("Total : "+listRdd.fold(0)((acc,ele) => {acc + ele}))
  println("Total with init value 2 : "+listRdd.fold(2)((acc,ele) => {acc + ele}))
  println("Min : "+listRdd.fold(0)((acc,ele) => {acc min ele}))
  println("Max : "+listRdd.fold(0)((acc,ele) => {acc max ele})) 

This yields the output below.


Partitions : 3
Total : 20
Total with init value 2 : 28
Min : 0
Max : 5

In this example, the RDD element type is Int, so fold() returns a result of type Int.
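
Note that the second total is 28 rather than 22: the zeroValue 2 is added once for each of the 3 partitions and once more when the partition results are merged, i.e. 20 + 2 * (3 + 1) = 28. A rough sketch of that arithmetic, assuming the same listRdd and partition count as above:

  // Rough check of "Total with init value 2 : 28" (assumes 3 partitions, as above)
  val numPartitions = listRdd.getNumPartitions          // 3 with master local[3]
  val plainSum      = listRdd.fold(0)(_ + _)            // 20
  println(plainSum + 2 * (numPartitions + 1))           // 28, matches fold(2)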

Using fold on RDD(String,Int) type

This example demonstrates the Spark RDD fold() function on elements of type (String, Int).


  val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),
    ("C", 40),("B", 30),("B", 60)))

  println("Total : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Total", acc._2 + ele._2)  }))
  println("Min : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Min", acc._2 min ele._2)  }))
  println("Max : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Max", acc._2 max ele._2)  }))

In this example the RDD element type is (String, Int), so the fold() function returns the same (String, Int) type. This yields the output below.


Total : (Total,181)
Min : (Min,0)
Max : (Max,60)
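
Notice that Min is 0 even though the smallest value in the data is 1: the zeroValue ("", 0) takes part in the min comparison. If the actual minimum is needed, a zeroValue that is neutral for the operation can be used instead; a hedged sketch using the same inputRDD:

  // A sketch: Int.MaxValue is neutral for min, so the real minimum (1) survives
  println("Min : " + inputRDD.fold(("", Int.MaxValue))((acc, ele) => ("Min", acc._2 min ele._2)))
  // Min : (Min,1)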

Points to Note

  • fold() is similar to reduce(), except that it takes a 'zero value' as the initial value for each partition.
  • fold() is similar to aggregate() with one difference: fold()'s return type must be the same as the RDD's element type, whereas aggregate() can return any type (see the sketch after this list).
  • fold() is also similar to foldByKey(), except that foldByKey() operates on a Pair RDD (key-value pairs).
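
To make the contrast with reduce() and aggregate() concrete, here is a brief sketch using the same listRdd as above: reduce() needs no zeroValue, while aggregate() may return a type different from the RDD's element type (here a (sum, count) pair used to compute an average).

  // reduce(): no zeroValue, result has the same type as the elements
  println("Reduce total : " + listRdd.reduce(_ + _))              // 20

  // aggregate(): result type (Int, Int) differs from the element type Int
  val (sum, count) = listRdd.aggregate((0, 0))(
    (acc, ele)   => (acc._1 + ele, acc._2 + 1),                   // within a partition
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))       // across partitions
  println("Average : " + sum.toDouble / count)                    // 20 / 7 ≈ 2.857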

The complete example is also available in the GitHub project for reference.

Complete example


package com.sparkbyexamples.spark.rdd.functions

import org.apache.spark.sql.SparkSession

object foldExample extends App {

  val spark = SparkSession.builder()
    .appName("SparkByExamples.com")
    .master("local[3]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  //fold example
  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
  println("Partitions : "+listRdd.getNumPartitions)
  println("Total : "+listRdd.fold(0)((acc,ele) => {acc + ele}))
  println("Total with init value 2 : "+listRdd.fold(2)((acc,ele) => {acc + ele}))
  println("Min : "+listRdd.fold(0)((acc,ele) => {acc min ele}))
  println("Max : "+listRdd.fold(0)((acc,ele) => {acc max ele}))
  
  val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),
    ("C", 40),("B", 30),("B", 60)))

  println("Total : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Total", acc._2 + ele._2)  }))
  println("Min : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Min", acc._2 min ele._2)  }))
  println("Max : "+inputRDD.fold(("",0))( (acc,ele)=>{ ("Max", acc._2 max ele._2)  }))

}

Happy learning !!

Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, he has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive and Machine Learning. Naveen's journey in the field of data engineering has been one of continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with data as he comes across them. Follow Naveen @ LinkedIn and Medium