reduceByKey vs groupByKey vs aggregateByKey vs combineByKey in Spark

What are the differences between reduceByKey, groupByKey, aggregateByKey, and combineByKey on a Spark RDD? In Apache Spark, reduceByKey(), groupByKey(), aggregateByKey(), and combineByKey() are transformations that process key-value pair RDDs in a distributed manner. Each has its own characteristics and usage scenarios. In this article, let’s explore reduceByKey vs groupByKey vs aggregateByKey vs combineByKey in Spark by applying each operation to a sample RDD.

Suppose we have an RDD of sales data in the form of (product name, sale amount) pairs. Here’s an example RDD:


// Import
import org.apache.spark.sql.SparkSession

// Create SparkSession
val spark = SparkSession.builder()
  .appName("SparkByExamplesDemo")
  .master("local")
  .getOrCreate()

// Create an RDD
val salesData = spark.sparkContext.parallelize(Seq(
  ("product1", 100.0),
  ("product2", 50.0),
  ("product1", 75.0),
  ("product3", 200.0),
  ("product2", 125.0),
  ("product1", 50.0),
  ("product3", 150.0)
))

Let’s see the application and properties of reduceByKey(), groupByKey(), aggregateByKey(), and combineByKey() operations using the salesData RDD created.

1. Spark reduceByKey

  • Signature: reduceByKey(func: (V, V) => V): RDD[(K, V)]
  • Description: It applies the specified binary function to the values of each key in the RDD and returns a new RDD with the same keys and the reduced values.
  • Usage: It is suitable when you want to perform a commutative and associative reduction on the values for each key. The function func takes two values of type V and produces a single value of type V. It reduces the amount of data shuffled by aggregating locally on each partition before sending the partial results across the network.

The reduceByKey operation combines the values for each key using a specified function and returns an RDD of (key, reduced value) pairs. In our example, we can use reduceByKey to calculate the total sales for each product as below:


// reduceByKey() Example
val totalSales = salesData.reduceByKey(_ + _)
totalSales.collect().foreach(println)

This will result in an RDD with the following contents (the order of the keys may vary):

(product1,225.0)
(product2,175.0)
(product3,350.0)
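To make the "local aggregation before shuffling" idea concrete, here is a minimal plain-Scala sketch that models the two phases with ordinary collections. It is not Spark's implementation: the split of the sales data into two partitions, and the names partitions, reduceLocally, partials, shuffledRecords, and totals, are assumptions made for illustration.

```scala
// A plain-Scala model of reduceByKey's two phases (NOT Spark's implementation).
// Hypothetical split of the article's sales data across two partitions.
val partitions: Seq[Seq[(String, Double)]] = Seq(
  Seq(("product1", 100.0), ("product2", 50.0), ("product1", 75.0), ("product3", 200.0)),
  Seq(("product2", 125.0), ("product1", 50.0), ("product3", 150.0))
)

// Reduce all values that share a key with the given function.
def reduceLocally(
    records: Seq[(String, Double)],
    func: (Double, Double) => Double
): Map[String, Double] =
  records.groupBy(_._1).map { case (key, kvs) => key -> kvs.map(_._2).reduce(func) }

// Phase 1: each partition reduces its own records before anything is shuffled.
val partials: Seq[Map[String, Double]] = partitions.map(p => reduceLocally(p, _ + _))

// Only the partial results cross the modelled shuffle boundary.
val shuffledRecords: Int = partials.map(_.size).sum

// Phase 2: partial results for the same key are reduced again.
val totals: Map[String, Double] = reduceLocally(partials.flatMap(_.toSeq), _ + _)

// prints: records shuffled: 6 of 7
println(s"records shuffled: $shuffledRecords of ${partitions.map(_.size).sum}")
totals.toSeq.sortBy(_._1).foreach(println)
```

With only seven records the saving is small, but the gap widens as the number of values per key in each partition grows, because each partition contributes at most one record per key.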

2. Spark groupByKey

  • Signature: groupByKey(): RDD[(K, Iterable[V])]
  • Description: It groups the values of each key in the RDD and returns an RDD of key-value pairs, where the values are grouped into an iterable collection.
  • Usage: It is suitable when you need all the values associated with each key, not just an aggregate of them. However, it can be memory-intensive, since every value is shuffled and all values for a key are collected and held in memory.

The groupByKey operation groups the values for each key into an iterable and returns an RDD of (key, iterable) pairs. In our example, we can use groupByKey to group the sales data for each product:


// groupByKey() Example
val groupedSales = salesData.groupByKey()
groupedSales.collect().foreach(println)

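This will result in an RDD with the following contents (the order of the keys, and of the values within a key, may vary):

(product1,CompactBuffer(100.0, 75.0, 50.0))
(product2,CompactBuffer(50.0, 125.0))
(product3,CompactBuffer(200.0, 150.0))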
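A common follow-up is to aggregate the grouped values. The sketch below computes the per-product totals once with groupByKey followed by mapValues, and once with reduceByKey, to show that the results match even though groupByKey shuffles every value first. The app name, the local[2] master, and the names totalsViaGroup and totalsViaReduce are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("GroupByKeyVsReduceByKey") // hypothetical app name
  .master("local[2]")
  .getOrCreate()

// Same sales data as in the article.
val salesData = spark.sparkContext.parallelize(Seq(
  ("product1", 100.0), ("product2", 50.0), ("product1", 75.0), ("product3", 200.0),
  ("product2", 125.0), ("product1", 50.0), ("product3", 150.0)
))

// groupByKey moves every value across the shuffle, then we sum each Iterable.
val totalsViaGroup = salesData.groupByKey().mapValues(_.sum).collect().toMap

// reduceByKey sums within each partition first, then merges the partial sums.
val totalsViaReduce = salesData.reduceByKey(_ + _).collect().toMap

println(totalsViaGroup == totalsViaReduce)
```

When only the aggregate is needed, the reduceByKey form is usually the better choice; groupByKey earns its place when the full list of values per key is actually required.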

3. Spark aggregateByKey

  • Signature: aggregateByKey[U](zeroValue: U)(seqFunc: (U, V) => U, combFunc: (U, U) => U): RDD[(K, U)]
  • Description: It allows you to aggregate values for each key while also maintaining an aggregated value type different from the original value type.
  • Usage: It is useful when you need to perform both per-key aggregation and change the value type during the aggregation process. It takes two functions: seqFunc combines a value of type U with a value of type V to produce a new value of type U, and combFunc combines two values of type U to produce a new value of type U. The zeroValue parameter provides an initial value for aggregation.

Because the result type can differ from the value type, aggregateByKey can track more than one quantity at a time. In our example, we can use aggregateByKey to calculate the total sales and the number of sales for each product, and then derive the average sale amount:


// aggregateByKey() Example
val initialSalesCount = (0.0, 0)
val aggregateSales = salesData.aggregateByKey(initialSalesCount)(
  (salesCount, saleAmount) => (salesCount._1 + saleAmount, salesCount._2 + 1),
  (salesCount1, salesCount2) => (salesCount1._1 + salesCount2._1, salesCount1._2 + salesCount2._2)
)
val averageSales = aggregateSales.mapValues(salesCount => salesCount._1 / salesCount._2)
aggregateSales.collect().foreach(println)
averageSales.collect().foreach(println)

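This will result in two RDDs (the order of the keys may vary). The aggregateSales RDD contains (key, (total sales, count)) pairs:

(product1,(225.0,3))
(product2,(175.0,2))
(product3,(350.0,2))

The averageSales RDD contains (key, average sales) pairs:

(product1,75.0)
(product2,87.5)
(product3,175.0)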

Note that we use the initial value (0.0, 0), of type (Double, Int), to maintain a running total of sales and a count for each key. We also pass two functions: seqFunc folds each sale amount into the running (total, count) within a partition, and combFunc merges the (total, count) results for the same key across partitions.
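The division of labour between zeroValue, seqFunc, and combFunc can be sketched in plain Scala. This is a model of the semantics only, not Spark's implementation; the two-partition split and the names salesPartitions, perPartition, and merged are assumptions for illustration.

```scala
// Plain-Scala model of aggregateByKey (NOT Spark's implementation).
// Hypothetical split of the sales data across two partitions.
val salesPartitions: Seq[Seq[(String, Double)]] = Seq(
  Seq(("product1", 100.0), ("product2", 50.0), ("product1", 75.0), ("product3", 200.0)),
  Seq(("product2", 125.0), ("product1", 50.0), ("product3", 150.0))
)

val zeroValue: (Double, Int) = (0.0, 0)

// seqFunc: fold one sale amount (type V) into the running (total, count) (type U).
val seqFunc: ((Double, Int), Double) => (Double, Int) =
  (acc, amount) => (acc._1 + amount, acc._2 + 1)

// combFunc: merge two (total, count) accumulators built on different partitions.
val combFunc: ((Double, Int), (Double, Int)) => (Double, Int) =
  (a, b) => (a._1 + b._1, a._2 + b._2)

// Within a partition: every key starts from zeroValue, then seqFunc is applied per record.
val perPartition: Seq[Map[String, (Double, Int)]] = salesPartitions.map { records =>
  records.foldLeft(Map.empty[String, (Double, Int)]) { case (accs, (key, amount)) =>
    accs.updated(key, seqFunc(accs.getOrElse(key, zeroValue), amount))
  }
}

// Across partitions: accumulators for the same key are merged with combFunc.
val merged: Map[String, (Double, Int)] =
  perPartition.flatMap(_.toSeq).groupBy(_._1).map { case (key, accs) =>
    key -> accs.map(_._2).reduce(combFunc)
  }

merged.toSeq.sortBy(_._1).foreach(println)
```

Note that zeroValue is applied once per key in each partition, which is why it should be a neutral element for the aggregation, such as (0.0, 0) for a sum and a count.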

4. Spark combineByKey

  • Signature: combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  • Description: It is a more general operation that allows you to customize the process of combining and aggregating values for each key.
  • Usage: It is useful when you have specific requirements for combining and aggregating values that are not covered by the other operations. It takes three functions: createCombiner turns the first value seen for a key into an initial accumulator of type C, mergeValue merges a value of type V into an existing accumulator of type C for the same key, and mergeCombiners merges two accumulators of type C together. It gives you flexibility in handling different types of aggregations per key.

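combineByKey allows you to customize the aggregation process per key. You define functions to create an initial accumulator, merge a value into an existing accumulator, and merge accumulators across partitions. In our example, we can use combineByKey to calculate the total sales and the average sale amount for each product: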


// combineByKey() Example
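val salesStatsByProduct = salesData.combineByKey(
  // createCombiner: the first sale seen for a key starts a (total, count) accumulator
  (saleAmount: Double) => (saleAmount, 1),
  // mergeValue: fold another sale into the accumulator within a partition
  (acc: (Double, Int), saleAmount: Double) => (acc._1 + saleAmount, acc._2 + 1),
  // mergeCombiners: merge accumulators for the same key across partitions
  (acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).mapValues { case (totalSales, count) => (totalSales, totalSales / count) }

salesStatsByProduct.collect().foreach(println)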

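This will result in an RDD with the following contents (the order of the keys may vary):

(product1,(225.0,75.0))
(product2,(175.0,87.5))
(product3,(350.0,175.0))

Each value is a tuple containing the total sales and the average sale amount for that product.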
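Because combineByKey is the most general of the four, the simpler operations can be expressed in terms of it. As a minimal sketch, the snippet below reproduces the reduceByKey totals with combineByKey by choosing the accumulator type C to be the same as the value type. The app name, the local[2] master, and the names totalsViaCombine and totalsViaReduce are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CombineByKeyAsReduceByKey") // hypothetical app name
  .master("local[2]")
  .getOrCreate()

// Same sales data as in the article.
val salesData = spark.sparkContext.parallelize(Seq(
  ("product1", 100.0), ("product2", 50.0), ("product1", 75.0), ("product3", 200.0),
  ("product2", 125.0), ("product1", 50.0), ("product3", 150.0)
))

// With C equal to V, the initial accumulator is just the first value seen for a key,
// and the same addition serves as both mergeValue and mergeCombiners.
val totalsViaCombine = salesData.combineByKey(
  (amount: Double) => amount,
  (total: Double, amount: Double) => total + amount,
  (total1: Double, total2: Double) => total1 + total2
).collect().toMap

val totalsViaReduce = salesData.reduceByKey(_ + _).collect().toMap

println(totalsViaCombine == totalsViaReduce)
```

In practice, reduceByKey remains the more readable choice for this case; the sketch is only meant to show how the three combineByKey functions relate to a plain reduction.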

5. Comparing reduceByKey vs groupByKey vs aggregateByKey vs combineByKey in Spark

Let’s compare reduceByKey vs groupByKey vs aggregateByKey vs combineByKey in Spark based on several properties:

| Properties | reduceByKey | groupByKey | aggregateByKey | combineByKey |
| --- | --- | --- | --- | --- |
| Syntax | reduceByKey(func: (V, V) => V): RDD[(K, V)] | groupByKey(): RDD[(K, Iterable[V])] | aggregateByKey[U](zeroValue: U)(seqFunc: (U, V) => U, combFunc: (U, U) => U): RDD[(K, U)] | combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] |
| Data Shuffling | It performs local aggregation on each partition before shuffling, so only partial results cross the network. | It shuffles all values for each key across the network, which can lead to a significant amount of data movement. | It applies seqFunc on each partition before shuffling, which can reduce data shuffling when the aggregate is smaller than the raw values. | It applies createCombiner and mergeValue on each partition before shuffling, which can reduce data shuffling when the accumulator is smaller than the raw values. |
| Memory Usage | It requires less memory than groupByKey because values are merged into a single running value per key. | It can be memory-intensive because all values for each key are collected and held in memory. | Memory usage depends on the aggregated type you choose; a compact aggregate such as a (total, count) tuple keeps the footprint small. | Memory usage depends on the accumulator type you choose; a compact accumulator such as a (total, count) tuple keeps the footprint small. |
| Aggregation Flexibility | It is suitable for commutative and associative reductions. You supply the function, but the result type must be the same as the value type. | It groups all values associated with each key into an iterable collection. It applies no aggregation itself; any aggregation happens afterwards on the grouped values. | It lets you define custom aggregation functions and maintain an aggregated value of a different type, starting from a zero value. | It lets you define custom aggregation functions and maintain an aggregated value of a different type, starting from the first value seen for a key. |
| Performance | It usually performs well because of local aggregation before shuffling. | It may suffer from performance issues due to the amount of data moved across the network and the potential memory overhead. | Performance depends on the complexity of the aggregation functions; computationally expensive aggregation logic may slow it down. | Performance depends on the complexity of the aggregation functions; computationally expensive aggregation logic may slow it down. |

reduceByKey vs groupByKey vs aggregateByKey vs combineByKey

In summary, the choice between reduceByKey, groupByKey, aggregateByKey, and combineByKey in Spark depends on the specific requirements of your application. Consider the data shuffling, memory usage, flexibility in aggregation logic, and performance characteristics to select the most appropriate operation. Generally, reduceByKey is preferred when the result has the same type as the values, because it shuffles less data and uses less memory than groupByKey. aggregateByKey and combineByKey offer greater flexibility for custom aggregations, at the cost of a more involved implementation.

6. Conclusion

In conclusion, let’s summarize the characteristics of reduceByKey, groupByKey, aggregateByKey, and combineByKey in Spark with Scala:

  1. reduceByKey:
    • Suitable for commutative and associative reduction operations on values.
    • Performs local aggregations on each partition before shuffling results across the network, minimizing data shuffling.
    • Requires less memory compared to groupByKey.
    • The result type must be the same as the value type, which limits flexibility.
    • Usually performs better than groupByKey for aggregations because of local aggregation.
  2. groupByKey:
    • Groups all values associated with each key into an iterable collection.
    • Shuffles all values for each key across the network, which can lead to significant data movement and potential memory overhead.
    • Can be memory-intensive for large datasets because all values for a key are collected and held in memory.
    • Limited flexibility in custom aggregation logic.
    • May suffer from performance issues due to data movement.
  3. aggregateByKey:
    • Allows custom aggregation logic and maintains aggregated values of different types.
    • Performs partial aggregations before shuffling, reducing data shuffling compared to groupByKey.
    • Provides flexibility in defining custom aggregation functions.
    • Memory usage and performance can be controlled based on aggregation functions.
  4. combineByKey:
    • Provides the most flexibility in custom aggregation logic.
    • Allows specifying functions for creating an initial accumulator, merging values into it, and merging accumulators across partitions.
    • Offers control over memory usage and performance by defining appropriate aggregation functions.
    • Requires more implementation effort compared to other operations.

Overall, the choice depends on your specific requirements. reduceByKey is generally preferred due to its efficient data shuffling and reduced memory usage. aggregateByKey and combineByKey offer more flexibility in custom aggregations but may require additional implementation effort. groupByKey should be used with caution due to potential memory issues and performance challenges, especially with large datasets.

Consider factors such as data shuffling, memory usage, flexibility in aggregation logic, and performance characteristics to select the most suitable operation for your use case.

The complete example used in this article is shown below:


// Import
import org.apache.spark.sql.SparkSession

// Create SparkSession
val spark = SparkSession.builder()
  .appName("SparkByExamplesDemo")
  .master("local")
  .getOrCreate()

// Create an RDD
val salesData = spark.sparkContext.parallelize(Seq(
  ("product1", 100.0),
  ("product2", 50.0),
  ("product1", 75.0),
  ("product3", 200.0),
  ("product2", 125.0),
  ("product1", 50.0),
  ("product3", 150.0)
))

//reduceByKey
val totalSales = salesData.reduceByKey(_ + _)
totalSales.collect().foreach(println)

//groupByKey
val groupedSales = salesData.groupByKey()
groupedSales.collect().foreach(println)

//aggregateByKey
val initialSalesCount = (0.0, 0)
val aggregateSales = salesData.aggregateByKey(initialSalesCount)(
  (salesCount, saleAmount) => (salesCount._1 + saleAmount, salesCount._2 + 1),
  (salesCount1, salesCount2) => (salesCount1._1 + salesCount2._1, salesCount1._2 + salesCount2._2)
)
val averageSales = aggregateSales.mapValues(salesCount => salesCount._1 / salesCount._2)
aggregateSales.collect().foreach(println)
averageSales.collect().foreach(println)

//combineByKey
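val salesStatsByProduct = salesData.combineByKey(
  // createCombiner: the first sale seen for a key starts a (total, count) accumulator
  (saleAmount: Double) => (saleAmount, 1),
  // mergeValue: fold another sale into the accumulator within a partition
  (acc: (Double, Int), saleAmount: Double) => (acc._1 + saleAmount, acc._2 + 1),
  // mergeCombiners: merge accumulators for the same key across partitions
  (acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).mapValues { case (totalSales, count) => (totalSales, totalSales / count) }

salesStatsByProduct.collect().foreach(println)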
