Spark Accumulators Explained

Spark Accumulators are shared variables that are only “added” to through an associative and commutative operation; they are used to implement counters (similar to MapReduce counters) or sums.

By default, Spark supports creating accumulators of any numeric type, and it also provides the capability to add custom accumulator types.
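For instance, a custom accumulator can be defined by extending the AccumulatorV2 class (available in Spark 2.x and later). The sketch below is illustrative; the class name and word-counting logic are made up for this example:


// A custom accumulator that counts occurrences of each word added from tasks
import org.apache.spark.util.AccumulatorV2

class WordCountAccumulator extends AccumulatorV2[String, Map[String, Long]] {
  private var counts = Map.empty[String, Long]

  // True when nothing has been added yet
  override def isZero: Boolean = counts.isEmpty

  override def copy(): WordCountAccumulator = {
    val acc = new WordCountAccumulator
    acc.counts = counts
    acc
  }

  override def reset(): Unit = counts = Map.empty[String, Long]

  // Called from tasks running on the cluster
  override def add(word: String): Unit =
    counts += word -> (counts.getOrElse(word, 0L) + 1L)

  // Combines partial results from different tasks
  override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit =
    other.value.foreach { case (w, c) => counts += w -> (counts.getOrElse(w, 0L) + c) }

  override def value: Map[String, Long] = counts
}

// Register it with the SparkContext before use:
// val wordCounts = new WordCountAccumulator
// sc.register(wordCounts, "WordCountAccumulator")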

Programmers can create the following accumulators:

  • named accumulators
  • unnamed accumulators

When you create a named accumulator, you can see it on the Spark web UI under the “Accumulator” tab. This tab shows two tables: the first table, “accumulable”, lists all named accumulator variables and their values; the second table, “Tasks”, shows the value of each accumulator as modified by a task.

Unnamed accumulators are not shown on the Spark web UI, so for all practical purposes it is advisable to use named accumulators.
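The only difference when creating them is whether a name is passed. A quick sketch in spark-shell, where sc is the SparkContext (variable names here are illustrative):


// Named accumulator: visible on the Spark web UI
val namedAcc = sc.longAccumulator("MyNamedAccumulator")

// Unnamed accumulator: works the same way but is not shown on the UI
val unnamedAcc = sc.longAccumulator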

1. Creating an Accumulator Variable

Spark by default provides accumulator methods for long, double, and collection types. All these methods are defined in the SparkContext class and return LongAccumulator, DoubleAccumulator, and CollectionAccumulator respectively.

For example, you can create a long accumulator in spark-shell using:


// Creating Accumulator variable
scala> val accum = sc.longAccumulator("SumAccumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(SumAccumulator), value: 0)

The above statement creates a named accumulator “SumAccumulator”. Now, let’s see how to add up the elements of an array to this accumulator.


scala> sc.parallelize(Array(1, 2, 3)).foreach(x => accum.add(x))
scala> accum.value
res2: Long = 6

Each of these accumulator classes has several methods; among these, the add() method is called from tasks running on the cluster. Tasks cannot read the accumulator’s value; only the driver program can read it, using the value method.

2. Long Accumulator

The longAccumulator() methods of SparkContext return a LongAccumulator.

Syntax


// Long Accumulator
def longAccumulator : org.apache.spark.util.LongAccumulator
def longAccumulator(name : scala.Predef.String) : org.apache.spark.util.LongAccumulator

You can create a named long accumulator using SparkContext.longAccumulator(name); for an unnamed one, use the signature that doesn’t take an argument.


  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("SparkByExample")
    .master("local")
    .getOrCreate()

  // Create a named long accumulator
  val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")

  val rdd = spark.sparkContext.parallelize(Array(1, 2, 3))

  // add() runs inside tasks; the driver reads the result with value
  rdd.foreach(x => longAcc.add(x))
  println(longAcc.value) // 6

The LongAccumulator class provides the following methods (a short sketch demonstrating them follows the list):

  • isZero
  • copy
  • reset
  • add
  • count
  • sum
  • avg
  • merge
  • value
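
For example, continuing the longAcc example above (after the values 1, 2, and 3 were added), the driver can inspect the running aggregates; the commented values are what this sketch would print:


// Inspecting LongAccumulator aggregates on the driver
println(longAcc.count)  // 3, the number of values added
println(longAcc.sum)    // 6, the running total
println(longAcc.avg)    // 2.0, sum / count

longAcc.reset()         // resets the accumulator back to its zero state
println(longAcc.isZero) // true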

3. Double Accumulator

You can create a named double accumulator using SparkContext.doubleAccumulator(name); for an unnamed one, use the signature that doesn’t take an argument.

Syntax


// Double Accumulator
def doubleAccumulator : org.apache.spark.util.DoubleAccumulator
def doubleAccumulator(name : scala.Predef.String) : org.apache.spark.util.DoubleAccumulator

The DoubleAccumulator class also provides methods similar to those of LongAccumulator.
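
A minimal sketch of its usage, assuming the same SparkSession as in the earlier example (variable names are illustrative):


// Double Accumulator
val doubleAcc = spark.sparkContext.doubleAccumulator("DoubleSumAccumulator")

val doubleRdd = spark.sparkContext.parallelize(Array(1.5, 2.5, 3.0))
doubleRdd.foreach(x => doubleAcc.add(x))

println(doubleAcc.value) // 7.0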

4. Collection Accumulator

You can create a named collection accumulator using SparkContext.collectionAccumulator[T](name); for an unnamed one, use the signature that doesn’t take an argument.

Syntax


// Collection Accumulator
def collectionAccumulator[T] : org.apache.spark.util.CollectionAccumulator[T]
def collectionAccumulator[T](name : scala.Predef.String) : org.apache.spark.util.CollectionAccumulator[T]

The CollectionAccumulator class provides the following methods (see the sketch after the list):

  • isZero
  • copyAndReset
  • copy
  • reset
  • add
  • merge
  • value
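
A minimal sketch of collecting values into a CollectionAccumulator, again assuming the same SparkSession (names are illustrative):


// Collection Accumulator
val evenAcc = spark.sparkContext.collectionAccumulator[Int]("EvenNumbers")

val numbersRdd = spark.sparkContext.parallelize(Array(1, 2, 3, 4, 5))
numbersRdd.foreach(x => if (x % 2 == 0) evenAcc.add(x))

// value returns a java.util.List; element order across tasks is not guaranteed
println(evenAcc.value) // e.g. [2, 4]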


Conclusion

In this Spark accumulators shared-variable article, you have learned that accumulators are only “added” to through an associative and commutative operation, that they are used to implement counters (similar to MapReduce counters) or sums, and you have also seen the different accumulator classes along with their methods.

Happy Learning !!
