Spark RDD Transformations with examples

RDD Transformations are Spark operations when executed on RDD, it results in a single or multiple new RDD’s. Since RDD are immutable in nature, transformations always create new RDD without updating an existing one hence, this creates an RDD lineage.

RDD Lineage Transformation

RDD Lineage is also known as the RDD operator graph or RDD dependency graph.

In this tutorial, you will learn lazy transformations, types of transformations, a complete list of transformation functions using wordcount example in scala.

RDD Transformations are Lazy

RDD Transformations are lazy operations meaning none of the transformations get executed until you call an action on Spark RDD. Since RDD’s are immutable, any transformations on it result in a new RDD leaving the current one unchanged.

RDD Transformation Types

There are two types are transformations.

Narrow Transformation

Narrow transformations are the result of map() and filter() functions and these compute data that live on a single partition meaning there will not be any data movement between partitions to execute narrow transformations.

rdd narrow transformation

Functions such as map(), mapPartition(), flatMap(), filter(), union() are some examples of narrow transformation

Wide Transformation

Wide transformations are the result of groupByKey() and reduceByKey() functions and these compute data that live on many partitions meaning there will be data movements between partitions to execute wide transformations. Since these shuffles the data, they also called shuffle transformations.

rdd wider transformation

Functions such as groupByKey(), aggregateByKey(), aggregate(), join(), repartition() are some examples of a wide transformations.

Note: When compared to Narrow transformations, wide transformations are expensive operations due to shuffling.

Spark RDD Transformation functions

TRANSFORMATION METHODSMETHOD USAGE AND DESCRIPTION
cache()Caches the RDD
filter()Returns a new RDD after applying filter function on source dataset.
flatMap()Returns flattern map meaning if you have a dataset with array, it converts each elements in a array as a row. In other words it return 0 or more items in output for each element in dataset.
map()Applies transformation function on dataset and returns same number of elements in distributed dataset.
mapPartitions()Similar to map, but executs transformation function on each partition, This gives better performance than map function
mapPartitionsWithIndex()Similar to map Partitions, but also provides func with an integer value representing the index of the partition.
randomSplit()Splits the RDD by the weights specified in the argument. For example rdd.randomSplit(0.7,0.3)
union()Comines elements from source dataset and the argument and returns combined dataset. This is similar to union function in Math set operations.
sample()Returns the sample dataset.
intersection()Returns the dataset which contains elements in both source dataset and an argument
distinct()Returns the dataset by eliminating all duplicated elements.
repartition()Return a dataset with number of partition specified in the argument. This operation reshuffles the RDD randamly, It could either return lesser or more partioned RDD based on the input supplied.
coalesce()Similar to repartition by operates better when we want to the decrease the partitions. Betterment acheives by reshuffling the data from fewer nodes compared with all nodes by repartition.

Spark RDD Transformations with Examples

In this section, I will explain a few RDD Transformations with word count example in scala, before we start first, let’s create an RDD by reading a text file. The text file used here is available at the GitHub and, the scala example is available at GitHub project for reference.


// Create RDD by using textFile
val spark:SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExamples.com")
      .getOrCreate()

val sc = spark.sparkContext

val rdd:RDD[String] = sc.textFile("src/main/scala/test.txt")
spark rdd transformations

flatMap() Transformation

flatMap() transformation flattens the RDD after applying the function and returns a new RDD. On the below example, first, it splits each record by space in an RDD and finally flattens it. Resulting RDD consists of a single word on each record.


// Apply flatMap() 
val rdd2 = rdd.flatMap(f=>f.split(" "))

map() Transformation

map() transformation is used the apply any complex operations like adding a column, updating a column e.t.c, the output of map transformations would always have the same number of records as input.

In our word count example, we are adding a new column with value 1 for each word, the result of the RDD is PairRDDFunctions which contains key-value pairs, word of type String as Key and 1 of type Int as value. For your understanding, I’ve defined rdd3 variable with type.


// Apply map()
val rdd3:RDD[(String,Int)]= rdd2.map(m=>(m,1))

filter() Transformation

filter() transformation is used to filter the records in an RDD. In our example we are filtering all words starts with “a”.


// Apply filter()
val rdd4 = rdd3.filter(a=> a._1.startsWith("a"))

reduceByKey() Transformation

reduceByKey() merges the values for each key with the function specified. In our example, it reduces the word string by applying the sum function on value. The result of our RDD contains unique words and their count. 


// Apply reduceByKey()
val rdd5 = rdd3.reduceByKey(_ + _)

sortByKey() Transformation

sortByKey() transformation is used to sort RDD elements on key. In our example, first, we convert RDD[(String,Int]) to RDD[(Int,String]) using map transformation and apply sortByKey which ideally does sort on an integer value. And finally, foreach with println statement prints all words in RDD and their count as key-value pair to console.


// Apply sortByKey()
val rdd6 = rdd5.map(a=>(a._2,a._1)).sortByKey()

// Print rdd6 result to console
rdd6.foreach(println)

Spark RDD Transformations complete example


package com.sparkbyexamples.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object WordCountExample {

  def main(args:Array[String]): Unit = {

    val spark:SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    val sc = spark.sparkContext

    val rdd:RDD[String] = sc.textFile("src/main/resources/test.txt")
    println("initial partition count:"+rdd.getNumPartitions)

    val reparRdd = rdd.repartition(4)
    println("re-partition count:"+reparRdd.getNumPartitions)

    //rdd.coalesce(3)

    rdd.collect().foreach(println)

    // rdd flatMap transformation
    val rdd2 = rdd.flatMap(f=>f.split(" "))
    rdd2.foreach(f=>println(f))

    //Create a Tuple by adding 1 to each word
    val rdd3:RDD[(String,Int)]= rdd2.map(m=>(m,1))
    rdd3.foreach(println)

    //Filter transformation
    val rdd4 = rdd3.filter(a=> a._1.startsWith("a"))
    rdd4.foreach(println)

    //ReduceBy transformation
    val rdd5 = rdd3.reduceByKey(_ + _)
    rdd5.foreach(println)

    //Swap word,count and sortByKey transformation
    val rdd6 = rdd5.map(a=>(a._2,a._1)).sortByKey()
    println("Final Result")

    //Action - foreach
    rdd6.foreach(println)

    //Action - count
    println("Count : "+rdd6.count())

    //Action - first
    val firstRec = rdd6.first()
    println("First Record : "+firstRec._1 + ","+ firstRec._2)

    //Action - max
    val datMax = rdd6.max()
    println("Max Record : "+datMax._1 + ","+ datMax._2)

    //Action - reduce
    val totalWordCount = rdd6.reduce((a,b) => (a._1+b._1,a._2))
    println("dataReduce Record : "+totalWordCount._1)
    //Action - take
    val data3 = rdd6.take(3)
    data3.foreach(f=>{
      println("data3 Key:"+ f._1 +", Value:"+f._2)
    })

    //Action - collect
    val data = rdd6.collect()
    data.foreach(f=>{
      println("Key:"+ f._1 +", Value:"+f._2)
    })

    //Action - saveAsTextFile
    rdd5.saveAsTextFile("c:/tmp/wordCount")
    
  }
}

Conclusion

In this Spark RDD Transformations tutorial, you have learned different transformation functions and their usage with scala examples and GitHub project for quick reference.

Happy Learning !!

Naveen (NNK)

Naveen (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, He has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive and Machine Learning. Naveen journey in the field of data engineering has been a continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with the data as he come across. Follow Naveen @ LinkedIn and Medium

Leave a Reply

This Post Has 5 Comments

  1. Cláudia Silva

    Thank you very much for this tutorial <3 I have a question : Wide and Narrow transformattions are found both in RDDs and Structured APIs, right ? (I am talking about python Spark of course)

    1. Naveen (NNK)

      Yes, Both RDD and DataFrames in PySpark/Spark supports wide and Narrow transformations.

  2. Jinfei Zhu

    Hi! I’ve been enjoying sparkByExamples but the typo of writing “wide transformation” as “wider transformation” is confusing here. It will be great if someone can correct this typo and best wishes to this super helpful website.

  3. Anonymous

    many thanks!

  4. anuragdas

    Hi, can you tell me how you have created the left sidebar. With tutorials list. Like SPARK RDD TUTORIAL