
Spark defines the PairRDDFunctions class, which provides several functions to work with Pair RDDs (RDDs of key-value pairs). In this tutorial, we will learn these functions with Scala examples. Pair RDDs come in handy when you need to apply transformations such as hash partitioning, set operations, and joins.

All these functions are grouped into Transformations and Actions, similar to regular RDDs.

Spark Pair RDD Transformation Functions

PAIR RDD FUNCTION – DESCRIPTION
aggregateByKey – Aggregates the values of each key in a dataset. This function can return a result type different from the value type of the input RDD.
combineByKey – Combines the elements for each key.
combineByKeyWithClassTag – Combines the elements for each key.
flatMapValues – Flattens the values of each key without changing the keys and keeps the original RDD partitioning.
foldByKey – Merges the values of each key.
groupByKey – Returns a grouped RDD by grouping the values of each key.
mapValues – Applies a map function to each value in a pair RDD without changing the keys.
reduceByKey – Returns a merged RDD by merging the values of each key.
reduceByKeyLocally – Merges the values of each key and returns the final result to the master as a Map.
sampleByKey – Returns a subset of the RDD sampled by key.
subtractByKey – Returns an RDD with the pairs from this RDD whose keys are not in the other RDD.
keys – Returns all keys of this RDD as an RDD[K].
values – Returns an RDD with just the values.
partitionBy – Returns a new RDD after applying the specified partitioner.
fullOuterJoin – Returns an RDD after applying a fullOuterJoin on the current and parameter RDDs.
join – Returns an RDD after applying a join on the current and parameter RDDs.
leftOuterJoin – Returns an RDD after applying a leftOuterJoin on the current and parameter RDDs.
rightOuterJoin – Returns an RDD after applying a rightOuterJoin on the current and parameter RDDs.
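
Most of these transformations are demonstrated later in this article, but mapValues and flatMapValues are not. Below is a minimal sketch of how they behave, assuming a SparkSession named spark (created as shown in the examples further down) and a small hypothetical dataset.

// mapValues and flatMapValues sketch (hypothetical data)
val kv = spark.sparkContext.parallelize(Seq(("a", "1 2"), ("b", "3")))

// mapValues applies a function to each value and keeps the key unchanged
val lengths = kv.mapValues(_.length)           // (a,3), (b,1)

// flatMapValues expands each value into zero or more values, repeating the key
val numbers = kv.flatMapValues(_.split(" "))   // (a,1), (a,2), (b,3)

lengths.foreach(println)
numbers.foreach(println)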

Spark Pair RDD Actions

PAIR RDD ACTION FUNCTION – DESCRIPTION
collectAsMap – Returns the pair RDD as a Map to the Spark master (driver).
countByKey – Returns the count of elements for each key. The final result is returned as a local Map on the driver.
countByKeyApprox – Same as countByKey but returns a partial (approximate) result. This takes a timeout parameter that specifies how long the function may run before returning.
lookup – Returns a list of values from the RDD for a given input key.
reduceByKeyLocally – Merges the values of each key and returns the final result to the master as a Map.
saveAsHadoopDataset – Saves the RDD to any Hadoop-supported storage system (HDFS, S3, Elasticsearch, etc.), using a Hadoop JobConf object to save.
saveAsHadoopFile – Saves the RDD to any Hadoop-supported storage system (HDFS, S3, Elasticsearch, etc.), using a Hadoop OutputFormat class to save.
saveAsNewAPIHadoopDataset – Saves the RDD to any Hadoop-supported storage system (HDFS, S3, Elasticsearch, etc.) with the new Hadoop API, using a Hadoop Configuration object to save.
saveAsNewAPIHadoopFile – Saves the RDD to any Hadoop-supported storage system (HDFS, S3, Elasticsearch, etc.), using the new Hadoop API OutputFormat class to save.
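
The examples below do not cover countByKey and lookup; here is a minimal sketch of both, assuming the (word, 1) pairRDD created in the next section. Both actions bring results back to the driver.

// countByKey() and lookup() sketch
val counts = pairRDD.countByKey()          // scala.collection.Map[String, Long], e.g. Map(India -> 3, USA -> 2, ...)
println(counts)

val indiaValues = pairRDD.lookup("India")  // Seq[Int], e.g. List(1, 1, 1)
println(indiaValues)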

Pair RDD Functions Examples

In this section, I will explain Spark pair RDD functions with Scala examples. Before we get started, let's create a pair RDD.


// Creating a pair RDD
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SparkByExample")
  .master("local")
  .getOrCreate()

val rdd = spark.sparkContext.parallelize(
  List("Germany India USA", "USA India Russia", "India Brazil Canada China")
)
val wordsRdd = rdd.flatMap(_.split(" "))
val pairRDD = wordsRdd.map(f => (f, 1))
pairRDD.foreach(println)

This snippet creates a pair RDD by splitting every element in the RDD on spaces, flattening the result so that each element is a single word, and finally assigning the integer 1 to every word.


// Output:
(Germany,1)
(India,1)
(USA,1)
(USA,1)
(India,1)
(Russia,1)
(India,1)
(Brazil,1)
(Canada,1)
(China,1)

distinct – Returns the distinct key-value pairs of the RDD (in this example every value is 1, so it effectively returns the distinct keys).


// Applying distinct()
pairRDD.distinct().foreach(println)

// Output:
(Germany,1)
(India,1)
(Brazil,1)
(China,1)
(USA,1)
(Canada,1)
(Russia,1)

sortByKey – Transformation that returns an RDD sorted by key.


// sortByKey() on pairRDD
println("Sort by Key ==>")
val sortRDD = pairRDD.sortByKey()
sortRDD.foreach(println)

Yields below output.


// Output:
Sort by Key ==>
(Brazil,1)
(Canada,1)
(China,1)
(Germany,1)
(India,1)
(India,1)
(India,1)
(Russia,1)
(USA,1)
(USA,1)

reduceByKey – Transformation that returns an RDD after merging the values for each key (here, by adding them).

The resulting RDD contains unique keys.


// reduceByKey() on pairRDD
println("Reduce by Key ==>")
val wordCount = pairRDD.reduceByKey((a, b) => a + b)
wordCount.foreach(println)

This merges the values of each key by summing them. Yields below output.


// Output:
Reduce by Key ==>
(Brazil,1)
(Canada,1)
(China,1)
(USA,2)
(Germany,1)
(Russia,1)
(India,3)

aggregateByKey – Transformation similar to reduceByKey.

In our example it produces the same word count as reduceByKey, but it takes an explicit zero value plus separate sequence and combine functions, and it can return a result type different from the value type.


// aggregateByKey() on pairRDD
def param1 = (accu: Int, v: Int) => accu + v
def param2 = (accu1: Int, accu2: Int) => accu1 + accu2
println("Aggregate by Key ==> wordcount")
val wordCount2 = pairRDD.aggregateByKey(0)(param1, param2)
wordCount2.foreach(println)

This example yields the same output as the reduceByKey example.
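
Unlike reduceByKey, aggregateByKey can also return a result type different from the value type. Below is a minimal sketch, assuming the same pairRDD, that collects the values of each key into a mutable Set[Int] instead of summing them.

// aggregateByKey() with a result type (mutable Set[Int]) different from the value type (Int)
val valueSets = pairRDD.aggregateByKey(scala.collection.mutable.Set.empty[Int])(
  (set, v) => set += v,   // seqOp: add a value to the per-partition set
  (s1, s2) => s1 ++= s2   // combOp: merge the sets from different partitions
)
valueSets.foreach(println)  // e.g. (India,Set(1)), (USA,Set(1)), ...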

keys – Returns an RDD[K] with all keys in the dataset.


// Returning all keys from pairRDD
println("Keys ==>")
wordCount2.keys.foreach(println)

Yields below output


// Output:
Brazil
Canada
China
USA
Germany
Russia
India

values – Returns an RDD[V] with all values in the dataset.


// Get all values from pairRDD
println("Values ==>")
wordCount2.values.foreach(println)

count – Action that returns the number of elements in the dataset.


// Count() to return count of a dataset
println("Count :"+wordCount2.count())

collectAsMap – Action that returns the key-value pairs of the dataset as a Map to the master (driver). Note that if a key has multiple values, only one value per key is kept in the resulting Map.


// collectAsMap() to retrieve all data from a dataset
println("collectAsMap ==>")
pairRDD.collectAsMap().foreach(println)

Yields below output:


// Output:
(Brazil,1)
(Canada,1)
(Germany,1)
(China,1)
(Russia,1)
(India,1)

Complete Example

This example is also available at the GitHub project.


package com.sparkbyexamples.spark.rdd

import org.apache.spark.sql.SparkSession

import scala.collection.mutable

object OperationsOnPairRDD {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("SparkByExample")
      .master("local")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val rdd = spark.sparkContext.parallelize(
      List("Germany India USA","USA India Russia","India Brazil Canada China")
    )

    val wordsRdd = rdd.flatMap(_.split(" "))
    val pairRDD = wordsRdd.map(f=>(f,1))
    pairRDD.foreach(println)

    println("Distinct ==>")
    pairRDD.distinct().foreach(println)


    //SortByKey
    println("Sort by Key ==>")
    val sortRDD = pairRDD.sortByKey()
    sortRDD.foreach(println)

    //reduceByKey
    println("Reduce by Key ==>")
    val wordCount = pairRDD.reduceByKey((a,b)=>a+b)
    wordCount.foreach(println)

    def param1= (accu:Int,v:Int) => accu + v
    def param2= (accu1:Int,accu2:Int) => accu1 + accu2
    println("Aggregate by Key ==> wordcount")
    val wordCount2 = pairRDD.aggregateByKey(0)(param1,param2)
    wordCount2.foreach(println)

    //keys
    println("Keys ==>")
    wordCount2.keys.foreach(println)

    //values
    println("values ==>")
    wordCount2.values.foreach(println)

    println("Count :"+wordCount2.count())

    println("collectAsMap ==>")
    pairRDD.collectAsMap().foreach(println)

  }
}

FAQs on Spark pairRDD

What is a pairRDD and when should you use one?

A PairRDD (pair Resilient Distributed Dataset) is a type of RDD where each element is a key-value pair. Pair RDDs are commonly used for operations that require data to be grouped or aggregated by keys. They provide a convenient way to work with structured data, especially when dealing with operations like grouping, reducing, and joining.
Here are some key characteristics and common use cases for PairRDDs:

Key-Value Structure: In a Pair RDD, each element is represented as a tuple (key, value), where key and value can be of any data type, including basic types, custom objects, or even other RDDs.
Grouping: PairRDDs are often used for grouping data by keys. For example, you can group data by a specific attribute in a dataset to perform operations on groups of data with the same key.
Aggregations: PairRDDs are useful for performing aggregation operations such as reduceByKey, groupByKey, combineByKey, and foldByKey to summarize data based on keys.
Joins: PairRDDs can be joined with other PairRDDs based on their keys using operations like join, leftOuterJoin, rightOuterJoin, and cogroup (see the sketch after this list).
Transformations: Various transformations can be applied to PairRDDs, such as mapValues, flatMapValues, and filter, which allow you to manipulate the values associated with keys.
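
For example, a join pairs up the values of matching keys from two Pair RDDs. Here is a minimal sketch with hypothetical data (not the word-count example used above), assuming the same spark session.

// join() and leftOuterJoin() sketch (hypothetical data)
val population = spark.sparkContext.parallelize(Seq(("India", 1380), ("USA", 331)))
val capitals = spark.sparkContext.parallelize(Seq(("India", "New Delhi"), ("USA", "Washington"), ("Brazil", "Brasilia")))

// Inner join keeps only the keys present in both RDDs
population.join(capitals).foreach(println)           // (India,(1380,New Delhi)), (USA,(331,Washington))

// leftOuterJoin keeps every key from the left RDD; missing right-side values become None
capitals.leftOuterJoin(population).foreach(println)  // (Brazil,(Brasilia,None)), (India,(New Delhi,Some(1380))), ...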

How do I create a pairRDD?

PairRDDs can be created by running a map() function that returns key/value pairs. PairRDDs are commonly used in Spark for operations like groupByKey, reduceByKey, join, and other operations that involve key-value pairs.
We can first create an RDD either by using parallelize() on a collection or by starting from an existing RDD, and then apply a map() transformation that returns key-value pairs, as sketched below.
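
A minimal sketch of both approaches, assuming an existing SparkSession named spark (the variable names are hypothetical):

// Two common ways to create a pair RDD
// 1. Parallelize a collection of (key, value) tuples directly
val pairs1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2)))

// 2. Start from a plain RDD and map each element to a (key, value) tuple
val words = spark.sparkContext.parallelize(Seq("spark", "scala", "spark"))
val pairs2 = words.map(w => (w, 1))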

What is the difference between RDD and pairRDD?

RDD is a distributed collection of data that can be processed in parallel across a cluster of machines.
RDD can hold any type of data, including simple values, objects, or more complex data structures. RDD is used for general-purpose distributed data processing and can be transformed and processed using various operations like map, filter, reduce, groupBy, join, etc. RDDs do not have any inherent key-value structure; they are typically used for non-keyed data.
A PairRDD is specialized for key-value data and is designed for operations that involve keys. PairRDDs are commonly used in Spark when you need to work with structured data that can be organized and processed based on keys, making them suitable for many data processing tasks, especially in the context of data analytics and transformations.
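
In code, the difference is just the element type: an RDD whose elements are two-element tuples picks up the extra key-based methods from PairRDDFunctions through an implicit conversion. A minimal sketch, assuming the same spark session:

// A plain RDD vs. a pair RDD
val plain = spark.sparkContext.parallelize(Seq("a", "b", "a"))  // RDD[String]
val paired = plain.map(w => (w, 1))                             // RDD[(String, Int)]

// plain.reduceByKey(_ + _)                 // would not compile - no key/value structure
paired.reduceByKey(_ + _).foreach(println)  // compiles: (a,2), (b,1)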

Conclusion:

In this tutorial, you have learned about the PairRDDFunctions class and Spark Pair RDD transformation and action functions with Scala examples.

References:

PairRDDFunctions RDD API


This Post Has 3 Comments

  1. Rahul Singh

    From IntelliJ, I think this is the correct way to get the output: first collect the output, then print.
    println("sort By Key ==>")
    val sortrdd = pairrdd.sortByKey()
    sortrdd.collect.foreach(println)

  2. NNK

    May I know how are you running this program? I mean using IntelliJ or Spark submit. Also please provide all the dependencies you are using in pom.xml file or equivalent.

  3. Anonymous

    Hi, I am getting the following error while executing this above program :

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable

    Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
    Serialization stack:
    - object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.PairRDDFunctions, functionalInterfaceMethod=scala/Function0.apply:()Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/PairRDDFunctions.$anonfun$aggregateByKey$2:([BLscala/reflect/ClassTag;Lscala/runtime/LazyRef;)Ljava/lang/Object;, instantiatedMethodType=()Ljava/lang/Object;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.rdd.PairRDDFunctions$$Lambda$1397/952288009, org.apache.spark.rdd.PairRDDFunctions$$Lambda$1397/952288009@5d01ea21)

    Note: i am using spark 3.0 and scala 2.12
