Spark RDD Tutorial | Learn with Scala Examples

This Apache Spark RDD tutorial will help you start understanding and using Spark RDD (Resilient Distributed Dataset) with Scala. All RDD examples provided in this tutorial were tested in our development environment and are available in our GitHub Spark Scala examples project for quick reference.

By the end of the tutorial, you will understand what a Spark RDD is, its advantages and limitations, how to create an RDD, how to apply transformations and actions, and how to operate on pair RDDs, all using Scala examples.

What is RDD (Resilient Distributed Dataset)?

RDD (Resilient Distributed Dataset) is a fundamental data structure of Spark and the primary data abstraction in Apache Spark and Spark Core. RDDs are fault-tolerant, immutable distributed collections of objects, which means once you create an RDD you cannot change it. Each dataset in an RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

In other words, RDDs are collections of objects similar to collections in Scala, with the difference that an RDD is computed on several JVMs scattered across multiple physical servers (also called nodes in a cluster), while a Scala collection lives on a single JVM.

Additionally, RDDs abstract away the partitioning and distribution of the data, which allows computations to run in parallel on several nodes. While applying transformations on an RDD, most of the time we don’t have to worry about parallelism, as Spark provides it by default.

This Apache Spark RDD tutorial describes the basic operations available on RDDs, such as map, filter, and persist, using Scala examples. In addition, it also explains pair RDD functions, such as groupByKey and join, which operate on RDDs of key-value pairs.

RDD Advantages

– In-Memory Processing
– Immutability
– Fault Tolerance
– Lazy Evaluation
– Partitioning
– Parallelize

Limitations

Spark RDDs are not suitable for applications that make fine-grained updates to shared state, such as the storage layer of a web application. For these applications, it is more efficient to use systems that perform traditional update logging and data checkpointing, such as databases. The goal of RDDs is to provide an efficient programming model for batch analytics, leaving these asynchronous applications to more specialized systems.

An RDD can belong to only one SparkContext, and every RDD can have a name and a unique identifier (id), as the sketch below shows.
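
A minimal sketch of naming an RDD and reading its id (this assumes a SparkContext named sc, for example spark.sparkContext from the SparkSession created in the next section; the RDD contents are arbitrary).


// Sketch: RDD name and unique id within a SparkContext
val namedRdd = sc.parallelize(Seq(1, 2, 3))
namedRdd.setName("numbersRDD")   // assign a human-readable name
println(namedRdd.id)             // unique identifier within this SparkContext
println(namedRdd.name)           // prints: numbersRDD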

RDD Creation

RDDs are created primarily in two different ways: first, by parallelizing an existing collection, and second, by referencing a dataset in an external storage system (HDFS, S3, and many more).

Before we look into examples, let’s first initialize a SparkSession using the builder pattern defined in the SparkSession class. While initializing, we need to provide the master and application name as shown below.


// Imports
import org.apache.spark.sql.SparkSession
// Initialize the SparkSession object
val spark:SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()    
Figure: RDD creation

Using sparkContext.parallelize()

sparkContext.parallelize() is used to parallelize an existing collection in your driver program. This is a basic method to create an RDD and is mainly used for POCs or prototyping. It requires all data to be present on the driver program prior to creating the RDD, hence it is not commonly used in production applications.


// Create RDD from parallelize    
val dataSeq = Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000))   
val rdd=spark.sparkContext.parallelize(dataSeq)

For production applications, we mostly create RDDs using external storage systems like HDFS, S3, HBase, etc. To keep this Spark tutorial simple, we create RDDs from files on the local filesystem.

Using sparkContext.textFile()

Using the textFile() method, we can read a text (.txt) file into an RDD.


// Create RDD from external Data source
val rdd2 = spark.sparkContext.textFile("/path/textFile.txt")

Using sparkContext.wholeTextFiles()

The wholeTextFiles() function returns a PairRDD with the key being the file path and the value being the file content.


// Reads entire file into a RDD as single record.
val rdd3 = spark.sparkContext.wholeTextFiles("/path/textFile.txt")
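
Since each record is a (path, content) pair, you can work with the keys and values like any other pair RDD; a small sketch:


// Each record of rdd3 is a (filePath, fileContent) tuple
rdd3.foreach { case (path, content) =>
  println(s"File: $path has ${content.length} characters")
}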

Besides text files, we can also create RDDs from CSV files, JSON, and other formats.
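
For example, a CSV file can be read line by line with textFile() and split on commas; a minimal sketch (the file path and column layout here are hypothetical):


// Sketch: build an RDD of column arrays from a CSV file
val csvRdd  = spark.sparkContext.textFile("/path/data.csv")
val colsRdd = csvRdd.map(line => line.split(","))
colsRdd.take(5).foreach(cols => println(cols.mkString(" | ")))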

Using sparkContext.emptyRDD

Using the emptyRDD() method on sparkContext, we can create an RDD with no data. This method creates an empty RDD with no partitions.


// Creates empty RDD with no partition    
val rdd = spark.sparkContext.emptyRDD // creates EmptyRDD[0]
val rddString = spark.sparkContext.emptyRDD[String] // creates EmptyRDD[1]

Creating empty RDD with partition

Sometimes we may need to write an empty RDD out as files, one per partition. In this case, you should create an empty RDD with partitions.


// Create empty RDD with partition
val rdd2 = spark.sparkContext.parallelize(Seq.empty[String])
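
If you need a specific number of partitions, parallelize() also accepts the partition count as its second argument; a minimal sketch:


// Sketch: empty RDD with an explicit number of partitions
val emptyRdd4 = spark.sparkContext.parallelize(Seq.empty[String], 4)
println(emptyRdd4.getNumPartitions) // 4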

RDD Parallelize and repartition

When we use parallelize() or textFile() or wholeTextFiles() methods of SparkContext to initiate RDD, it automatically splits the data into partitions based on resource availability. 

getNumPartitions – Returns the number of partitions our dataset is split into. Any transformations applied on an RDD execute in parallel; Spark runs one task for each partition.


println("initial partition count:"+rdd.getNumPartitions)
// Outputs: initial partition count:2

Set partitions manually – We can also set the number of partitions manually; all we need to do is pass the number of partitions as the second parameter to these functions, for example sparkContext.parallelize(dataSeq, 10).
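
A minimal sketch of setting the partition count manually, reusing dataSeq from above:


// Sketch: parallelize with an explicit partition count
val rdd10 = spark.sparkContext.parallelize(dataSeq, 10)
println("manual partition count:" + rdd10.getNumPartitions)
// Outputs: manual partition count:10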

Repartition using repartition() and coalesce() – Sometimes we may need to repartition the RDD. Spark provides two ways to do this: the repartition() method, which shuffles data from all nodes (also called a full shuffle), and the coalesce() method, which shuffles data from a minimum number of nodes. For example, if you have data in 4 partitions, coalesce(2) moves data from just 2 nodes.

Both functions take the number of partitions to repartition the RDD to, as shown below. Note that the repartition() method is a very expensive operation, as it shuffles data from all nodes in the cluster.


val reparRdd = rdd.repartition(4)
println("re-partition count:"+reparRdd.getNumPartitions)
// Outputs: re-partition count:4

Note: the repartition() and coalesce() methods also return a new RDD.
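
A minimal coalesce() sketch, reusing reparRdd from above:


// Sketch: reduce the number of partitions with coalesce (avoids a full shuffle)
val coalRdd = reparRdd.coalesce(2)
println("coalesce partition count:" + coalRdd.getNumPartitions)
// Outputs: coalesce partition count:2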

RDD Operations

RDD transformations – Transformations are lazy operations; instead of updating an RDD, these operations return another RDD (see the laziness sketch below).
RDD actions – Operations that trigger computation and return non-RDD values to the driver.
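
A minimal sketch of this laziness (the numbers are arbitrary): the map body does not run until the action is called.


// Sketch: transformations are lazy; only the action triggers execution
val nums    = spark.sparkContext.parallelize(1 to 5)
val doubled = nums.map { x => println(s"mapping $x"); x * 2 } // nothing printed yet
println(doubled.count()) // the action runs the map and then prints 5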

RDD Transformations with example

Transformations on a Spark RDD return another RDD, and transformations are lazy, meaning they don’t execute until you call an action on the RDD. Some transformations on RDDs are flatMap, map, reduceByKey, filter, and sortByKey; they return a new RDD instead of updating the current one.

In this Spark RDD transformation tutorial, I will explain transformations using the word count example. The image below demonstrates the different RDD transformations we are going to use.

Figure: Word count Spark RDD transformations

First, create an RDD by reading a text file. The text file used here, along with the Scala example used in this tutorial, is available in the GitHub project.


import org.apache.spark.rdd.RDD
val rdd:RDD[String] = spark.sparkContext.textFile("src/main/scala/test.txt")

flatMap – The flatMap() transformation flattens the RDD after applying the function and returns a new RDD. In the example below, it first splits each record by space and then flattens the result. The resulting RDD contains a single word on each record.


val rdd2 = rdd.flatMap(f=>f.split(" "))

map – The map() transformation is used to apply any complex operation, like adding a column or updating a column; the output of a map transformation always has the same number of records as its input.

In our word count example, we add a new column with the value 1 for each word. The result is an RDD of key-value pairs (which makes the PairRDDFunctions available), with the word of type String as the key and 1 of type Int as the value. For your understanding, I’ve defined the rdd3 variable with its type.


val rdd3:RDD[(String,Int)]= rdd2.map(m=>(m,1))

filter – The filter() transformation is used to filter the records in an RDD. In our example, we keep only the words that start with “a”.


val rdd4 = rdd3.filter(a=> a._1.startsWith("a"))

reduceByKey – reduceByKey() merges the values for each key using the specified function. In our example, it reduces the word counts by applying the sum function to the values. The resulting RDD contains unique words and their counts.


val rdd5 = rdd4.reduceByKey(_ + _)

sortByKey – The sortByKey() transformation is used to sort RDD elements by key. In our example, we first convert RDD[(String, Int)] to RDD[(Int, String)] using a map transformation and then apply sortByKey, which sorts on the integer value. Finally, foreach with a println statement prints every word in the RDD and its count as a key-value pair.


val rdd6 = rdd5.map(a=>(a._2,a._1)).sortByKey()
//Print rdd6 result to console
rdd6.foreach(println)

The last statement, foreach on the RDD, prints the count of each word. Please refer to this page for the full list of RDD transformations.
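
For reference, here is a sketch of the whole chain from this section in one place (same hypothetical test.txt path, including the filter on words starting with “a”; variable names differ from the step-by-step example to keep it self-contained):


// Word count: the full transformation chain from this section
import org.apache.spark.rdd.RDD

val lines: RDD[String] = spark.sparkContext.textFile("src/main/scala/test.txt")
val result = lines
  .flatMap(_.split(" "))                               // split lines into words
  .map(word => (word, 1))                              // pair each word with 1
  .filter { case (word, _) => word.startsWith("a") }   // keep words starting with "a"
  .reduceByKey(_ + _)                                  // sum counts per word
  .map { case (word, count) => (count, word) }         // swap so we can sort by count
  .sortByKey()
result.foreach(println)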

RDD Actions with example

An RDD action operation returns raw values from an RDD to the driver. In other words, any RDD function that returns something other than an RDD[T] is considered an action.

In this Spark RDD action tutorial, we will continue to use our word count example; the last statement, foreach(), is an action that applies a function to every element of the RDD, here printing each record to the console. Let’s see some more action operations on our word count example.

count – Returns the number of records in an RDD


    // Action - count
    println("Count : "+rdd6.count())

first – Returns the first record.


    // Action - first
    val firstRec = rdd6.first()
    println("First Record : "+firstRec._1 + ","+ firstRec._2)

max – Returns max record.


    // Action - max
    val datMax = rdd6.max()
    println("Max Record : "+datMax._1 + ","+ datMax._2)

reduce – Reduces the records to a single record; we can use this to count or sum values.


    // Action - reduce
    val totalWordCount = rdd6.reduce((a,b) => (a._1+b._1,a._2))
    println("dataReduce Record : "+totalWordCount._1)

take – Returns the first n records, where n is specified as an argument.


    // Action - take
    val data3 = rdd6.take(3)
    data3.foreach(f=>{
      println("data3 Key:"+ f._1 +", Value:"+f._2)
    })

collect – Returns all data from the RDD as an array. Be careful when you use this action with a huge RDD containing millions or billions of records, as you may run out of memory on the driver.


    // Action - collect
    val data = rdd6.collect()
    data.foreach(f=>{
      println("Key:"+ f._1 +", Value:"+f._2)
    })

saveAsTextFile – Using the saveAsTextFile action, we can write the RDD to a text file.


    rdd6.saveAsTextFile("/tmp/wordCount")

Note: Please refer to this page for a full list of RDD actions.

Types of RDD

PairRDDFunctions or PairRDD – An RDD of key-value pairs. This is the most commonly used RDD type; pair-specific functions such as groupByKey, reduceByKey, and join operate on it.

ShuffledRDD – The RDD produced by a shuffle, for example as the output of repartitioning or of key-based transformations such as reduceByKey.

DoubleRDD – An RDD of Double values, on which numeric functions such as sum, mean, and stats become available (via DoubleRDDFunctions).

SequenceFileRDD – An RDD that can be saved as a Hadoop SequenceFile (via SequenceFileRDDFunctions).

HadoopRDD – An RDD that reads data stored in Hadoop (for example HDFS) using the Hadoop InputFormat API; textFile() builds on it.

ParallelCollectionRDD – The RDD created by parallelizing an existing collection with sparkContext.parallelize() (see the sketch below for how to inspect these concrete types).
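
You can see which concrete RDD subclass Spark created for a given operation by inspecting its runtime class; a sketch reusing RDDs from the word count example above:


// Sketch: inspect the concrete RDD types Spark created
println(spark.sparkContext.parallelize(Seq(1, 2, 3)).getClass.getSimpleName) // ParallelCollectionRDD
println(rdd3.getClass.getSimpleName) // MapPartitionsRDD (from map)
println(rdd5.getClass.getSimpleName) // ShuffledRDD (from reduceByKey)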

Shuffle Operations

Shuffling is a mechanism Spark uses to redistribute data across different executors and even across machines. Spark shuffling is triggered when we perform certain transformation operations like groupByKey(), reduceByKey(), or join() on RDDs.

Spark shuffle is an expensive operation since it involves the following:

  • Disk I/O
  • Data serialization and deserialization
  • Network I/O

When creating an RDD, Spark doesn’t necessarily store the data for all keys in the same partition, since at the time of creation there is no way to set the key for the dataset.

Hence, when we run the reduceByKey() operation to aggregate the data on keys, Spark first needs to run tasks to collect the data for each key from all partitions. Specifically, for a reduceByKey() operation Spark does the following:

  • Spark first runs map tasks on all partitions, grouping all values for a single key.
  • The results of the map tasks are kept in memory.
  • When the results do not fit in memory, Spark stores the data on disk.
  • Spark shuffles the mapped data across partitions; sometimes it also stores the shuffled data on disk for reuse when it needs to recompute it.
  • Runs garbage collection.
  • Finally runs reduce tasks on each partition, based on key.

Spark RDDs trigger shuffle and repartition for several operations like repartition(), coalesce(), groupByKey(), reduceByKey(), cogroup(), and join(), but not countByKey().
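
One way to see where the shuffle happens is toDebugString, which prints the RDD lineage; a minimal sketch on rdd5 from the word count example:


// Sketch: the lineage of rdd5 shows a ShuffledRDD at the reduceByKey boundary
println(rdd5.toDebugString)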

Shuffle partition size & Performance

Depending on your dataset size, number of cores, and memory, Spark shuffling can benefit or harm your jobs. When you are dealing with a small amount of data, you should typically reduce the number of shuffle partitions; otherwise you will end up with many partitions holding few records each, which results in running many tasks with little data to process.

On the other hand, when you have a lot of data and too few partitions, you get fewer, longer-running tasks, and sometimes you may also get out-of-memory errors.

Getting the right shuffle partition size is always tricky and takes many runs with different values to find the optimal number. This is one of the key properties to look at when you have performance issues in Spark jobs.
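
For RDDs, one practical knob is the numPartitions argument that wide transformations such as reduceByKey() accept; a minimal sketch reusing rdd3 from the word count example:


// Sketch: ask the shuffle to produce 8 partitions
val reduced8 = rdd3.reduceByKey(_ + _, 8)
println(reduced8.getNumPartitions) // 8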

RDD Persistence Tutorial

Spark Cache and Persist are optimization techniques to improve the performance of the RDD jobs that are iterative and interactive. In this Spark RDD Tutorial section, I will explain how to use persist() and cache() methods on RDD with examples.

Though Spark provides computation up to 100x faster than traditional MapReduce jobs, if you have not designed your jobs to reuse repeated computations, you will see performance degrade when dealing with billions or trillions of records. Hence, we need to look at the computations and use optimization techniques as one of the ways to improve performance.

Using cache() and persist() methods, Spark provides an optimization mechanism to store the intermediate computation of an RDD so they can be reused in subsequent actions.

When you persist or cache an RDD, each worker node stores its partitioned data in memory or on disk and reuses it in other actions on that RDD. And Spark’s persisted data on nodes is fault-tolerant, meaning if any partition is lost, it will automatically be recomputed using the original transformations that created it.

Advantages of Persisting RDD

  • Cost efficient – Spark computations are very expensive, hence reusing computations saves cost.
  • Time efficient – Reusing repeated computations saves a lot of time.
  • Execution time – It reduces the execution time of the job, which allows us to perform more jobs on the same cluster.

RDD Cache

The Spark RDD cache() method by default saves the RDD computation at storage level `MEMORY_ONLY`, meaning it stores the data in the JVM heap as deserialized objects.

The cache() method in the RDD class internally calls persist() with the default MEMORY_ONLY storage level. Let’s look at an example.


  // Assumes sc = spark.sparkContext and a case class matching the CSV columns used below
  // (the field names here are illustrative)
  import org.apache.spark.rdd.RDD
  case class ZipCode(recordNumber: Int, zipCode: String, city: String, state: String)

  val rdd = sc.textFile("src/main/resources/zipcodes-noheader.csv")

  val rdd2: RDD[ZipCode] = rdd.map(row => {
    val strArray = row.split(",")
    ZipCode(strArray(0).toInt, strArray(1), strArray(3), strArray(4))
  })

  val rdd3 = rdd2.cache()

  println(rdd3.count())

RDD Persist

Spark's persist() method is used to store the RDD at one of the storage levels MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, and more.

persist() has two signatures: the first takes no argument and saves the RDD to the MEMORY_ONLY storage level by default, and the second takes a StorageLevel argument to store it at a different storage level.


  // Requires: import org.apache.spark.storage.StorageLevel
  val rddPersist = rdd.persist(StorageLevel.MEMORY_ONLY)
  rddPersist.collect().foreach(println)

RDD Unpersist

Spark automatically monitors every persist() and cache() call you make; it checks usage on each node and drops persisted data that is not used, following a least-recently-used (LRU) algorithm. You can also remove persisted data manually using the unpersist() method. unpersist() marks the RDD as non-persistent and removes all blocks for it from memory and disk.


  val rddPersist2 = rddPersist.unpersist()

unpersist() also has an overload that takes a boolean argument; when it is true, the call blocks until all blocks are deleted.
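
A minimal sketch of the blocking variant, reusing rddPersist from above:


// Sketch: block until all persisted blocks for this RDD are removed
rddPersist.unpersist(blocking = true)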

Persistence Storage Levels

All the different storage levels Spark supports are available in the org.apache.spark.storage.StorageLevel class. The storage level defines how and where to store the RDD.

MEMORY_ONLY – This is the default behavior of the RDD cache() method and stores the RDD as deserialized objects in JVM memory. When there is not enough memory available, it will not save some partitions of the RDD, and these will be re-computed as and when required. This takes more memory but runs faster, as it takes only a few CPU cycles to read from memory.

MEMORY_ONLY_SER – This is the same as MEMORY_ONLY, the difference being that it stores the RDD as serialized objects in JVM memory. It takes less memory (it is more space-efficient) than MEMORY_ONLY, as it saves objects in serialized form, but takes a few more CPU cycles to deserialize them.

MEMORY_ONLY_2 – Same as MEMORY_ONLY storage level but replicate each partition to two cluster nodes.

MEMORY_ONLY_SER_2 – Same as MEMORY_ONLY_SER storage level but replicate each partition to two cluster nodes.

MEMORY_AND_DISK – In this storage level, the RDD is stored in JVM memory as deserialized objects. When the required storage is greater than the available memory, it stores some of the excess partitions on disk and reads the data from disk when required. It is slower, as there is I/O involved.

MEMORY_AND_DISK_SER – This is the same as the MEMORY_AND_DISK storage level, the difference being that it serializes the RDD objects in memory and on disk when space is not available.

MEMORY_AND_DISK_2 – Same as MEMORY_AND_DISK storage level but replicate each partition to two cluster nodes.

MEMORY_AND_DISK_SER_2 – Same as MEMORY_AND_DISK_SER storage level but replicate each partition to two cluster nodes.

DISK_ONLY – In this storage level, the RDD is stored only on disk, and computation time is high because of the I/O involved.

DISK_ONLY_2 – Same as DISK_ONLY storage level but replicate each partition to two cluster nodes.
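
You can verify which level an RDD is using with getStorageLevel; a minimal sketch (the RDD contents are arbitrary):


// Sketch: persist with a serialized level and check it
import org.apache.spark.storage.StorageLevel

val serRdd = spark.sparkContext.parallelize(1 to 100).persist(StorageLevel.MEMORY_AND_DISK_SER)
println(serRdd.getStorageLevel) // describes the storage level in use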

Spark Shared Variables

In this section of the Spark RDD tutorial, let’s learn what are the different types of Spark Shared variables and how they are used in Spark transformations.

When Spark executes a transformation using map() or reduce() operations, it runs the transformation on a remote node using the variables that are shipped with the task, and these variables are not sent back to the Spark driver, so there is no built-in way to reuse or share variables across tasks. Spark shared variables solve this problem. Spark provides two types of shared variables:

  • Broadcast variables (read-only shared variable)
  • Accumulator variables (updatable shared variables)

Broadcast Variables

Broadcast variables are read-only shared variables that are cached and available on all nodes in a cluster so that tasks can access and use them. Instead of sending this data along with every task, Spark distributes broadcast variables to the machines using efficient broadcast algorithms to reduce communication costs.

One of the best use-cases for a Spark RDD broadcast variable is lookup data, for example zip code, state, or country lookups.

When you run a Spark RDD job that has the Broadcast variables defined and used, Spark does the following.

  • Spark breaks the job into stages that have distributed shuffling, and actions are executed within the stage.
  • Later stages are also broken into tasks.
  • Spark broadcasts the common (reusable) data needed by tasks within each stage.
  • The broadcast data is cached in serialized form and deserialized before executing each task.

A Spark broadcast variable is created using the broadcast(v) method of the SparkContext class. This method takes the argument v that you want to broadcast.


scala> val broadcastVar = sc.broadcast(Array(0, 1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(0, 1, 2, 3)

Note that broadcast variables are not sent to executors by the sc.broadcast(variable) call itself; instead, they are sent to executors when they are first used.
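
A minimal sketch of the lookup use-case mentioned earlier (the state-code map and data here are hypothetical):


// Sketch: broadcast a small lookup map and use it inside a transformation
val stateLookup = Map("NY" -> "New York", "CA" -> "California", "FL" -> "Florida")
val bcStates    = spark.sparkContext.broadcast(stateLookup)

val codes = spark.sparkContext.parallelize(Seq("NY", "CA", "FL"))
val names = codes.map(code => bcStates.value.getOrElse(code, "Unknown"))
names.foreach(println)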

Refer to the Spark RDD broadcast shared variable page for a more detailed example.

Accumulators

Spark accumulators are another type of shared variable that are only “added” to through an associative and commutative operation; they are used to implement counters (similar to MapReduce counters) or sums.

Spark by default supports creating accumulators of any numeric type and provides the capability to add custom accumulator types. Programmers can create the following accumulators:

  • named accumulators
  • unnamed accumulators

When you create a named accumulator, you can see it on the Spark web UI under the “Accumulator” tab. On this tab, you will see two tables: the first table, “accumulable”, consists of all named accumulator variables and their values, and the second table, “Tasks”, shows the value of each accumulator modified by a task.

Unnamed accumulators, on the other hand, are not shown on the Spark web UI; for all practical purposes, it is advisable to use named accumulators.

Accumulator variables are created using SparkContext.longAccumulator(name), for example:


scala> val accum = sc.longAccumulator("SumAccumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(SumAccumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3)).foreach(x => accum.add(x))
-----
-----
scala> accum.value
res2: Long = 6

Spark by default provides accumulator methods for long, double, and collection types. All these methods are defined in the SparkContext class and return LongAccumulator, DoubleAccumulator, and CollectionAccumulator respectively.
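
A minimal sketch of the other two built-in accumulator types, reusing the SparkContext sc from the shell example above:


// Sketch: double and collection accumulators
val dblAccum  = sc.doubleAccumulator("DoubleAccumulator")
val listAccum = sc.collectionAccumulator[String]("CollectionAccumulator")

sc.parallelize(Seq("a", "b", "c")).foreach { s =>
  dblAccum.add(1.0)
  listAccum.add(s)
}
println(dblAccum.value)  // 3.0
println(listAccum.value) // accumulated elements (order not guaranteed)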

Advanced API – DataFrame & DataSet

Creating RDD from DataFrame and vice-versa

Though we have more advanced APIs than RDD, we often need to convert a DataFrame to an RDD or an RDD to a DataFrame. Below are several examples.


// toDF() and createDataset() need the SparkSession implicits in scope
import spark.implicits._

// Convert RDD to DataFrame
val dfFromRDD1 = rdd.toDF()
// Convert RDD to DataFrame with column names
val dfFromRDD2 = rdd.toDF("col1","col2")
// Convert RDD to DataFrame using createDataFrame()
val df = spark.createDataFrame(rdd).toDF("col1","col2")
// Convert RDD to Dataset
val ds = spark.createDataset(rdd)
// Convert DataFrame to RDD
val rddFromDF = df.rdd

Where to go from here?

Learn Spark DataFrames Tutorial with examples
Learn Spark Dataset Tutorial with examples
