Spark RDD reduce()
The RDD reduce() aggregate action function is used to calculate the min, max, and total (sum) of the elements in a dataset. In this tutorial, I will explain the reduce() function syntax and usage with the Scala language, and the same approach can be used with the Java and PySpark (Python) languages.
Syntax
def reduce(f: (T, T) => T): T
Usage
The RDD reduce() function takes a binary function as an argument and returns a single value of the same type as the RDD's elements (note that it returns a value, not an RDD). It reduces the elements of the RDD using the specified binary operator, which must be commutative and associative.
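The commutative and associative requirement matters because Spark first reduces the elements within each partition and then merges the per-partition results. As a minimal sketch (the nums variable and the explicit 2-partition split are just for illustration), a non-associative operator such as subtraction gives partition-dependent results:
// reduce() runs within each partition first, then merges partial results,
// so the operator must be commutative and associative. Subtraction is neither.
val nums = spark.sparkContext.parallelize(List(1, 2, 3, 4), 2) // two partitions: (1, 2) and (3, 4)
// partition 1: 1 - 2 = -1; partition 2: 3 - 4 = -1; merge: -1 - (-1) = 0
// with a single partition it would be ((1 - 2) - 3) - 4 = -8
println(nums.reduce(_ - _)) // result depends on partitioning; avoid non-associative operators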
Reduce a list – Calculate min, max, and total of elements
The below snippet reduces the collection to its minimum, maximum, and sum.
val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
println("output min using binary : "+listRdd.reduce(_ min _))
println("output max using binary : "+listRdd.reduce(_ max _))
println("output sum using binary : "+listRdd.reduce(_ + _))
This yields the output below.
output min using binary : 1
output max using binary : 5
output sum using binary : 20
Alternatively, you can write the above operations as shown below.
val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
println("output min : "+listRdd.reduce( (a,b) => a min b))
println("output max : "+listRdd.reduce( (a,b) => a max b))
println("output sum : "+listRdd.reduce( (a,b) => a + b))
This yields the output below.
output min : 1
output max : 5
output sum : 20
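You can also pass a named function instead of an anonymous one; the sum method below is just an illustrative helper, not part of the Spark API:
// a named binary function works the same way as the lambdas above
def sum(a: Int, b: Int): Int = a + b
println("output sum using named function : " + listRdd.reduce(sum)) // 20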
Reduce function on Tuple RDD (String, Int)
In this example, you will learn how to use the reduce() function on a tuple RDD, which is the form you will mostly encounter in production applications.
val inputRDD = spark.sparkContext.parallelize(List(("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)))
println("output min : "+inputRDD.reduce( (a,b)=> ("max",a._2 min b._2))._2)
println("output max : "+inputRDD.reduce( (a,b)=> ("max",a._2 max b._2))._2)
println("output sum : "+inputRDD.reduce( (a,b)=> ("Sum",a._2 + b._2))._2)
This yields the output below.
output min : 1
output max : 60
output sum : 181
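Note that the string in the tuple returned by the lambda ("min", "max", "Sum") is only a placeholder, since the first element is discarded with ._2. If you instead want to keep the key of the record with the largest value, you can compare the tuples and return one of them unchanged; a small sketch (the maxRecord name is illustrative):
// keep the whole (key, value) record that holds the maximum value,
// rather than building a dummy tuple and discarding its first element
val maxRecord = inputRDD.reduce((a, b) => if (a._2 >= b._2) a else b)
println("record with max value : " + maxRecord) // (B,60)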
The complete example is available at the GitHub project for reference.
Points to Note
- reduce() is similar to fold(), except fold() takes a 'zero value' that is used as the initial value for each partition; reduce() has no initial value.
- reduce() is similar to aggregate() with one difference: the reduce() return type must be the same as the RDD element type, whereas aggregate() can return any type.
- reduce() is also similar to reduceByKey(), except reduceByKey() is a transformation that operates on a Pair RDD and reduces the values per key, as shown in the sketch after this list.
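A short sketch illustrating the fold() and reduceByKey() points, reusing listRdd and inputRDD from the examples above:
// fold() takes a 'zero value' applied per partition and when merging;
// with 0 as the zero value it matches reduce(_ + _) for a sum
println("fold sum : " + listRdd.fold(0)(_ + _)) // 20, same as listRdd.reduce(_ + _)
// reduceByKey() is a transformation on a Pair RDD: it reduces the values per key
// and returns an RDD of (key, reducedValue) instead of a single value
inputRDD.reduceByKey(_ + _).collect().foreach(println) // (Z,1), (A,20), (B,120), (C,40) in some order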
Complete example
package com.sparkbyexamples.spark.rdd.functions

import org.apache.spark.sql.SparkSession

object reduceExample extends App {

  val spark = SparkSession.builder()
    .appName("SparkByExamples.com")
    .master("local[3]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val listRdd = spark.sparkContext.parallelize(List(1, 2, 3, 4, 5, 3, 2))

  println("output min using binary : " + listRdd.reduce(_ min _))
  println("output max using binary : " + listRdd.reduce(_ max _))
  println("output sum using binary : " + listRdd.reduce(_ + _))

  // Alternatively you can write
  println("output min : " + listRdd.reduce((a, b) => a min b))
  println("output max : " + listRdd.reduce((a, b) => a max b))
  println("output sum : " + listRdd.reduce((a, b) => a + b))

  val inputRDD = spark.sparkContext.parallelize(List(("Z", 1), ("A", 20), ("B", 30),
    ("C", 40), ("B", 30), ("B", 60)))

  println("output min : " + inputRDD.reduce((a, b) => ("min", a._2 min b._2))._2)
  println("output max : " + inputRDD.reduce((a, b) => ("max", a._2 max b._2))._2)
  println("output sum : " + inputRDD.reduce((a, b) => ("Sum", a._2 + b._2))._2)
}
Happy learning !!