Spark RDD reduce() function example

Spark RDD reduce() is an aggregate action function used to calculate the min, max, and total (sum) of the elements in a dataset. In this tutorial, I will explain the RDD reduce() function syntax and usage with the Scala language, and the same approach can be used with the Java and PySpark (Python) languages.

Syntax


def reduce(f: (T, T) => T): T

Usage

RDD reduce() takes a binary operator function as an argument and returns a single value of the same type as the RDD's elements (not a new RDD). It merges the elements of the RDD using the specified binary operator. Because partial results are computed per partition and then combined, the operator should be commutative and associative for the result to be deterministic.
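As a quick illustration, here is a minimal sketch (assuming a SparkSession named spark, as in the complete example below) of why the operator matters: addition is safe, while a non-associative operator such as subtraction gives partition-dependent results.


  val rdd = spark.sparkContext.parallelize(List(1, 2, 3, 4), 2)
  // Safe: addition is commutative and associative
  println(rdd.reduce(_ + _))   // always 10
  // Unsafe: subtraction is neither, so the result depends on
  // how the elements are split across partitions
  println(rdd.reduce(_ - _))   // varies with the number of partitions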

Reduce a list – Calculate min, max, and total of elements

The snippet below reduces the collection to its sum, minimum, and maximum.


  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
  println("output min using binary : "+listRdd.reduce(_ min _))
  println("output max using binary : "+listRdd.reduce(_ max _))
  println("output sum using binary : "+listRdd.reduce(_ + _))

This yields the below output.


output min using binary : 1
output max using binary : 5
output sum using binary : 20

Alternatively, you can write the above operations as shown below.


  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
  println("output min : "+listRdd.reduce( (a,b) => a min b))
  println("output max : "+listRdd.reduce( (a,b) => a max b))
  println("output sum : "+listRdd.reduce( (a,b) => a + b))

This yields the below output.


output min : 1
output max : 5
output sum : 20

Reduce function on Tuple RDD (String, Int)

In this example, you will learn how to use the reduce() function on a complex (tuple) RDD, which is commonly used in production applications.


  val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)))

  println("output min : "+inputRDD.reduce( (a,b)=> ("max",a._2 min b._2))._2)
  println("output max : "+inputRDD.reduce( (a,b)=> ("max",a._2 max b._2))._2)
  println("output sum : "+inputRDD.reduce( (a,b)=> ("Sum",a._2 + b._2))._2)

This yields the below output.


output min : 1
output max : 60
output sum : 181
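
Note that the first element of the tuple returned by each reduce() above is just a placeholder label. If you only need the numeric result, a simpler equivalent (a sketch over the same inputRDD) is to project out the Int value with map() and then reduce:


  println("output min : " + inputRDD.map(_._2).reduce(_ min _))
  println("output max : " + inputRDD.map(_._2).reduce(_ max _))
  println("output sum : " + inputRDD.map(_._2).reduce(_ + _))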

The complete example is also available in the GitHub project for reference.

Points to Note

  • reduce() is similar to fold(), except that fold() takes a ‘zero value‘ used as the initial value for each partition, while reduce() has no initial value.
  • reduce() is similar to aggregate(), with one difference: reduce()'s return type must be the same as the RDD's element type, whereas aggregate() can return any type.
  • reduce() is also similar to reduceByKey(), except that reduceByKey() is a transformation that operates on a Pair RDD and merges the values for each key. See the sketch after this list for a side-by-side comparison.
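
Here is a minimal sketch contrasting the four functions on the same data (again assuming the spark session from the complete example below):


  val nums = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))

  // reduce(): no zero value; the result type matches the element type (Int)
  println(nums.reduce(_ + _))                       // 20

  // fold(): like reduce(), but a zero value is applied for each partition
  println(nums.fold(0)(_ + _))                      // 20

  // aggregate(): zero value plus separate seqOp/combOp functions, so the
  // result type can differ from the element type. Here it computes a
  // (sum, count) tuple in a single pass.
  val (total, count) = nums.aggregate((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),           // fold a value into the accumulator
    (a, b)   => (a._1 + b._1, a._2 + b._2)          // merge two partition accumulators
  )
  println(s"sum=$total count=$count")               // sum=20 count=7

  // reduceByKey(): a transformation on a Pair RDD that merges values per key
  val pairs = spark.sparkContext.parallelize(List(("B", 30), ("B", 60), ("A", 20)))
  println(pairs.reduceByKey(_ + _).collect().mkString(","))  // e.g. (A,20),(B,90)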

Complete example


package com.sparkbyexamples.spark.rdd.functions

import org.apache.spark.sql.SparkSession

object reduceExample extends App {

  val spark = SparkSession.builder()
    .appName("SparkByExamples.com")
    .master("local[3]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))

  println("output sum using binary : "+listRdd.reduce(_ min _))
  println("output min using binary : "+listRdd.reduce(_ max _))
  println("output max using binary : "+listRdd.reduce(_ + _))


  // Alternatively you can write
  println("output min : "+listRdd.reduce( (a,b) => a min b))
  println("output max : "+listRdd.reduce( (a,b) => a max b))
  println("output sum : "+listRdd.reduce( (a,b) => a + b))


  val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),
    ("C", 40),("B", 30),("B", 60)))

  println("output max : "+inputRDD.reduce( (a,b)=> ("max",a._2 min b._2))._2)
  println("output max : "+inputRDD.reduce( (a,b)=> ("max",a._2 max b._2))._2)
  println("output sum : "+inputRDD.reduce( (a,b)=> ("Sum",a._2 + b._2))._2)
}

Happy learning !!

Naveen (NNK)

I am Naveen (NNK), working as a Principal Engineer. I am a seasoned Apache Spark engineer with a passion for harnessing the power of big data and distributed computing to drive innovation and deliver data-driven insights. I love designing, optimizing, and managing Apache Spark-based solutions that transform raw data into actionable intelligence. I am also passionate about sharing my knowledge of Apache Spark, Hive, PySpark, R, etc.
