In this tutorial, you will learn how to aggregate elements using the Spark RDD aggregate()
action to calculate the min, max, total, and count of RDD elements, using the Scala language; the same approach can be used for Java and PySpark (Python).
RDD aggregate() Syntax
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
(implicit arg0: ClassTag[U]): U
Usage
Since RDDs are partitioned, aggregate() takes full advantage of this by first aggregating the elements within each partition and then aggregating the results of all partitions to produce the final result. The result can be of a different type than the element type of your RDD, as the sketch after the argument list shows.
aggregate() takes the following arguments:
- zeroValue – the initial value used for each partition's aggregation; it initializes the accumulator. We mostly use 0 for integers and Nil for collections.
- seqOp – the operator used to accumulate the results within each partition; it stores the running accumulated result in U.
- combOp – the operator used to combine the results (U) of all partitions.
Using aggregate() on an RDD[Int]
In our example, param0 is used as the seqOp and param1 as the combOp. In param0, "accu" is the accumulator that accumulates the values within each partition, and param1 combines the results of all the accumulators from all partitions.
val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
def param0= (accu:Int, v:Int) => accu + v
def param1= (accu1:Int,accu2:Int) => accu1 + accu2
val result = listRdd.aggregate(0)(param0,param1)
println("output 1 =>" + result)
In this snippet, the RDD element type is Int and the result is also of type Int.
Using aggregate() on an RDD[(String, Int)]
In our example, param3 is used as seqOp and param4 is used as combOp.
val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)))
//aggregate
def param3= (accu:Int, v:(String,Int)) => accu + v._2
def param4= (accu1:Int,accu2:Int) => accu1 + accu2
val result2 = inputRDD.aggregate(0)(param3,param4)
println("output 2 =>" + result)
In this snippet, the RDD element type is (String, Int) and the result is of type Int.
Output
output 1 : 20
output 2 : 181
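As mentioned in the introduction, a single aggregate() pass can also compute the min, max, total, and count together. Here is a hedged sketch using a (min, max, sum, count) tuple as the accumulator (the names and layout are illustrative, not taken from the GitHub example):
val data = spark.sparkContext.parallelize(List(1, 2, 3, 4, 5, 3, 2))
val stats = data.aggregate((Int.MaxValue, Int.MinValue, 0, 0))(
  // seqOp: update the partition-local accumulator with one element
  (acc, v) => (math.min(acc._1, v), math.max(acc._2, v), acc._3 + v, acc._4 + 1),
  // combOp: merge two partition accumulators
  (a, b) => (math.min(a._1, b._1), math.max(a._2, b._2), a._3 + b._3, a._4 + b._4)
)
println(s"min=${stats._1} max=${stats._2} total=${stats._3} count=${stats._4}")
// for this list: min=1 max=5 total=20 count=7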
The complete example is available at the GitHub project for reference.
Points to Note
- aggregate() is similar to fold() and reduce(), except that it can return a result of any type, whereas the other two return a result of the same type as the RDD elements.
- aggregate() is also similar to aggregateByKey(), except that aggregateByKey() operates on a pair RDD (key-value pairs); see the sketch below.
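For contrast, here is a brief sketch of aggregateByKey() on the pair RDD used above; it applies zeroValue and seqOp per key within each partition and combOp across partitions (variable names are illustrative):
val pairRDD = spark.sparkContext.parallelize(
  List(("Z", 1), ("A", 20), ("B", 30), ("C", 40), ("B", 30), ("B", 60)))
val sumsPerKey = pairRDD.aggregateByKey(0)(
  (acc, v) => acc + v,   // seqOp: add a value to the key's running total
  (a, b) => a + b        // combOp: merge totals for the same key
)
sumsPerKey.collect().foreach(println)
// e.g. (Z,1), (A,20), (B,120), (C,40)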
Complete example
package com.sparkbyexamples.spark.rdd.functions

import org.apache.spark.sql.SparkSession

object aggregateExample extends App {

  val spark = SparkSession.builder()
    .appName("SparkByExamples.com")
    .master("local")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  //aggregate example
  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
  def param0= (accu:Int, v:Int) => accu + v
  def param1= (accu1:Int,accu2:Int) => accu1 + accu2
  println("output 1 : "+listRdd.aggregate(0)(param0,param1))

  val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)))
  def param3= (accu:Int, v:(String,Int)) => accu + v._2
  def param4= (accu1:Int,accu2:Int) => accu1 + accu2
  println("output 2 : "+inputRDD.aggregate(0)(param3,param4))
}
Happy learning !!
Can you give some other value instead of 0 in the zeroValue place?
Yes, you can provide any numeric value. But we usually use zero as the default, because the zeroValue is applied once per partition (and once more when combining the partition results), so a non-zero value changes the result.
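As an illustration (a sketch assuming the RDD is created with 2 partitions), a zeroValue of 10 is folded in once per partition by seqOp and once more on the driver by combOp, so the sum increases by 10 * (numPartitions + 1):
val rdd = spark.sparkContext.parallelize(List(1, 2, 3, 4, 5, 3, 2), 2) // 2 partitions
val withZero = rdd.aggregate(0)(_ + _, _ + _)   // 20
val withTen  = rdd.aggregate(10)(_ + _, _ + _)  // 20 + 10 * (2 + 1) = 50
println(s"zeroValue=0 -> $withZero, zeroValue=10 -> $withTen")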