Spark RDD aggregate() operation example

In this tutorial, you will learn how to aggregate elements with the Spark RDD aggregate() action to calculate the min, max, total, and count of RDD elements in Scala. The same approach can be used in Java and PySpark (Python).

RDD aggregate() Syntax


def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
     (implicit arg0: ClassTag[U]): U

Usage

Since RDDs are partitioned, aggregate() takes full advantage of this by first aggregating the elements within each partition and then aggregating the results of all partitions to get the final result. The result type U can differ from the element type T of the RDD.
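The two phases can be imitated on ordinary Scala collections. The sketch below is a plain-Scala illustration, not Spark's implementation: the helper simulateAggregate is hypothetical, and splitting List(1,2,3,4,5,3,2) into the partitions Seq(1, 2, 3) and Seq(4, 5, 3, 2) is an assumption about how parallelize would slice it. As in Spark's RDD.aggregate, the merge phase also starts from zeroValue.

```scala
object AggregateSketch {

  // Hypothetical helper that mirrors RDD.aggregate() on local data:
  // phase 1 folds every partition with seqOp, starting from zeroValue;
  // phase 2 merges the partition results with combOp, again starting from zeroValue.
  def simulateAggregate[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U = {
    val partitionResults: Seq[U] = partitions.map(_.foldLeft(zeroValue)(seqOp))
    partitionResults.foldLeft(zeroValue)(combOp)
  }

  // Assumed split of List(1, 2, 3, 4, 5, 3, 2) into two partitions
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 3, 2))

  // Phase 1 only: one running total per partition
  val partialSums: Seq[Int] = partitions.map(_.foldLeft(0)(_ + _))

  // Both phases, with the same summing logic as param0/param1
  val total: Int = simulateAggregate(partitions, 0)(_ + _, _ + _)

  def main(args: Array[String]): Unit = {
    println(s"partial sums: $partialSums") // partial sums: List(6, 14)
    println(s"total: $total")             // total: 20
  }
}
```

Each partition is reduced to a single accumulator (6 and 14), and only those accumulators are merged, which is why aggregate() moves very little data to the driver.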

This takes the following arguments –

zeroValue – Initial value of the accumulator for each partition; Spark also uses it as the starting value when combining the partition results. It should therefore be the identity element for your operations: typically 0 for integer sums and Nil for collections.

seqOp – Function that folds the elements of a partition into the accumulator: it takes the current accumulator of type U and one RDD element of type T, and returns the updated accumulator of type U.

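combOp – Function that merges two accumulators of type U; Spark uses it to combine the per-partition results into the final result. Because partition results can be merged in any order, combOp should be associative and, for a deterministic result, commutative.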
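The effect of a zeroValue that is not an identity element is easy to see in a plain-Scala sketch. It assumes three partitions and mimics RDD.aggregate, where zeroValue seeds every partition fold and also the final merge on the driver. For a sum over p partitions, a zero value z is therefore added p + 1 times. The object and helper names are hypothetical.

```scala
object ZeroValueSketch {

  // Mimics RDD.aggregate for a sum: zeroValue seeds each partition fold and the final merge.
  def simulateAggregate(partitions: Seq[Seq[Int]], zeroValue: Int): Int =
    partitions
      .map(_.foldLeft(zeroValue)(_ + _)) // seqOp: add each element to the accumulator
      .foldLeft(zeroValue)(_ + _)        // combOp: add the partition results together

  // Assumed three-way split of List(1, 2, 3, 4, 5, 3, 2); the element total is 20.
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4), Seq(5, 3, 2))

  val withIdentity: Int = simulateAggregate(partitions, 0) // 20
  val withTen: Int = simulateAggregate(partitions, 10)     // 20 + 10 * (3 + 1) = 60

  def main(args: Array[String]): Unit =
    println(s"zero = 0 -> $withIdentity, zero = 10 -> $withTen")
}
```

Since the number of partitions depends on the cluster and on how the RDD was created, a non-identity zeroValue makes the result depend on partitioning, which is rarely what you want.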

Using aggregate() on RDD[Int]

In this example, param0 is used as seqOp and param1 as combOp. In param0, accu is the accumulator that holds the running sum of the values within each partition; param1 then adds up the accumulators from all partitions.


  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
  // seqOp: adds each element to the partition's running total
  def param0 = (accu: Int, v: Int) => accu + v
  // combOp: adds the totals of two partitions
  def param1 = (accu1: Int, accu2: Int) => accu1 + accu2
  val result = listRdd.aggregate(0)(param0, param1)
  println("output 1 : " + result)

In this snippet, the RDD elements are of type Int and aggregate() returns an Int.
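Because U can be any type, the min, max, total, and count can all be computed in a single aggregate() pass with a composite accumulator. In this sketch the Stats case class and the zero, seqOp, and combOp values are illustrative names, not part of the Spark API. The partitions are emulated with plain lists (an assumed split) so the code runs without Spark; with an RDD[Int] the call would be listRdd.aggregate(StatsSketch.zero)(StatsSketch.seqOp, StatsSketch.combOp).

```scala
object StatsSketch {

  // Accumulator holding min, max, total and count (illustrative layout)
  final case class Stats(min: Int, max: Int, total: Long, count: Long)

  // Identity element: Int.MaxValue / Int.MinValue never win a min / max comparison
  val zero: Stats = Stats(Int.MaxValue, Int.MinValue, 0L, 0L)

  // seqOp: fold one element into a partition's accumulator
  val seqOp: (Stats, Int) => Stats = (acc, v) =>
    Stats(math.min(acc.min, v), math.max(acc.max, v), acc.total + v, acc.count + 1)

  // combOp: merge the accumulators of two partitions
  val combOp: (Stats, Stats) => Stats = (a, b) =>
    Stats(math.min(a.min, b.min), math.max(a.max, b.max), a.total + b.total, a.count + b.count)

  // Two partitions emulated locally (assumed split of List(1, 2, 3, 4, 5, 3, 2))
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 3, 2))
  val result: Stats = partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

  def main(args: Array[String]): Unit =
    println(result) // Stats(1,5,20,7)
}
```

Starting min and max at Int.MaxValue and Int.MinValue keeps zero an identity element, so it is safe for it to be applied once per partition and again in the final merge.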

Using aggregate() on RDD[(String, Int)]

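In this example, param3 is used as seqOp and param4 as combOp. param3 adds the Int value (v._2) of each pair to the accumulator, so the (String, Int) pairs are aggregated into a single Int total.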


  val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)))
  // seqOp: adds the Int value of each (String, Int) pair to the running total
  def param3 = (accu: Int, v: (String, Int)) => accu + v._2
  // combOp: adds the totals of two partitions
  def param4 = (accu1: Int, accu2: Int) => accu1 + accu2
  val result2 = inputRDD.aggregate(0)(param3, param4)
  println("output 2 : " + result2)

Here the RDD element type is (String, Int), but aggregate() returns an Int.
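The accumulator can also be a collection. As a sketch, the functions below (illustrative names, not Spark API) aggregate the same pairs into a Map of per-key totals, with Map.empty as the zero value. They are exercised on locally emulated partitions (an assumed split) so the code runs without Spark; with Spark they would be passed to inputRDD.aggregate(Map.empty[String, Int])(PerKeySketch.seqOp, PerKeySketch.combOp).

```scala
object PerKeySketch {

  type Totals = Map[String, Int]

  // seqOp: add one (key, value) pair to the partition's running totals
  val seqOp: (Totals, (String, Int)) => Totals = (acc, kv) =>
    acc.updated(kv._1, acc.getOrElse(kv._1, 0) + kv._2)

  // combOp: merge two partition maps, summing the values of keys present in both
  val combOp: (Totals, Totals) => Totals = (a, b) =>
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }

  val input: Seq[(String, Int)] =
    Seq(("Z", 1), ("A", 20), ("B", 30), ("C", 40), ("B", 30), ("B", 60))

  // Two partitions emulated locally (assumed split)
  val partitions: Seq[Seq[(String, Int)]] = Seq(input.take(3), input.drop(3))

  val result: Totals =
    partitions.map(_.foldLeft(Map.empty[String, Int])(seqOp)).foldLeft(Map.empty[String, Int])(combOp)

  def main(args: Array[String]): Unit =
    println(result) // key B totals 30 + 30 + 60 = 120
}
```

This works, but the whole map is returned to the driver. For per-key results on large data, aggregateByKey() or reduceByKey() is usually the better fit because the results stay distributed as an RDD.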

Output


output 1 : 20
output 2 : 181

The complete example is available in the GitHub project for reference.

Points to Note

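  • aggregate() is similar to fold() and reduce(), except that it can return a result of any type U, whereas fold() and reduce() return a value of the same type as the RDD elements.
  • aggregate() is also similar to aggregateByKey(), except that aggregateByKey() operates on a pair RDD, aggregates the values separately for each key, and returns a new RDD instead of a single value.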

Complete example


package com.sparkbyexamples.spark.rdd.functions

import org.apache.spark.sql.SparkSession

object aggregateExample extends App {

  val spark = SparkSession.builder()
    .appName("SparkByExamples.com")
    .master("local")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  //aggregate example
  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
  def param0= (accu:Int, v:Int) => accu + v
  def param1= (accu1:Int,accu2:Int) => accu1 + accu2
  println("output 1 : "+listRdd.aggregate(0)(param0,param1))

  val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)))
  def param3= (accu:Int, v:(String,Int)) => accu + v._2
  def param4= (accu1:Int,accu2:Int) => accu1 + accu2
  println("output 2 : "+inputRDD.aggregate(0)(param3,param4))
}

Happy learning !!

NNK

SparkByExamples.com is a Big Data and Spark examples community page; all examples are simple, easy to understand, and well tested in our development environment.
