PySpark RDD Actions with examples

Post last modified: March 27, 2024

RDD actions are PySpark operations that return values to the driver program. Any RDD function that returns something other than an RDD is considered an action. In this tutorial, I will explain the most commonly used RDD actions with examples.


Action functions trigger the transformations to execute. As mentioned in RDD Transformations, all transformations are lazily evaluated, meaning they are not executed right away; calling an action is what triggers them to run.
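As a rough analogy only (this is plain Python, not Spark), the built-in map() behaves lazily in a similar way: nothing runs until something consumes the result. The helper name double and the list call_log below are made up for this illustration.

```python
# Plain-Python analogy for lazy evaluation; no Spark involved.
call_log = []

def double(x):
    # Record every call so we can see when the work really happens.
    call_log.append(x)
    return x * 2

lazy_result = map(double, [1, 2, 3])   # like a transformation: nothing runs yet
calls_before = list(call_log)          # still empty at this point

materialized = list(lazy_result)       # like an action: forces the computation
print(calls_before)    # []
print(materialized)    # [2, 4, 6]
```

In Spark the same idea applies at cluster scale: transformations only describe the computation, and an action such as collect() or count() makes it run.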

PySpark RDD Actions Example

Before we start explaining RDD actions with examples, first, let’s create an RDD.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data=[("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)]
inputRDD = spark.sparkContext.parallelize(data)
listRdd = spark.sparkContext.parallelize([1,2,3,4,5,3,2])

Note that we have created two RDDs in the above code snippet and we use these two as and when necessary to demonstrate the RDD actions.

aggregate – action

aggregate() – Aggregates the elements of each partition, and then the results for all the partitions, using the given combine functions (“seqOp” and “combOp”) and a neutral “zero value.”

The first function (seqOp) can return a different result type, U, than the element type T of this RDD. Thus, we need one operation for merging a T into a U (seqOp) and one operation for merging two Us (combOp).

Syntax: aggregate(zeroValue, seqOp, combOp)

Example 1

seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)
agg=listRdd.aggregate(0, seqOp, combOp)
print(agg) # output 20

Example 2

#aggregate 2
seqOp2 = (lambda x, y: (x[0] + y, x[1] + 1))
combOp2 = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
agg2=listRdd.aggregate((0, 0), seqOp2, combOp2)
print(agg2) # output (20,7)
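To make the roles of seqOp and combOp more concrete, here is a minimal pure-Python sketch of the two-step idea. It does not use Spark; the function aggregate_model and the way the list is split into partitions are assumptions made for illustration, and the real partitioning depends on your cluster.

```python
from functools import reduce

def aggregate_model(partitions, zero_value, seq_op, comb_op):
    """Illustrative model of aggregate(); not Spark's actual code path."""
    # Step 1: within each partition, merge elements into an accumulator (T into U).
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # Step 2: merge the per-partition accumulators (U with U).
    return reduce(comb_op, partials, zero_value)

# Hypothetical split of [1,2,3,4,5,3,2] into three partitions.
partitions = [[1, 2, 3], [4, 5], [3, 2]]

seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)   # (sum, count) plus one element
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])  # merge two (sum, count) pairs

total, count = aggregate_model(partitions, (0, 0), seq_op, comb_op)
print(total, count)  # 20 7
```

A (sum, count) pair like this is a common way to compute an average in one pass, since total / count gives the mean.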

treeAggregate – action

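treeAggregate() – Aggregates the elements of this RDD in a multi-level tree pattern. For the same zero value and functions, it returns the same result as aggregate(); the difference is that partial results are merged in several levels rather than all at once on the driver.

Syntax: treeAggregate(zeroValue, seqOp, combOp, depth=2)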

#treeAggregate. This is similar to aggregate
seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)
agg=listRdd.treeAggregate(0, seqOp, combOp)
print(agg) # output 20
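The “multi-level tree pattern” can be pictured with a small pure-Python sketch. This is only a conceptual model that merges neighbouring partial results pairwise; it is not how Spark schedules the work internally, and the name tree_aggregate_model and the partition layout are assumptions for this example.

```python
from functools import reduce

def tree_aggregate_model(partitions, zero_value, seq_op, comb_op):
    """Conceptual model: merge partial results level by level."""
    # Level 0: reduce every partition to a single partial result.
    level = [reduce(seq_op, part, zero_value) for part in partitions]
    # Higher levels: merge neighbours pairwise until one value is left.
    while len(level) > 1:
        merged = []
        for i in range(0, len(level), 2):
            pair = level[i:i + 2]
            merged.append(comb_op(pair[0], pair[1]) if len(pair) == 2 else pair[0])
        level = merged
    return level[0] if level else zero_value

# Hypothetical split into four partitions: partials are 3, 7, 8 and 2.
partitions = [[1, 2], [3, 4], [5, 3], [2]]
result = tree_aggregate_model(partitions, 0, lambda a, x: a + x, lambda a, b: a + b)
print(result)  # 20
```

Merging in levels keeps any single merge step small, which is the motivation for the tree variants when an RDD has many partitions.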

fold – action

fold() – Aggregates the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value.”

from operator import add
foldRes=listRdd.fold(0, add)
print(foldRes) # output 20
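Why the zero value should be neutral can be seen in a small pure-Python model. The zero value is used once per partition and once more when the partial results are merged, so a non-neutral value makes the result depend on the number of partitions. The function fold_model and both partition layouts are assumptions for illustration, not Spark code.

```python
from functools import reduce
from operator import add

def fold_model(partitions, zero_value, op):
    """Illustrative model of fold(); the zero value seeds every partition and the final merge."""
    partials = [reduce(op, part, zero_value) for part in partitions]
    return reduce(op, partials, zero_value)

three_parts = [[1, 2, 3], [4, 5], [3, 2]]   # hypothetical 3-partition layout
two_parts = [[1, 2, 3, 4], [5, 3, 2]]       # hypothetical 2-partition layout

neutral = fold_model(three_parts, 0, add)        # 0 is neutral for addition
skewed_three = fold_model(three_parts, 1, add)   # 1 is added 3 + 1 times
skewed_two = fold_model(two_parts, 1, add)       # 1 is added 2 + 1 times
print(neutral, skewed_three, skewed_two)  # 20 24 23
```

With 0 as the zero value the answer is 20 regardless of the layout, which is why 0 is used for sums (and 1 would be the natural choice for products).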


reduce – action

reduce() – Reduces the elements of the dataset using the specified binary operator.

from operator import add
redRes=listRdd.reduce(add)
print(redRes) # output 20


treeReduce – action

treeReduce() – Reduces the elements of this RDD in a multi-level tree pattern.

#treeReduce. This is similar to reduce
add = lambda x, y: x + y
redRes=listRdd.treeReduce(add)
print(redRes) # output 20


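collect – action

collect() – Returns the complete dataset to the driver as a Python list.

Note: Use this method only when the dataset is small, as all the data is loaded into the driver’s memory.

data = listRdd.collect()
print(data) # output [1, 2, 3, 4, 5, 3, 2]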

count, countApprox, countApproxDistinct

count() – Returns the number of elements in the dataset.

countApprox() – Returns an approximate count of the elements in the dataset. It takes a timeout in milliseconds and may return an incomplete result if the job has not finished when the timeout is reached.

countApproxDistinct() – Returns an approximate number of distinct elements in the dataset.

#count, countApprox, countApproxDistinct
print("Count : "+str(listRdd.count()))
#Output: Count : 7
print("countApprox : "+str(listRdd.countApprox(1200)))
#Output: countApprox : 7
print("countApproxDistinct : "+str(listRdd.countApproxDistinct()))
#Output: countApproxDistinct : 5
print("countApproxDistinct : "+str(inputRDD.countApproxDistinct()))
#Output: countApproxDistinct : 5


countByValue – action

countByValue() – Returns a dictionary in which each key is a unique value in the dataset and each value is the number of times that key occurs.

#countByValue
print("countByValue :  "+str(listRdd.countByValue()))
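The counting idea can be sketched in plain Python with collections.Counter: count inside each partition, then merge the per-partition counts. This is only a model of the behaviour; the partition layout is an assumption, and no Spark is involved.

```python
from collections import Counter

# Hypothetical split of [1,2,3,4,5,3,2] into three partitions.
partitions = [[1, 2, 3], [4, 5], [3, 2]]

# Count the values inside each partition independently.
per_partition = [Counter(part) for part in partitions]

# Merge the per-partition counts into one result.
merged = sum(per_partition, Counter())
print(dict(merged))  # {1: 1, 2: 2, 3: 2, 4: 1, 5: 1}
```

Because the result holds one entry per distinct value, it is best suited to datasets with a modest number of distinct values.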


first – action

first() – Returns the first element in the dataset.

print("first :  "+str(listRdd.first()))
#Output: first :  1
print("first :  "+str(inputRDD.first()))
#Output: first :  ('Z', 1)


top – action

top() – Returns the n largest elements of the dataset, in descending order.

Note: Use this method only when the resulting list is small, as all the data is loaded into the driver’s memory.

print("top : "+str(listRdd.top(2)))
#Output: top : [5, 4]
print("top : "+str(inputRDD.top(2)))
#Output: top : [('Z', 1), ('C', 40)]


min – action

min() – Returns the minimum value from the dataset.

print("min :  "+str(listRdd.min()))
#Output: min :  1
print("min :  "+str(inputRDD.min()))
#Output: min :  ('A', 20)


max – action

max() – Returns the maximum value from the dataset.

print("max :  "+str(listRdd.max()))
#Output: max :  5
print("max :  "+str(inputRDD.max()))
#Output: max :  ('Z', 1)
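The results for inputRDD may look surprising at first: the maximum is the pair with "Z" even though its number is only 1. That follows from how Python compares tuples, element by element from the left. The plain-Python sketch below uses the same data with the built-in min() and max(); the key functions are illustrative choices. PySpark’s min() and max() also accept an optional key argument, so a call such as inputRDD.max(key=lambda kv: kv[1]) should behave analogously.

```python
data = [("Z", 1), ("A", 20), ("B", 30), ("C", 40), ("B", 30), ("B", 60)]

# Tuples compare by first element, then by second on ties.
smallest = min(data)
largest = max(data)
print(smallest, largest)  # ('A', 20) ('Z', 1)

# Compare by the numeric second field instead (illustrative key function).
smallest_by_value = min(data, key=lambda kv: kv[1])
largest_by_value = max(data, key=lambda kv: kv[1])
print(smallest_by_value, largest_by_value)  # ('Z', 1) ('B', 60)
```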

take, takeOrdered, takeSample

take() – Return the first num elements of the dataset.

takeOrdered() – Returns the num smallest elements of the dataset in ascending order; this is the opposite of the top() action.
Note: Use this method only when the resulting list is small, as all the data is loaded into the driver’s memory.

takeSample() – Returns a fixed-size random sample of the dataset as a list. It takes a withReplacement flag, the number of elements to return, and an optional seed.
Note: Use this method only when the resulting list is small, as all the data is loaded into the driver’s memory.

#take, takeOrdered, takeSample
print("take : "+str(listRdd.take(2)))
#Output: take : [1, 2]
print("takeOrdered : "+ str(listRdd.takeOrdered(2)))
#Output: takeOrdered : [1, 2]
print("takeSample : "+str(listRdd.takeSample(False, 2)))
#Output varies from run to run because the sample is random
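The difference between take(), takeOrdered() and the earlier top() can be modelled on a plain Python list: take() keeps the original order, while the other two select by value. The sketch below uses slicing and the standard heapq module as stand-ins; it illustrates the semantics only and does not call Spark.

```python
import heapq

values = [1, 2, 3, 4, 5, 3, 2]

first_two = values[:2]                      # like take(2): first elements as stored
smallest_two = heapq.nsmallest(2, values)   # like takeOrdered(2): smallest, ascending
largest_two = heapq.nlargest(2, values)     # like top(2): largest, descending

# A key function reverses the ordering, so "smallest by -x" gives the largest.
largest_via_key = heapq.nsmallest(2, values, key=lambda x: -x)

print(first_two, smallest_two, largest_two, largest_via_key)
# [1, 2] [1, 2] [5, 4] [5, 4]
```

Here take(2) and takeOrdered(2) happen to agree only because the list starts with its two smallest values. PySpark’s takeOrdered() and top() both accept an optional key argument in the same spirit as the last line.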

PySpark Actions – Complete example

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data=[("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)]
inputRDD = spark.sparkContext.parallelize(data)
listRdd = spark.sparkContext.parallelize([1,2,3,4,5,3,2])

seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)
agg=listRdd.aggregate(0, seqOp, combOp)
print(agg) # output 20

#aggregate 2
seqOp2 = (lambda x, y: (x[0] + y, x[1] + 1))
combOp2 = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
agg2=listRdd.aggregate((0, 0), seqOp2, combOp2)
print(agg2) # output (20,7)

agg2=listRdd.treeAggregate(0,seqOp, combOp)
print(agg2) # output 20

from operator import add
foldRes=listRdd.fold(0, add)
print(foldRes) # output 20

#reduce
redRes=listRdd.reduce(add)
print(redRes) # output 20

#treeReduce. This is similar to reduce
add = lambda x, y: x + y
redRes=listRdd.treeReduce(add)
print(redRes) # output 20

data = listRdd.collect()

#count, countApprox, countApproxDistinct
print("Count : "+str(listRdd.count()))
#Output: Count : 7
print("countApprox : "+str(listRdd.countApprox(1200)))
#Output: countApprox : 7
print("countApproxDistinct : "+str(listRdd.countApproxDistinct()))
#Output: countApproxDistinct : 5
print("countApproxDistinct : "+str(inputRDD.countApproxDistinct()))
#Output: countApproxDistinct : 5

#countByValue
print("countByValue :  "+str(listRdd.countByValue()))

#first
print("first :  "+str(listRdd.first()))
#Output: first :  1
print("first :  "+str(inputRDD.first()))
#Output: first :  ('Z', 1)

#top
print("top : "+str(listRdd.top(2)))
#Output: top : [5, 4]
print("top : "+str(inputRDD.top(2)))
#Output: top : [('Z', 1), ('C', 40)]

#min
print("min :  "+str(listRdd.min()))
#Output: min :  1
print("min :  "+str(inputRDD.min()))
#Output: min :  ('A', 20)

#max
print("max :  "+str(listRdd.max()))
#Output: max :  5
print("max :  "+str(inputRDD.max()))
#Output: max :  ('Z', 1)

#take, takeOrdered, takeSample
print("take : "+str(listRdd.take(2)))
#Output: take : [1, 2]
print("takeOrdered : "+ str(listRdd.takeOrdered(2)))
#Output: takeOrdered : [1, 2]
print("takeSample : "+str(listRdd.takeSample(False, 2)))
#Output varies from run to run because the sample is random


RDD actions are operations that return non-RDD values. Because RDD transformations are lazy, they are not executed until an action is called. Hence, all of these functions trigger the pending transformations to execute and then return the result of the action to the driver program.

