Spark RDD reduce() function example

Spark RDD reduce() is an aggregate action that combines the elements of a dataset into a single value, which makes it a handy way to calculate the min, max, and total of the elements. In this tutorial, I will explain the RDD reduce() function syntax and usage with the Scala language; the same approach can be used with the Java and PySpark (Python) APIs.

Syntax


def reduce(f: (T, T) => T): T

Usage

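RDD reduce() takes a function of type (T, T) => T as an argument and returns a single value of the same type T as the RDD elements (not an RDD). It reduces the elements of the input RDD using the specified binary operator, which should be commutative and associative because Spark reduces each partition independently and then combines the partial results.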
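To make the commutative and associative requirement concrete, below is a minimal local sketch in plain Scala (no Spark needed) that mimics the two-phase shape of a distributed reduce: each chunk of a List is reduced on its own, and then the partial results are reduced again. The helper partitionedReduce and its chunking strategy are hypothetical illustrations, not Spark's actual implementation.

```scala
object ReduceSemanticsSketch {
  // Hypothetical helper: mimics the two-phase shape of RDD.reduce on a local List.
  // The data is split into roughly equal chunks ("partitions"), each chunk is
  // reduced locally, and the partial results are then reduced again.
  def partitionedReduce[T](data: List[T], numPartitions: Int)(f: (T, T) => T): T = {
    val size = math.max(1, math.ceil(data.size.toDouble / numPartitions).toInt)
    val partitions = data.grouped(size).toList // e.g. 7 elements, 3 parts -> sizes 3, 3, 1
    val partials = partitions.map(_.reduce(f)) // per-partition reduce
    partials.reduce(f)                         // merge the partial results
  }

  def main(args: Array[String]): Unit = {
    val data = List(1, 2, 3, 4, 5, 3, 2)
    println(partitionedReduce(data, 1)(_ + _)) // 20
    println(partitionedReduce(data, 3)(_ + _)) // 20: addition is associative
    println(partitionedReduce(data, 1)(_ - _)) // -18
    println(partitionedReduce(data, 3)(_ - _)) // -2: subtraction is not
  }
}
```

Addition gives 20 however the data is split, while subtraction gives -18 with one chunk and -2 with three chunks. That is why a non-associative operator such as subtraction is unsafe to pass to reduce(): the result would depend on how the RDD happens to be partitioned.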

Reduce a list – Calculate min, max, and total of elements

The snippet below reduces the collection to its minimum, maximum, and sum.


  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
  println("output min using binary : "+listRdd.reduce(_ min _))
  println("output max using binary : "+listRdd.reduce(_ max _))
  println("output sum using binary : "+listRdd.reduce(_ + _))

This yields the following output:


output min using binary : 1
output max using binary : 5
output sum using binary : 20

Alternatively, you can write the same operations with explicit lambda parameters:


  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
  println("output min : "+listRdd.reduce( (a,b) => a min b))
  println("output max : "+listRdd.reduce( (a,b) => a max b))
  println("output sum : "+listRdd.reduce( (a,b) => a + b))

This yields the following output:


output min : 1
output max : 5
output sum : 20
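The snippets above call reduce() three times, which means three passes over the data. Because reduce() requires the input and output types to match, one way to get all three values in a single pass is to first lift each element into a (min, max, sum) triple and then reduce the triples. The sketch below shows this on a plain Scala List; the helper name minMaxSum is hypothetical, and on an RDD the same map(...).reduce(...) chain would apply.

```scala
object MinMaxSumOnePass {
  // Hypothetical helper: computes (min, max, sum) with a single reduce call.
  // Each element is first lifted to a triple so that reduce's input and
  // output types match, which RDD.reduce also requires.
  def minMaxSum(values: List[Int]): (Int, Int, Int) =
    values
      .map(v => (v, v, v))
      .reduce((a, b) => (a._1 min b._1, a._2 max b._2, a._3 + b._3))

  def main(args: Array[String]): Unit = {
    val (min, max, sum) = minMaxSum(List(1, 2, 3, 4, 5, 3, 2))
    println(s"min=$min max=$max sum=$sum") // min=1 max=5 sum=20
  }
}
```

Each component of the triple uses an associative and commutative operator (min, max, +), so the combined operator is safe for a partitioned reduce as well.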

Reduce function on Tuple RDD (String, Int)

In this example, you will learn how to use the reduce function on an RDD of tuples, a more complex element type that is common in production applications.


  val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)))

  println("output min : "+inputRDD.reduce( (a,b)=> ("min",a._2 min b._2))._2)
  println("output max : "+inputRDD.reduce( (a,b)=> ("max",a._2 max b._2))._2)
  println("output sum : "+inputRDD.reduce( (a,b)=> ("sum",a._2 + b._2))._2)

This yields the following output:


output min : 1
output max : 60
output sum : 181
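The tuple example builds a throwaway ("min", ...) label just to satisfy reduce's (T, T) => T signature. The sketch below shows two possible alternatives on a plain Scala List with the same data: project the Int values first and reduce plain Ints, or keep the whole tuple and return the pair with the largest value. The object and method names are hypothetical; the comments show the equivalent RDD call, assuming inputRDD from the snippet above.

```scala
object TupleReduceSketch {
  val pairs: List[(String, Int)] =
    List(("Z", 1), ("A", 20), ("B", 30), ("C", 40), ("B", 30), ("B", 60))

  // Option 1: project the Int values first, then reduce plain Ints.
  // On an RDD the same chain would be inputRDD.map(_._2).reduce(_ min _).
  def minValue(data: List[(String, Int)]): Int =
    data.map(_._2).reduce(_ min _)

  // Option 2: keep the whole tuple and return the pair with the largest value.
  // The lambda has type ((String, Int), (String, Int)) => (String, Int),
  // so it could be passed to inputRDD.reduce unchanged.
  def maxPair(data: List[(String, Int)]): (String, Int) =
    data.reduce((a, b) => if (a._2 >= b._2) a else b)

  def main(args: Array[String]): Unit = {
    println(minValue(pairs)) // 1
    println(maxPair(pairs))  // (B,60)
  }
}
```

Option 2 keeps the key that produced the maximum. Note that if two pairs share the largest value, which one is returned depends on the order in which elements are combined, so on a partitioned RDD the key chosen on ties is not guaranteed.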

The complete example is also available in the GitHub project for reference.

Points to Note

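  • reduce() is similar to fold(), except that fold() takes a ‘Zero value’ as the initial value for each partition (and for the final merge of the partition results), whereas reduce() takes no initial value.
  • reduce() is similar to aggregate(), with one difference: the return type of reduce() must be the same as the RDD element type, whereas aggregate() can return any type.
  • reduce() is also similar to reduceByKey(), except that reduceByKey() operates on a pair RDD and is a transformation that returns a new RDD with one reduced value per key, whereas reduce() is an action that returns a single value to the driver.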
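The differences listed above can be illustrated without a cluster. The sketch below models a 3-partition RDD as a List of Lists (an assumption for illustration only) and defines hypothetical helpers foldLike, aggregateLike, and sumByKey that mimic the shape of fold(), aggregate(), and reduceByKey(); they are local approximations, not Spark's API.

```scala
object ReduceVsOthersSketch {
  // Local stand-in for a 3-partition RDD (an assumption for illustration only).
  val partitions: List[List[Int]] = List(List(1, 2, 3), List(4, 5, 3), List(2))

  // fold-like: the zero value seeds every partition AND the final merge.
  def foldLike(zero: Int)(op: (Int, Int) => Int): Int =
    partitions.map(_.foldLeft(zero)(op)).foldLeft(zero)(op)

  // aggregate-like: the result type U may differ from the Int element type.
  def aggregateLike[U](zero: U)(seqOp: (U, Int) => U, combOp: (U, U) => U): U =
    partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

  // reduceByKey-like: one reduced value per key instead of one overall value.
  def sumByKey(pairs: List[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(_ + _) }

  def main(args: Array[String]): Unit = {
    println(partitions.flatten.reduce(_ + _)) // reduce: no zero value -> 20
    println(foldLike(0)(_ + _))               // neutral zero -> 20
    println(foldLike(1)(_ + _))               // zero added 3 + 1 times -> 24
    val (sum, count) = aggregateLike((0, 0))(
      (acc, x) => (acc._1 + x, acc._2 + 1), // fold an Int into (sum, count)
      (a, b) => (a._1 + b._1, a._2 + b._2)  // merge two (sum, count) pairs
    )
    println(s"sum=$sum count=$count")         // sum=20 count=7
    val pairs = List(("Z", 1), ("A", 20), ("B", 30), ("C", 40), ("B", 30), ("B", 60))
    println(sumByKey(pairs)("B"))             // 30 + 30 + 60 -> 120
  }
}
```

The foldLike(1) line shows why the zero value passed to fold() should be neutral for the operator (0 for addition): it is applied once per partition plus once more when the partition results are merged.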

Complete example


package com.sparkbyexamples.spark.rdd.functions

import org.apache.spark.sql.SparkSession

object reduceExample extends App {

  val spark = SparkSession.builder()
    .appName("SparkByExamples.com")
    .master("local[3]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))

  println("output min using binary : "+listRdd.reduce(_ min _))
  println("output max using binary : "+listRdd.reduce(_ max _))
  println("output sum using binary : "+listRdd.reduce(_ + _))


  // Alternatively you can write
  println("output min : "+listRdd.reduce( (a,b) => a min b))
  println("output max : "+listRdd.reduce( (a,b) => a max b))
  println("output sum : "+listRdd.reduce( (a,b) => a + b))


  val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),
    ("C", 40),("B", 30),("B", 60)))

  println("output min : "+inputRDD.reduce( (a,b)=> ("min",a._2 min b._2))._2)
  println("output max : "+inputRDD.reduce( (a,b)=> ("max",a._2 max b._2))._2)
  println("output sum : "+inputRDD.reduce( (a,b)=> ("sum",a._2 + b._2))._2)
}

Happy learning !!

NNK
