Spark Pair RDD Functions

Spark defines the PairRDDFunctions class, which provides several functions to work with pair RDDs (RDDs of key-value pairs). In this tutorial, we will learn these functions with Scala examples. Pair RDDs come in handy when you need to apply transformations like hash partitioning, set operations, joins, etc.

All these functions are grouped into Transformations and Actions, similar to regular RDDs.

Spark Pair RDD Transformation Functions

Pair RDD Transformation Function Syntax – Function Description
aggregateByKey(zeroValue)(seqOp, combOp) – Aggregates the values of each key using the given combine functions and a neutral "zero value".
combineByKey(createCombiner, mergeValue, mergeCombiners) – Generic function to combine the values for each key using a custom set of aggregation functions.
reduceByKey(func) – Merges the values for each key using an associative and commutative reduce function.
foldByKey(zeroValue)(func) – Merges the values for each key using an associative function and a neutral "zero value".
groupByKey() – Groups the values for each key in the RDD into a single sequence.
mapValues(f) – Passes each value in the key-value pair RDD through a map function without changing the keys.
flatMapValues(f) – Passes each value in the key-value pair RDD through a flatMap function without changing the keys.
keys – Returns an RDD with the keys of each tuple.
values – Returns an RDD with the values of each tuple.
sortByKey(ascending, numPartitions) – Returns an RDD sorted by key (available when the key type has an ordering).
join(otherRDD) – Returns an RDD containing all pairs of elements with matching keys in both RDDs (inner join).
leftOuterJoin(otherRDD), rightOuterJoin(otherRDD), fullOuterJoin(otherRDD) – Perform left, right and full outer joins on the key.
cogroup(otherRDD) – For each key, groups the values from this RDD and the other RDD together.
subtractByKey(otherRDD) – Returns the pairs whose keys are not present in the other RDD.
partitionBy(partitioner) – Returns a copy of the RDD partitioned using the specified partitioner (for example, a HashPartitioner).
distinct() – Returns an RDD with duplicate key-value pairs removed (defined on all RDDs, including pair RDDs).
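A few of these transformations are not covered in the step-by-step examples later in this article. Here is a minimal sketch of mapValues, groupByKey and join; the sample data is made up for illustration and it assumes the same SparkSession named spark that is created in the examples section below.


    val rddA = spark.sparkContext.parallelize(Seq(("India", 1), ("USA", 2), ("India", 3)))
    val rddB = spark.sparkContext.parallelize(Seq(("India", "Asia"), ("USA", "North America")))

    // mapValues: transform only the values; the keys stay untouched
    rddA.mapValues(_ * 10).foreach(println)      // e.g. (India,10), (USA,20), (India,30)

    // groupByKey: collect all values for each key into a single sequence
    rddA.groupByKey().foreach(println)           // e.g. (India,CompactBuffer(1, 3)), (USA,CompactBuffer(2))

    // join: inner join of two pair RDDs on the key
    rddA.join(rddB).foreach(println)             // e.g. (India,(1,Asia)), (USA,(2,North America)), (India,(3,Asia))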

Spark Pair RDD Actions

Pair RDD Action Function Syntax – Function Description
countByKey() – Counts the number of elements for each key and returns the result to the driver as a Map.
countByKeyApprox(timeout) – Approximate version of countByKey that returns a partial result if it does not complete within the timeout.
collectAsMap() – Returns the key-value pairs in the RDD to the driver as a Map (only one value is kept per key).
lookup(key) – Returns all the values associated with the given key.
saveAsHadoopFile(path), saveAsNewAPIHadoopFile(path) – Writes the pair RDD to any Hadoop-supported file system using the old or the new Hadoop OutputFormat API.
In addition, the generic RDD actions such as count(), collect(), first() and foreach() work on pair RDDs as well.
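As a quick illustration, here is a small sketch of countByKey and lookup with made-up data, again assuming the SparkSession named spark from the examples below.


    val countries = spark.sparkContext.parallelize(Seq(("India", 1), ("USA", 1), ("India", 1)))

    // countByKey: number of elements per key, returned to the driver as a Map
    println(countries.countByKey())              // Map(India -> 2, USA -> 1)

    // lookup: all values stored for a single key
    println(countries.lookup("India"))           // the values for key "India", i.e. 1 and 1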

Pair RDD Functions Examples

In this section, I will explain Spark pair RDD functions with Scala examples. Before we get started, let's create a pair RDD.


val spark = SparkSession.builder()
  .appName("SparkByExample")
  .master("local")
  .getOrCreate()

val rdd = spark.sparkContext.parallelize(
  List("Germany India USA", "USA India Russia", "India Brazil Canada China"))

val wordsRdd = rdd.flatMap(_.split(" "))
val pairRDD = wordsRdd.map(f => (f, 1))
pairRDD.foreach(println)

This snippet creates a pair RDD by splitting every element of the RDD on spaces, flattening the result so that each element of the new RDD is a single word, and finally pairing every word with the integer 1.


(Germany,1)
(India,1)
(USA,1)
(USA,1)
(India,1)
(Russia,1)
(India,1)
(Brazil,1)
(Canada,1)
(China,1)

distinct – Transformation that returns the distinct key-value pairs (duplicate tuples are removed).


pairRDD.distinct().foreach(println)

//Prints below output
(Germany,1)
(India,1)
(Brazil,1)
(China,1)
(USA,1)
(Canada,1)
(Russia,1)
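Note that distinct removes duplicate (key, value) tuples, not duplicate keys. If you only need the distinct keys, one option (a small sketch) is to combine keys with distinct:


    pairRDD.keys.distinct().foreach(println)   // Germany, India, USA, Russia, Brazil, Canada, China (order may vary)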

sortByKey – Transformation that returns an RDD sorted by key


    println("Sort by Key ==>")
    val sortRDD = pairRDD.sortByKey()
    sortRDD.foreach(println)

Yields below output.


Sort by Key ==>
(Brazil,1)
(Canada,1)
(China,1)
(Germany,1)
(India,1)
(India,1)
(India,1)
(Russia,1)
(USA,1)
(USA,1)
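sortByKey also accepts an ascending flag (and an optional number of partitions); passing false sorts the keys in descending order:


    pairRDD.sortByKey(ascending = false).foreach(println)   // prints (USA,1) first, down to (Brazil,1)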

reduceByKey – Transformation that merges the values for each key using an associative and commutative function (here, addition)

The resulting RDD contains one entry per unique key.


    println("Reduce by Key ==>")
    val wordCount = pairRDD.reduceByKey((a,b)=>a+b)
    wordCount.foreach(println)

This merges the values for each key by summing them. Yields below output.


Reduce by Key ==>
(Brazil,1)
(Canada,1)
(China,1)
(USA,2)
(Germany,1)
(Russia,1)
(India,3)
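reduceByKey works with any associative and commutative function, not just addition. As a small sketch on the same pairRDD (trivial here because every value is 1, but with real numeric values it would keep the largest value per key):


    val maxPerKey = pairRDD.reduceByKey((a, b) => math.max(a, b))
    maxPerKey.foreach(println)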

aggregateByKey – Transformation that aggregates the values of each key using a zero value, a sequence function (seqOp) and a combine function (combOp)

In our example this produces the same word count as reduceByKey, but the merging of values within a partition and the merging of accumulators across partitions are supplied as separate functions.


    // seqOp: merges a value into the accumulator within a single partition
    def param1 = (accu: Int, v: Int) => accu + v
    // combOp: merges accumulators from different partitions
    def param2 = (accu1: Int, accu2: Int) => accu1 + accu2
    println("Aggregate by Key ==> wordcount")
    val wordCount2 = pairRDD.aggregateByKey(0)(param1,param2)
    wordCount2.foreach(println)

This example yields the same output as the reduceByKey example.
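Unlike reduceByKey, aggregateByKey also lets the result type differ from the value type. A minimal sketch with made-up data that collects the set of cities seen per country:


    val visits = spark.sparkContext.parallelize(
      Seq(("India", "Delhi"), ("India", "Mumbai"), ("USA", "NYC"), ("India", "Delhi")))

    val citiesPerCountry = visits.aggregateByKey(Set.empty[String])(
      (set, city) => set + city,   // seqOp: add a value into the per-partition accumulator
      (s1, s2) => s1 ++ s2)        // combOp: merge accumulators from different partitions

    citiesPerCountry.foreach(println)   // e.g. (India,Set(Delhi, Mumbai)), (USA,Set(NYC))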

keys – Transformation that returns an RDD[K] with all the keys in the dataset


    println("Keys ==>")
    wordCount2.keys.foreach(println)

Yields below output


Brazil
Canada
China
USA
Germany
Russia
India

values – Transformation that returns an RDD[V] with all the values in the dataset


    println("Values ==>")
    wordCount2.values.foreach(println)
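With our data this prints one count per key, as shown below (the ordering of foreach output can vary):


Values ==>
1
1
1
2
1
1
3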

count – Action that returns the number of elements in the dataset


println("Count :"+wordCount2.count())

collectAsMap – Action that returns the key-value pairs in the RDD to the driver as a Map. Since a Map holds only one value per key, duplicate keys are collapsed.


    println("collectAsMap ==>")
    pairRDD.collectAsMap().foreach(println)

Yields below output:


(Brazil,1)
(Canada,1)
(Germany,1)
(China,1)
(Russia,1)
(India,1)
(USA,1)

Complete Example

This example is also available at the GitHub project.


package com.sparkbyexamples.spark.rdd

import org.apache.spark.sql.SparkSession

object OperationsOnPairRDD {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("SparkByExample")
      .master("local")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val rdd = spark.sparkContext.parallelize(
      List("Germany India USA","USA India Russia","India Brazil Canada China")
    )

    val wordsRdd = rdd.flatMap(_.split(" "))
    val pairRDD = wordsRdd.map(f=>(f,1))
    pairRDD.foreach(println)

    println("Distinct ==>")
    pairRDD.distinct().foreach(println)


    //SortByKey
    println("Sort by Key ==>")
    val sortRDD = pairRDD.sortByKey()
    sortRDD.foreach(println)

    //reduceByKey
    println("Reduce by Key ==>")
    val wordCount = pairRDD.reduceByKey((a,b)=>a+b)
    wordCount.foreach(println)

    def param1= (accu:Int,v:Int) => accu + v
    def param2= (accu1:Int,accu2:Int) => accu1 + accu2
    println("Aggregate by Key ==> wordcount")
    val wordCount2 = pairRDD.aggregateByKey(0)(param1,param2)
    wordCount2.foreach(println)

    //keys
    println("Keys ==>")
    wordCount2.keys.foreach(println)

    //values
    println("values ==>")
    wordCount2.values.foreach(println)

    println("Count :"+wordCount2.count())

    println("collectAsMap ==>")
    pairRDD.collectAsMap().foreach(println)

  }
}

Conclusion:

In this tutorial, you have learned about the PairRDDFunctions class and Spark pair RDD transformation and action functions with Scala examples.

References:

PairRDDFunctions RDD API

