PySpark RDD Tutorial | Learn with Examples

This PySpark RDD tutorial will help you understand what an RDD (Resilient Distributed Dataset) is, its advantages, how to create one, and how to use it, with GitHub examples. All RDD examples provided in this tutorial were tested in our development environment and are available in the GitHub PySpark examples project for quick reference.

By the end of this PySpark tutorial, you will know what a PySpark RDD is, its advantages and limitations, how to create an RDD, how to apply transformations and actions, and how to operate on pair RDDs.

What is RDD (Resilient Distributed Dataset)?

RDD (Resilient Distributed Dataset) is a fundamental building block of PySpark: a fault-tolerant, immutable, distributed collection of objects. Immutable means that once you create an RDD, you cannot change it. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

In other words, an RDD is a collection of objects similar to a list in Python, with the difference that an RDD is computed by several processes spread across multiple physical servers (also called nodes) in a cluster, while a Python collection lives and is processed in just one process.

Additionally, RDDs provide a data abstraction for partitioning and distributing data so that computations run in parallel on several nodes. While applying transformations on an RDD, we don’t have to worry about parallelism, as PySpark provides it by default.

This Apache PySpark RDD tutorial describes the basic operations available on RDDs, such as map(), filter(), persist() and many more. In addition, this tutorial explains pair RDD functions that operate on RDDs of key-value pairs, such as groupByKey() and join().

Note: Each RDD has a unique identifier (id) and can optionally be given a name.

PySpark RDD Benefits

PySpark is widely adopted in the machine learning and data science communities due to its advantages over traditional single-machine Python programming.

In-Memory Processing

PySpark loads data from disk, processes it in memory, and keeps the data in memory; this is the main difference between PySpark and MapReduce, which is I/O intensive. Between transformations, we can also cache/persist the RDD in memory to reuse previous computations.

Immutability

PySpark RDDs are immutable in nature, meaning that once RDDs are created, you cannot modify them. When we apply transformations on an RDD, PySpark creates a new RDD and maintains the RDD lineage.

Fault Tolerance

PySpark reads from fault-tolerant data stores such as HDFS and S3, and every RDD records its lineage, so if a partition is lost, PySpark automatically recomputes it from the source data using the transformations that created it. Also, when PySpark applications run on a cluster, failed tasks are automatically retried a certain number of times (as per the configuration, spark.task.maxFailures), allowing the application to finish seamlessly.

Lazy Evaluation

PySpark does not evaluate RDD transformations as the driver encounters them; instead, it records all transformations as it encounters them (building a DAG) and evaluates them all when it sees the first RDD action.
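To make the idea concrete, here is a plain-Python analogy that runs without Spark. Python's built-in map() and filter() return lazy iterators, so they behave like transformations: they only describe work. Consuming the pipeline with list() plays the role of an action. The function name tracked_double is made up for illustration, and this is an analogy, not how Spark's DAG scheduler is implemented.

```python
calls = []  # records which elements were actually processed

def tracked_double(x):
    calls.append(x)
    return x * 2

data = [1, 2, 3, 4]

# "Transformations": map() and filter() return lazy iterators, so nothing runs yet
pipeline = filter(lambda x: x > 4, map(tracked_double, data))
before_action = len(calls)

# "Action": consuming the pipeline finally executes every recorded step
result = list(pipeline)

print(before_action, result, len(calls))  # 0 [6, 8] 4
```

No element is touched until list() is called, just as no RDD transformation runs until an action such as collect() or count() is invoked.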

Partitioning

When you create an RDD from data, PySpark partitions its elements by default. By default, it creates as many partitions as there are cores available (sc.defaultParallelism), although RDDs read from files also depend on the input splits.
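The following plain-Python sketch shows how a local collection can be split into a given number of contiguous, nearly equal partitions. The start/end arithmetic is similar to the approach Spark's Scala ParallelCollectionRDD uses for parallelize(), but treat it as an illustration only: PySpark serializes the data in batches first, so exact partition boundaries may differ. The helper name slice_into_partitions is hypothetical.

```python
def slice_into_partitions(data, num_slices):
    """Split a list into num_slices contiguous partitions whose sizes differ by at most 1."""
    if num_slices < 1:
        raise ValueError("num_slices must be >= 1")
    n = len(data)
    return [data[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]

data = list(range(1, 13))
parts = slice_into_partitions(data, 4)
print(parts)       # [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
print(len(parts))  # 4, analogous to rdd.getNumPartitions()
```

Every element lands in exactly one partition, which is what lets each partition be processed independently on a different core or node.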

PySpark RDD Limitations

PySpark RDDs are not well suited for applications that make fine-grained updates to a state store, such as the storage system of a web application. For these applications, it is more efficient to use systems that perform traditional update logging and data checkpointing, such as databases. The goal of RDDs is to provide an efficient programming model for batch analytics, leaving such asynchronous applications to other systems.

Creating RDD

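RDDs are created primarily in two ways: by parallelizing an existing collection in the driver program, or by referencing a dataset in an external storage system (HDFS, S3, the local file system, and so on).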

Before we look into examples, let’s first initialize a SparkSession using the builder pattern defined in the SparkSession class. While initializing, we need to provide the master and the application name as shown below. In production applications, you would pass the master from spark-submit instead of hardcoding it in the Spark application.


from pyspark.sql import SparkSession
# builder is a property (not a method); backslashes continue the chained calls
spark: SparkSession = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

master() – If you are running on a cluster, you need to pass your master name as an argument to master(). Usually, it would be either <a href="https://sparkbyexamples.com/hadoop/how-yarn-works/">yarn (Yet Another Resource Negotiator)</a> or mesos, depending on your cluster setup.

  • Use local[x] when running locally (local mode) rather than on a cluster. x should be an integer greater than 0; it sets the number of worker threads, which also determines the default number of partitions (sc.defaultParallelism) used when you create an RDD with parallelize(). Ideally, x should be the number of CPU cores you have.

appName() – Used to set your application name.

getOrCreate() – Returns the existing SparkSession if one already exists; otherwise, creates a new one.

Note: Creating a SparkSession internally creates a SparkContext; there is only one SparkContext per JVM.

Create RDD using sparkContext.parallelize()

By using the parallelize() function of SparkContext (sparkContext.parallelize()), you can create an RDD. This function distributes an existing collection from your driver program to form an RDD. This is a basic method to create an RDD and is used when you already have data in memory, either loaded from a file or from a database. It requires all data to be present in the driver program before the RDD is created.


#Create RDD from parallelize    
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd=spark.sparkContext.parallelize(data)

For production applications, we mostly create RDDs from external storage systems like HDFS, S3, HBase, etc. To keep this PySpark RDD tutorial simple, we use files from the local system or load data from a Python list to create RDDs.

Create RDD using sparkContext.textFile()

Using the textFile() method, we can read a text (.txt) file into an RDD, where each line of the file becomes one record.


#Create RDD from external Data source
rdd2 = spark.sparkContext.textFile("/path/textFile.txt")

Create RDD using sparkContext.wholeTextFiles()

The wholeTextFiles() function returns a pair RDD in which the key is the file path and the value is the file content.


#Reads entire file into a RDD as single record.
rdd3 = spark.sparkContext.wholeTextFiles("/path/textFile.txt")
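To contrast the record shapes of textFile() and wholeTextFiles(), here is a plain-Python sketch over a temporary directory. The helpers text_file_records and whole_text_file_records are hypothetical and only mimic the output shape (one record per line versus one (path, content) pair per file); Spark itself reads the files in a distributed way and may format the paths differently.

```python
import os
import tempfile

def text_file_records(directory):
    """Like textFile(): every line of every file becomes one record."""
    records = []
    for name in sorted(os.listdir(directory)):
        with open(os.path.join(directory, name)) as f:
            records.extend(f.read().splitlines())
    return records

def whole_text_file_records(directory):
    """Like wholeTextFiles(): one (path, full content) pair per file."""
    pairs = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        with open(path) as f:
            pairs.append((path, f.read()))
    return pairs

with tempfile.TemporaryDirectory() as tmp:
    for name, text in [("a.txt", "hello world\nspark rdd\n"), ("b.txt", "one line\n")]:
        with open(os.path.join(tmp, name), "w") as f:
            f.write(text)
    lines = text_file_records(tmp)
    files = whole_text_file_records(tmp)

print(lines)       # ['hello world', 'spark rdd', 'one line']
print(len(files))  # 2 records, one per file
```

Three lines across two files give three records in the first case but only two records in the second, which is why wholeTextFiles() suits many small files that must be processed as a whole.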

Besides using text files, we can also create RDD from CSV file, JSON, and more formats.

Create empty RDD using sparkContext.emptyRDD

Using the emptyRDD() method on sparkContext, we can create an RDD with no data. This method creates an empty RDD with no partitions.


# Creates empty RDD with no partition
rdd = spark.sparkContext.emptyRDD()

Creating empty RDD with partition

Sometimes we may need to write an empty RDD to files by partition; in this case, you should create an empty RDD with partitions.


#Create empty RDD with partition
rdd2 = spark.sparkContext.parallelize([],10) #This creates 10 partitions

RDD Parallelize

When we use the parallelize(), textFile() or wholeTextFiles() methods of SparkContext to create an RDD, it automatically splits the data into partitions based on resource availability. When you run it on a laptop, it creates as many partitions as the number of cores available to Spark on your system.

getNumPartitions() – An RDD function that returns the number of partitions the dataset is split into.


rdd = spark.sparkContext.parallelize(data)
print("initial partition count:"+str(rdd.getNumPartitions()))
#Outputs the default partition count, e.g. initial partition count:1 with master local[1]

Set parallelize manually – We can also set the number of partitions manually; all we need to do is pass the number of partitions as the second parameter to these functions (for textFile() it is a minimum), for example sparkContext.parallelize([1,2,3,4,56,7,8,9,12,3], 10).

Repartition and Coalesce

Sometimes we may need to repartition an RDD. PySpark provides two ways to do this: the repartition() method, which shuffles data from all nodes (also called a full shuffle), and the coalesce() method, which minimizes data movement by merging existing partitions. For example, if you have data in 4 partitions, coalesce(2) moves data from just 2 of them. Without a shuffle, coalesce() can only decrease the number of partitions.

Both functions take the number of partitions to repartition the RDD into, as shown below. Note that the <a href="https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-coalesce/">repartition()</a> method is a very expensive operation, as it shuffles data from all nodes in a cluster.


reparRdd = rdd.repartition(4)
print("re-partition count:"+str(reparRdd.getNumPartitions()))
#Outputs: re-partition count:4

Note: The repartition() and coalesce() methods also return a new RDD.
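The difference in data movement can be sketched in plain Python. In this simplified model (an assumption for illustration, not Spark's actual placement logic), coalesce() merges neighbouring partitions as whole blocks, while repartition() deals every record round-robin into new partitions, so almost every record may move.

```python
def coalesce(partitions, n):
    """Merge existing partitions into n groups without splitting any of them."""
    if n >= len(partitions):
        return [list(p) for p in partitions]  # without a shuffle it cannot grow
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[(i * n) // len(partitions)].extend(part)
    return merged

def repartition(partitions, n):
    """Full shuffle: deal every record round-robin into n new partitions."""
    result = [[] for _ in range(n)]
    records = [x for part in partitions for x in part]
    for i, x in enumerate(records):
        result[i % n].append(x)
    return result

parts = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
print(coalesce(parts, 2))       # [[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12]]
print(repartition(parts, 2))    # [[1, 3, 5, 7, 9, 11], [2, 4, 6, 8, 10, 12]]
print(len(coalesce(parts, 8)))  # 4
```

In the coalesce() result, partitions 0 and 2 keep their data and only the contents of partitions 1 and 3 move, matching the "data from just 2 of them" example; in the repartition() result, most records end up in a different partition.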

PySpark RDD Operations

RDD transformations – Transformations are lazy operations; instead of updating an RDD, these operations return another RDD.
RDD actions – Operations that trigger computation and return non-RDD values to the driver.

RDD Transformations with example

Transformations on a PySpark RDD return another RDD, and transformations are lazy, meaning they don’t execute until you call an action on the RDD. Some transformations on RDDs are flatMap(), map(), reduceByKey(), filter() and sortByKey(); they return a new RDD instead of updating the current one.

In this PySpark RDD transformation section of the tutorial, I will explain transformations using the word count example. The image below demonstrates the different RDD transformations we are going to use.

First, create an RDD by reading a text file. The text file used here is available at the GitHub project.


rdd = spark.sparkContext.textFile("/tmp/test.txt")

flatMap – The flatMap() transformation applies a function to each record and flattens the results into a new RDD. In the example below, it first splits each record by space and then flattens the result, so the resulting RDD contains a single word in each record.


rdd2 = rdd.flatMap(lambda x: x.split(" "))

map – The map() transformation is used to apply any complex operation, like adding or updating a column; the output of a map transformation always has the same number of records as the input.

In our word count example, we add a new column with the value 1 for each word. The result is a pair RDD of key-value tuples, with the word (a string) as the key and 1 (an integer) as the value.


rdd3 = rdd2.map(lambda x: (x,1))
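To see how flatMap() and map() differ in record counts, here is a plain-Python sketch using list comprehensions on two sample lines (made up for illustration). Mapping keeps one output per input line, flat-mapping produces one output per word, and the (word, 1) step mirrors rdd2.map(lambda x: (x,1)).

```python
lines = ["Project Gutenberg's", "Alice's Adventures in Wonderland"]

# map(): exactly one output record per input record (here, one list of words per line)
mapped = [line.split(" ") for line in lines]

# flatMap(): each input can yield zero or more records, which are flattened
flat_mapped = [word for line in lines for word in line.split(" ")]

# Like rdd2.map(lambda x: (x, 1)): one (word, 1) pair per word, same record count
pairs = [(word, 1) for word in flat_mapped]

print(len(mapped), len(flat_mapped), len(pairs))  # 2 6 6
```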

reduceByKey – reduceByKey() merges the values for each key using the specified function. In our example, it sums the counts for each word, so the resulting RDD contains each unique word with its count.


rdd5 = rdd3.reduceByKey(lambda a,b: a+b)

sortByKey – The sortByKey() transformation is used to sort RDD elements by key. In our example, we first swap each (word, count) tuple to (count, word) using a map transformation and then apply sortByKey(), which sorts on the integer count. Finally, collect() returns all words and their counts to the driver as key-value pairs, which we print.


rdd6 = rdd5.map(lambda x: (x[1],x[0])).sortByKey()
#Print rdd6 result to console
print(rdd6.collect())

filter – The filter() transformation is used to filter the records in an RDD. In our example, we keep only the words that start with “a”.


# x is a (word, 1) tuple, so test the word in position 0
rdd4 = rdd3.filter(lambda x: x[0].startswith("a"))
print(rdd4.collect())

Please refer to this page for the full list of RDD transformations.

RDD Actions with example

RDD action operations return values from an RDD to the driver program. In other words, any RDD function that returns a non-RDD value is considered an action.

In this section of the PySpark RDD tutorial, we will continue to use our word count example and perform some actions on it.

count() – Returns the number of records in an RDD


# Action - count
print("Count : "+str(rdd6.count()))

first() – Returns the first record.


# Action - first
firstRec = rdd6.first()
print("First Record : "+str(firstRec[0]) + ","+ firstRec[1])

max() – Returns max record.


# Action - max
datMax = rdd6.max()
print("Max Record : "+str(datMax[0]) + ","+ datMax[1])

reduce() – Reduces the records to a single value using the given function; we can use it to count or sum.


# Action - reduce
totalWordCount = rdd6.reduce(lambda a,b: (a[0]+b[0],a[1]))
print("dataReduce Record : "+str(totalWordCount[0]))
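Because Spark runs reduce() inside each partition first and then merges the partial results, the function should be associative and commutative. This plain-Python sketch applies the same logic as the lambda above to hypothetical (count, word) records, once as a single sequence and once split into two "partitions". The summed count is the same either way; the word kept in position 1 is simply whichever record came first, so only the count is meaningful.

```python
from functools import reduce

def combine(a, b):
    # Same logic as the lambda above: add the counts, keep the first word
    return (a[0] + b[0], a[1])

# Hypothetical (count, word) records, shaped like the sorted rdd6
records = [(1, "by"), (2, "in"), (3, "a"), (5, "the")]

# One sequence vs. two partitions reduced separately and then merged
single = reduce(combine, records)
partials = [reduce(combine, records[:2]), reduce(combine, records[2:])]
merged = reduce(combine, partials)

print(single[0], merged[0])  # 11 11
```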

take() – Returns the first n records, where n is passed as an argument.


# Action - take
data3 = rdd6.take(3)
for f in data3:
    print("data3 Key:"+ str(f[0]) +", Value:"+f[1])

collect() – Returns all data from the RDD as a Python list. Be careful when you use this action on a huge RDD with millions or billions of records, as you may run out of memory on the driver.


# Action - collect
data = rdd6.collect()
for f in data:
    print("Key:"+ str(f[0]) +", Value:"+f[1])

saveAsTextFile() – Using the saveAsTextFile() action, we can write the RDD to text files in the given directory.


rdd6.saveAsTextFile("/tmp/wordCount")

Note: Please refer to this page for a full list of RDD actions.

Types of RDD

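PairRDDFunctions or PairRDD – A pair RDD is an RDD whose elements are key-value pairs (two-element tuples in PySpark). This is the most commonly used RDD type; it enables operations such as reduceByKey(), groupByKey() and join(). In the Scala API, these operations come from the PairRDDFunctions class.

ShuffledRDD – The RDD produced by a shuffle, for example by reduceByKey() or repartition().

DoubleRDD – An RDD of numeric values with extra statistics functions such as mean(), stdev() and histogram(). In PySpark, these functions are available on any RDD of numbers.

SequenceFileRDD – An RDD of key-value pairs that can be saved as a Hadoop SequenceFile with saveAsSequenceFile().

HadoopRDD – An RDD that reads data from HDFS or another Hadoop-supported storage system; textFile() is built on top of it.

ParallelCollectionRDD – The RDD created by parallelize() from a collection in the driver program.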
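Pair RDD operations such as join() match records by key. PySpark's join() returns (key, (left_value, right_value)) tuples, one for every matching combination. The plain-Python sketch below reproduces that inner-join result shape on two small lists so you can see it without a cluster; the inner_join helper and the sample data are made up for illustration, and in Spark the output order is not guaranteed.

```python
from collections import defaultdict

def inner_join(left, right):
    """Return (key, (left_value, right_value)) for every pair of records sharing a key."""
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

emp_dept = [(10, "James"), (20, "Anna"), (10, "Robert")]
dept_names = [(10, "Finance"), (30, "Sales")]

print(inner_join(emp_dept, dept_names))
# [(10, ('James', 'Finance')), (10, ('Robert', 'Finance'))]
```

Keys 20 and 30 appear on only one side, so an inner join drops them.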

Shuffle Operations

Shuffling is a mechanism PySpark uses to redistribute data across different executors and even across machines. PySpark shuffling is triggered when we perform certain transformation operations like groupByKey(), reduceByKey() and join() on RDDs.

PySpark shuffle is an expensive operation since it involves the following:

  • Disk I/O
  • Involves data serialization and deserialization
  • Network I/O

When creating an RDD, PySpark doesn’t necessarily store all the data for a given key in the same partition, since at creation time there is no way to set the key for the dataset.

Hence, when we run the reduceByKey() operation to aggregate data by key, PySpark first needs to run tasks that collect the values for each key from all partitions.

For example, when we perform a reduceByKey() operation, PySpark does the following:

  • PySpark first runs map tasks on all partitions, which group all values for a single key.
  • The results of the map tasks are kept in memory.
  • When the results do not fit in memory, PySpark spills the data to disk.
  • PySpark shuffles the mapped data across partitions; sometimes it also stores the shuffled data on disk for reuse when it needs to recalculate.
  • Runs garbage collection.
  • Finally, runs reduce tasks on each partition based on key.
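The core of these steps can be sketched in plain Python. This is a simplified model under stated assumptions: everything stays in memory (no spilling or garbage collection), and keys are routed with Python's built-in hash() modulo the partition count, whereas PySpark's reduceByKey() uses its own portable hash by default. Each input partition is combined locally, the partial results are shuffled so every key lands in exactly one reduce partition, and a final reduce merges them. The reduce_by_key helper is hypothetical.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(partitions, func, num_partitions):
    # 1. Map side: combine values per key inside each partition (map-side combine)
    combined = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        combined.append(local)

    # 2. Shuffle: send each (key, partial) pair to partition hash(key) % num_partitions
    shuffled = [defaultdict(list) for _ in range(num_partitions)]
    for local in combined:
        for key, partial in local.items():
            shuffled[hash(key) % num_partitions][key].append(partial)

    # 3. Reduce side: merge all partial results that arrived for each key
    return [(key, reduce(func, partials))
            for bucket in shuffled
            for key, partials in bucket.items()]

parts = [[("a", 1), ("b", 1), ("a", 1)], [("b", 1), ("c", 1)]]
counts = sorted(reduce_by_key(parts, lambda x, y: x + y, num_partitions=2))
print(counts)  # [('a', 2), ('b', 2), ('c', 1)]
```

Combining locally first means partition 0 ships a single ("a", 2) instead of two ("a", 1) records, which is why reduceByKey() usually shuffles less data than groupByKey().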

PySpark RDD triggers a shuffle for several operations like repartition(), groupByKey(), reduceByKey(), cogroup() and join() (and coalesce() when called with shuffle=True), but not countByKey().

Shuffle partition size & Performance

Based on your dataset size, number of cores and memory, PySpark shuffling can benefit or harm your jobs. When you are dealing with a small amount of data, you should typically reduce the number of shuffle partitions; otherwise, you will end up with many partition files with few records each, which results in running many tasks with little data to process.

On the other hand, when you have a large amount of data and too few partitions, you get fewer, longer-running tasks, and sometimes you may also get out-of-memory errors.

Getting the shuffle partition size right is always tricky and takes many runs with different values to find the optimal number. This is one of the key properties to look at when you have performance issues in PySpark jobs.
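One common starting point, offered here as a rule of thumb and not an official Spark formula, is to aim for roughly 100 to 200 MB per partition and never use fewer partitions than available cores. The hypothetical helper below encodes that assumption (a 128 MiB target) to give you a first value to tune from; for RDDs, you could pass the result as the numPartitions argument of operations such as reduceByKey() or repartition().

```python
import math

def suggest_partitions(total_bytes, cores, target_bytes=128 * 1024 * 1024):
    """Rule-of-thumb partition count (assumption): about target_bytes per partition,
    but at least one partition per core so no core sits idle."""
    by_size = math.ceil(total_bytes / target_bytes)
    return max(by_size, cores, 1)

GiB = 1024 ** 3
print(suggest_partitions(10 * GiB, cores=16))         # 80
print(suggest_partitions(50 * 1024 ** 2, cores=8))    # 8
```

Treat the output only as a first guess; measure task durations and memory use, then adjust.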

PySpark RDD Persistence Tutorial

PySpark Cache and Persist are optimization techniques to improve the performance of the RDD jobs that are iterative and interactive. In this PySpark RDD Tutorial section, I will explain how to use persist() and cache() methods on RDD with examples.

Though PySpark can run computations up to 100 times faster than traditional MapReduce jobs, if you have not designed your jobs to reuse repeated computations, you will see performance degrade when you are dealing with billions or trillions of records. Hence, we need to look at the computations and use optimization techniques as one of the ways to improve performance.

Using cache() and persist() methods, PySpark provides an optimization mechanism to store the intermediate computation of an RDD so they can be reused in subsequent actions.

When you persist or cache an RDD, each worker node stores its partitioned data in memory or on disk and reuses it in other actions on that RDD. Spark’s persisted data on nodes is fault-tolerant, meaning that if any partition is lost, it is automatically recomputed using the original transformations that created it.
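The effect of persisting can be illustrated with a small plain-Python analogy. Without caching, every action replays the whole lineage, so the expensive step runs again; with a cache, later actions reuse the stored result. The expensive_transform function and the dictionary-based cache are illustrative assumptions, not Spark internals.

```python
runs = 0  # how many times the expensive step has executed

def expensive_transform(data):
    """Stands in for a costly chain of transformations (the RDD lineage)."""
    global runs
    runs += 1
    return [x * x for x in data]

data = list(range(5))

# Without persistence: every action replays the lineage
count = len(expensive_transform(data))
total = sum(expensive_transform(data))
uncached_runs = runs

# With a simple cache: compute once, reuse for later actions
runs = 0
cache = {}

def cached_transform(data):
    if "squares" not in cache:
        cache["squares"] = expensive_transform(data)
    return cache["squares"]

count = len(cached_transform(data))
total = sum(cached_transform(data))
print(uncached_runs, runs, count, total)  # 2 1 5 30
```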

Advantages of Persisting RDD

  • Cost efficient – PySpark computations are very expensive, so reusing computations saves cost.
  • Time efficient – Reusing the repeated computations saves lots of time.
  • Execution time – Saves execution time of the job which allows us to perform more jobs on the same cluster.

RDD Cache

PySpark RDD <strong>cache()</strong> method by default saves the RDD computation at storage level `<strong>MEMORY_ONLY</strong>`, meaning it stores the data in memory only; in PySpark, the cached data is kept as serialized (pickled) Python objects.

PySpark cache() method in the RDD class internally calls the persist() method with the MEMORY_ONLY storage level. Let’s look at an example.


cachedRdd = rdd.cache()

RDD Persist

PySpark persist() method is used to store the RDD at one of the storage levels <strong>MEMORY_ONLY</strong>, <strong>MEMORY_AND_DISK</strong>, <strong>DISK_ONLY</strong>, <strong>MEMORY_ONLY_2</strong>, <strong>MEMORY_AND_DISK_2</strong> and more.

PySpark persist() takes an optional StorageLevel argument: without it, the RDD is saved at the <strong>MEMORY_ONLY</strong> storage level; with it, you can choose a different storage level.


import pyspark
rddPersist = rdd.persist(pyspark.StorageLevel.MEMORY_ONLY)
# RDDs have no show(); collect() is an action that returns the data to the driver
print(rddPersist.collect())

RDD Unpersist

PySpark automatically monitors every persist() and cache() call you make, checks usage on each node, and drops persisted data that is no longer used, following a least-recently-used (LRU) algorithm. You can also remove it manually using the unpersist() method, which marks the RDD as non-persistent and removes all blocks for it from memory and disk.


rddPersist2 = rddPersist.unpersist()

unpersist(blocking=True) blocks until all blocks are deleted; with the default blocking=False, the call returns immediately.
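The least-recently-used policy mentioned above can be sketched with collections.OrderedDict. This is a simplified model with a hypothetical capacity counted in blocks; Spark's real block manager tracks memory in bytes and applies additional rules. The remove_rdd method mimics what unpersist() does for one RDD's blocks.

```python
from collections import OrderedDict

class LRUBlockStore:
    """Keeps at most `capacity` cached blocks, evicting the least recently used."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.blocks = OrderedDict()  # (rdd_id, partition) -> data, oldest first

    def get(self, block_id):
        if block_id not in self.blocks:
            return None  # a miss: Spark would recompute the partition from lineage
        self.blocks.move_to_end(block_id)  # mark as most recently used
        return self.blocks[block_id]

    def put(self, block_id, data):
        self.blocks[block_id] = data
        self.blocks.move_to_end(block_id)
        if len(self.blocks) > self.capacity:
            evicted, _ = self.blocks.popitem(last=False)  # drop the oldest block
            print("evicted", evicted)

    def remove_rdd(self, rdd_id):
        """Like unpersist(): drop every block that belongs to one RDD."""
        for key in [k for k in self.blocks if k[0] == rdd_id]:
            del self.blocks[key]

store = LRUBlockStore(capacity=2)
store.put(("rdd_1", 0), [1, 2])
store.put(("rdd_1", 1), [3, 4])
store.get(("rdd_1", 0))          # touch partition 0 so it becomes most recent
store.put(("rdd_2", 0), [5, 6])  # prints: evicted ('rdd_1', 1)
store.remove_rdd("rdd_1")
print(list(store.blocks))        # [('rdd_2', 0)]
```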

Persistence Storage Levels

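All the storage levels PySpark supports are available in the pyspark.StorageLevel class (org.apache.spark.storage.StorageLevel on the JVM side). A storage level defines how and where to store the RDD. Note that in Python, stored objects are always serialized with the Pickle library, so choosing a serialized level makes no difference; the storage levels available in Python include MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY and DISK_ONLY_2, while the _SER variants described below belong to the Scala and Java APIs.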

<strong>MEMORY_ONLY</strong> – This is the default behavior of the RDD cache() method and stores the RDD as deserialized objects in JVM memory. When there is not enough memory available, some partitions are not cached and are recomputed as and when required. This takes more memory but runs faster, as it takes few CPU cycles to read from memory.

<strong>MEMORY_ONLY_SER</strong> – This is the same as MEMORY_ONLY, the difference being that it stores the RDD as serialized objects in JVM memory. It takes less memory (space-efficient) than MEMORY_ONLY, as it saves objects in serialized form, but it takes a few more CPU cycles to deserialize them.

<strong>MEMORY_ONLY_2</strong> – Same as the MEMORY_ONLY storage level but replicates each partition to two cluster nodes.

<strong>MEMORY_ONLY_SER_2</strong> – Same as the MEMORY_ONLY_SER storage level but replicates each partition to two cluster nodes.

<strong>MEMORY_AND_DISK</strong> – In this storage level, the RDD is stored in JVM memory as deserialized objects. When the required storage is greater than the available memory, it stores some of the excess partitions on disk and reads them from disk when required. It is slower, as there is I/O involved.

<strong>MEMORY_AND_DISK_SER</strong> – This is the same as the MEMORY_AND_DISK storage level, the difference being that it serializes the RDD objects in memory and on disk when space is not available.

<strong>MEMORY_AND_DISK_2</strong> – Same as the MEMORY_AND_DISK storage level but replicates each partition to two cluster nodes.

<strong>MEMORY_AND_DISK_SER_2</strong> – Same as the MEMORY_AND_DISK_SER storage level but replicates each partition to two cluster nodes.

<strong>DISK_ONLY</strong> – In this storage level, the RDD is stored only on disk, and access is slower because every read involves disk I/O.

<strong>DISK_ONLY_2</strong> – Same as the DISK_ONLY storage level but replicates each partition to two cluster nodes.

PySpark Shared Variables Tutorial

In this section of the PySpark RDD tutorial, let’s learn what the different types of PySpark shared variables are and how they are used in PySpark transformations.

When PySpark executes a transformation using map() or reduce() operations, it runs the transformation on a remote node using copies of the variables shipped with the task. Changes to these variables are not sent back to the PySpark driver, so there is no built-in way to reuse and share variables across tasks. PySpark solves this problem with two types of shared variables:

  • Broadcast variables (read-only shared variable)
  • Accumulator variables (updatable shared variables)

Broadcast read-only Variables

Broadcast variables are read-only shared variables that are cached and available on all nodes in a cluster so that tasks can access them. Instead of sending this data along with every task, PySpark distributes broadcast variables to each machine using efficient broadcast algorithms to reduce communication costs.

One of the best use cases for PySpark RDD broadcast variables is lookup data, for example zip code, state, or country lookups.

When you run a PySpark RDD job that has the Broadcast variables defined and used, PySpark does the following.

  • PySpark breaks the job into stages separated by distributed shuffles, and actions are executed within a stage.
  • Stages are then broken into tasks.
  • PySpark broadcasts the common (reusable) data needed by tasks within each stage.
  • The broadcast data is cached in serialized form and deserialized before executing each task.

The PySpark Broadcast is created using the broadcast(v) method of the SparkContext class. This method takes the argument v that you want to broadcast.


sc = spark.sparkContext
broadcastVar = sc.broadcast([0, 1, 2, 3])
print(broadcastVar.value)

Note that broadcast variables are not sent to executors by the sc.broadcast(variable) call; instead, they are sent to executors when they are first used.
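The lookup use case can be sketched without a cluster. In the plain-Python code below, the states dictionary plays the role of a broadcast variable's value, and resolve_state, mb_shipped and the hypothetical broadcastStates name are made up for illustration. The arithmetic at the end uses invented sizes and counts to show why shipping a table once per executor is cheaper than once per task; it is a simplification of how Spark actually distributes broadcast data.

```python
# Read-only lookup table; in PySpark this would be the value of a broadcast variable
states = {"NY": "New York", "CA": "California", "FL": "Florida"}
records = [("James", "NY"), ("Anna", "CA"), ("Maria", "FL")]

def resolve_state(record, lookup):
    """Replace a state code with its full name using the shared lookup."""
    name, code = record
    return (name, lookup[code])

# In PySpark, roughly: rdd.map(lambda r: resolve_state(r, broadcastStates.value)),
# where broadcastStates = sc.broadcast(states) is a hypothetical name
resolved = [resolve_state(r, states) for r in records]
print(resolved)

def mb_shipped(table_mb, tasks, executors, broadcast):
    """Simplified network transfer: once per executor if broadcast, else once per task."""
    return table_mb * (executors if broadcast else tasks)

print(mb_shipped(50, tasks=1000, executors=10, broadcast=False))  # 50000
print(mb_shipped(50, tasks=1000, executors=10, broadcast=True))   # 500
```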

Refer to PySpark RDD Broadcast shared variable for a more detailed example.

Accumulators

PySpark accumulators are another type of shared variable that are only “added” to through an associative and commutative operation and are used to implement counters (similar to MapReduce counters) or sum operations.
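Why must the operation be associative and commutative? Each task adds into its own local copy, and the driver merges the per-task results in whatever order the tasks finish. The plain-Python sketch below models that with a hypothetical three-partition split and checks every possible merge order; it illustrates the requirement and is not Spark's accumulator implementation.

```python
import itertools
from functools import reduce

# Each hypothetical "task" processes one partition and adds into a local copy
partitions = [[1, 2], [3], [4, 5, 6]]
local_sums = [sum(part) for part in partitions]  # [3, 3, 15]

# The driver merges task results in whatever order the tasks finish;
# because addition is associative and commutative, every order gives the same total
totals = {reduce(lambda a, b: a + b, order, 0)
          for order in itertools.permutations(local_sums)}
print(totals)  # {21}
```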

PySpark supports numeric accumulators by default and provides the capability to add custom accumulator types. In Spark, programmers can create the following accumulators:

  • named accumulators
  • unnamed accumulators

When you create a named accumulator, you can see it in the Spark web UI under the “Accumulator” tab. On this tab, you will see two tables: the first, “accumulable”, lists all named accumulator variables and their values, and the second, “Tasks”, shows the value of each accumulator as modified by each task.

Unnamed accumulators, on the other hand, are not shown in the web UI, so for all practical purposes it is advisable to use named accumulators. Note that named accumulators are created through the Scala/Java API; in PySpark, SparkContext.accumulator() does not take a name.

In PySpark, accumulator variables are created using SparkContext.accumulator(value).


accum = spark.sparkContext.accumulator(0)
spark.sparkContext.parallelize([1, 2, 3]).foreach(lambda x: accum.add(x))
print(accum.value)  # 6

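PySpark SparkContext.accumulator() supports integers and floating-point numbers by default; for other types, you can pass a custom AccumulatorParam. The longAccumulator(), doubleAccumulator() and collectionAccumulator() methods, which return LongAccumulator, DoubleAccumulator and CollectionAccumulator respectively, belong to the Scala/Java SparkContext.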

Advanced API – DataFrame

Creating RDD from DataFrame and vice-versa

Though we have more advanced APIs over RDDs, we often need to convert a DataFrame to an RDD or an RDD to a DataFrame. Below are several examples.


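# Sample RDD of tuples (toDF() needs tuples or Rows, not plain values)
rdd = spark.sparkContext.parallelize([("Java", 20000), ("Python", 100000)])
# Converts RDD to DataFrame (default column names _1, _2)
dfFromRDD1 = rdd.toDF()
# Converts RDD to DataFrame with column names (passed as a list)
dfFromRDD2 = rdd.toDF(["col1","col2"])
# Using createDataFrame() - Convert RDD to DataFrame
df = spark.createDataFrame(rdd).toDF("col1","col2")
# Convert DataFrame to RDD
rdd = df.rdd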

Where to go from here?

Learn PySpark DataFrame Tutorial with examples
