PySpark RDD Tutorial | Learn with Examples
This PySpark RDD tutorial will help you understand what an RDD (Resilient Distributed Dataset) is, its advantages, and how to create and use one, with GitHub examples. All RDD examples in this tutorial were tested in our development environment and are available in the GitHub PySpark examples project for quick reference.
By the end of this PySpark tutorial, you will have learned what a PySpark RDD is, its advantages and limitations, how to create an RDD, how to apply transformations and actions, and how to operate on pair RDDs.
- What is PySpark RDD?
- PySpark RDD Benefits
- PySpark RDD Limitations
- Creating RDD
- RDD Parallelize
- Repartition and Coalesce
- RDD Operations
- RDD Types
- Shuffle Operations
- RDD Persistence
- PySpark Shared Variables
- Advanced API – DataFrame & DataSet
What is RDD (Resilient Distributed Dataset)?
RDD (Resilient Distributed Dataset) is a fundamental building block of PySpark: a fault-tolerant, immutable, distributed collection of objects. Immutable means that once you create an RDD you cannot change it. The data in an RDD is divided into logical partitions, which can be computed on different nodes of the cluster.
In other words, an RDD is a collection of objects similar to a list in Python, with the difference that an RDD is computed by several processes scattered across multiple physical servers (also called nodes) in a cluster, while a Python collection lives and is processed in just one process.
Additionally, RDDs abstract away the partitioning and distribution of the data so that computations run in parallel on several nodes. While applying transformations to an RDD, we don’t have to worry about parallelism because PySpark provides it by default.
This Apache PySpark RDD tutorial describes the basic operations available on RDDs, such as persist() and many more. In addition, it explains pair RDD functions, which operate on RDDs of key-value pairs.
Note: RDDs can have a name and a unique identifier (id).
PySpark RDD Benefits
PySpark is widely adopted in the machine learning and data science community due to its advantages over traditional Python programming.
PySpark loads the data from disk, processes it in memory, and keeps it in memory; this is the main difference between PySpark and MapReduce, which is I/O intensive. Between transformations, we can also cache or persist the RDD in memory to reuse previous computations.
PySpark RDDs are immutable: once an RDD is created, you cannot modify it. When we apply a transformation to an RDD, PySpark creates a new RDD and maintains the RDD lineage.
PySpark operates on fault-tolerant data stores such as HDFS and S3, so if any RDD operation fails, it automatically reloads the data from other partitions. Also, when PySpark applications run on a cluster, failed tasks are automatically retried a certain number of times (as per the configuration) so that the application can finish seamlessly.
PySpark does not evaluate RDD transformations as the driver encounters them. Instead, it records all the transformations it encounters in a DAG and evaluates them only when it sees the first RDD action.
When you create an RDD from data, PySpark partitions its elements by default. The default number of partitions equals the number of cores available.
PySpark RDD Limitations
PySpark RDDs are not well suited to applications that make updates to a state store, such as the storage system for a web application. For these applications, it is more efficient to use systems that perform traditional update logging and data checkpointing, such as databases. The goal of RDDs is to provide an efficient programming model for batch analytics and to leave these asynchronous applications to other systems.
RDDs are created primarily in two different ways:
- parallelizing an existing collection and
- referencing a dataset in an external storage system (S3 and many more).
Before we look into examples, let’s first initialize a SparkSession using the builder pattern defined in the SparkSession class. While initializing, we need to provide the master and application name as shown below. In a real application, you would pass the master from spark-submit instead of hardcoding it in the Spark application.
from pyspark.sql import SparkSession
# builder is an attribute in PySpark, not a method
spark = SparkSession.builder \
    .master("local") \
    .appName("SparkByExamples.com") \
    .getOrCreate()
master() – If you are running on a cluster, pass your master name as the argument to master(). Usually it is either yarn (Yet Another Resource Negotiator) or mesos, depending on your cluster setup.
Use local[x] when running locally on a single machine. x should be an integer greater than 0; it sets how many worker threads Spark uses, which in turn determines how many partitions it creates by default when using RDD, DataFrame, and Dataset. Ideally, x should be the number of CPU cores you have.
appName() – Used to set your application name.
getOrCreate() – Returns the existing SparkSession object if one already exists; otherwise it creates a new one.
Create RDD using sparkContext.parallelize()
Using the parallelize() function of SparkContext (sparkContext.parallelize()), you can create an RDD. This function loads an existing collection from your driver program into a parallelized RDD. It is the basic way to create an RDD and is used when you already have the data in memory, loaded either from a file or from a database. It requires all the data to be present on the driver program before the RDD is created.
# Create RDD from parallelize
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd = spark.sparkContext.parallelize(data)
For production applications, we mostly create RDDs from external storage systems like HBase etc. To keep things simple, this PySpark RDD tutorial creates RDDs from files on the local system or from a Python list.
Create RDD using sparkContext.textFile()
Using the textFile() method, we can read a text (.txt) file into an RDD.
# Create RDD from external data source
rdd2 = spark.sparkContext.textFile("/path/textFile.txt")
Create RDD using sparkContext.wholeTextFiles()
# Reads each file as a single record of (file path, file content)
rdd3 = spark.sparkContext.wholeTextFiles("/path/textFile.txt")
Besides using text files, we can also create RDD from CSV file, JSON, and more formats.
Create empty RDD using sparkContext.emptyRDD
Using the emptyRDD() method on sparkContext, we can create an RDD with no data. This method creates an empty RDD with no partitions.
# Creates empty RDD with no partition
rdd = spark.sparkContext.emptyRDD()
Creating empty RDD with partition
Sometimes we may need to write an empty RDD to files by partition. In this case, you should create an empty RDD with partitions.
# Create empty RDD with partition
rdd2 = spark.sparkContext.parallelize([], 10)  # This creates 10 partitions
When we use SparkContext methods such as parallelize(), textFile(), or wholeTextFiles() to create an RDD, PySpark automatically splits the data into partitions based on resource availability. When you run it on a laptop, it creates as many partitions as there are cores available on your system.
getNumPartitions() – This is an RDD function that returns the number of partitions our dataset is split into.
print("initial partition count:"+str(rdd.getNumPartitions()))
#Outputs: initial partition count:2
Set parallelize manually – We can also set the number of partitions manually; all we need to do is pass the number of partitions as the second parameter to these functions.
Repartition and Coalesce
Sometimes we may need to repartition the RDD. PySpark provides two ways to do so: first, the repartition() method, which shuffles data from all nodes (also called a full shuffle), and second, the coalesce() method, which moves data from the minimum number of nodes. For example, if you have data in 4 partitions, coalesce(2) moves data from just 2 nodes.
Both functions take the target number of partitions, as shown below. Note that repartition() is a very expensive operation because it shuffles data from all nodes in a cluster.
reparRdd = rdd.repartition(4)
print("re-partition count:"+str(reparRdd.getNumPartitions()))
#Outputs: re-partition count:4
Note: the repartition() and coalesce() methods return a new RDD.
PySpark RDD Operations
RDD transformations – Transformations are lazy operations; instead of updating an RDD, they return another RDD.
RDD actions – operations that trigger computation and return RDD values.
RDD Transformations with example
Transformations on a PySpark RDD return another RDD, and they are lazy, meaning they don’t execute until you call an action on the RDD. Some transformations on RDDs are flatMap(), map(), reduceByKey(), filter(), and sortByKey(); each returns a new RDD instead of updating the current one.
In this PySpark RDD transformation section of the tutorial, I will explain transformations using the word count example. The image below demonstrates the different RDD transformations we are going to use.
First, create an RDD by reading a text file. The text file used here is available at the GitHub project.
rdd = spark.sparkContext.textFile("/tmp/test.txt")
The flatMap() transformation flattens the RDD after applying the function and returns a new RDD. In the example below, it first splits each record in the RDD by space and then flattens the result. The resulting RDD holds a single word in each record.
rdd2 = rdd.flatMap(lambda x: x.split(" "))
The map() transformation is used to apply any complex operation, such as adding a column or updating a column. The output of a map transformation always has the same number of records as its input.
In our word count example, we add a new column with the value 1 for each word. The result is a pair RDD of key-value pairs, with the word (a string) as the key and 1 (an integer) as the value.
rdd3 = rdd2.map(lambda x: (x,1))
reduceByKey() merges the values for each key using the specified function. In our example, it sums the values for each word. The resulting RDD contains the unique words and their counts.
rdd5 = rdd3.reduceByKey(lambda a,b: a+b)
The sortByKey() transformation is used to sort RDD elements by key. In our example, we first convert each (word, count) pair to (count, word) using a map transformation and then apply sortByKey, which sorts on the integer count. Finally, printing the collected result shows every count and its word as a key-value pair.
rdd6 = rdd5.map(lambda x: (x[1],x[0])).sortByKey()
# Print rdd6 result to console
print(rdd6.collect())
The filter() transformation is used to filter the records in an RDD. In our example, we keep only the words that contain “an”.
rdd4 = rdd3.filter(lambda x : 'an' in x[0])
print(rdd4.collect())
Please refer to this page for the full list of RDD transformations.
RDD Actions with example
RDD Action operations return the values from an RDD to a driver program. In other words, any RDD function that returns non-RDD is considered as an action.
In this section of the PySpark RDD tutorial, we will continue to use our word count example and perform some actions on it.
count() – Returns the number of records in an RDD.
# Action - count
print("Count : "+str(rdd6.count()))
first() – Returns the first record.
# Action - first
firstRec = rdd6.first()
print("First Record : "+str(firstRec[0]) + ","+ firstRec[1])
max() – Returns the max record.
# Action - max
datMax = rdd6.max()
print("Max Record : "+str(datMax[0]) + ","+ datMax[1])
reduce() – Reduces the records to a single one; we can use this to count or sum.
# Action - reduce
totalWordCount = rdd6.reduce(lambda a,b: (a[0]+b[0],a[1]))
print("dataReduce Record : "+str(totalWordCount[0]))
take() – Returns the first n records, where n is the number passed as an argument.
# Action - take
data3 = rdd6.take(3)
for f in data3:
    print("data3 Key:"+ str(f[0]) +", Value:"+f[1])
collect() – Returns all data from the RDD as a list. Be careful with this action when you are working with a huge RDD of millions or billions of records, as you may run out of memory on the driver.
# Action - collect
data = rdd6.collect()
for f in data:
    print("Key:"+ str(f[0]) +", Value:"+f[1])
saveAsTextFile() – Using the saveAsTextFile action, we can write the RDD to a text file.
Note: Please refer to this page for a full list of RDD actions.
Types of RDD
PairRDDFunctions or PairRDD – A pair RDD holds key-value pairs. This is the most commonly used RDD type.
Shuffling is a mechanism PySpark uses to redistribute data across different executors and even across machines. PySpark shuffling is triggered when we perform certain transformation operations, such as join(), on RDDs.
PySpark Shuffle is an expensive operation since it involves the following
- Disk I/O
- Involves data serialization and deserialization
- Network I/O
When creating an RDD, PySpark doesn’t necessarily store all the data for a given key in a single partition, since at creation time there is no way to set the key for the data set.
Hence, when we run the reduceByKey() operation to aggregate the data by key, PySpark first needs to run tasks that collect the data from all partitions.
For example, when we perform a reduceByKey() operation, PySpark does the following:
- PySpark first runs map tasks on all partitions, which group all values for a single key.
- The results of the map tasks are kept in memory.
- When the results do not fit in memory, PySpark stores the data on disk.
- PySpark shuffles the mapped data across partitions; sometimes it also stores the shuffled data on disk for reuse when it needs to recalculate.
- Runs garbage collection.
- Finally, runs reduce tasks on each partition based on key.
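The steps above can be pictured with a small plain-Python simulation. This is a conceptual sketch only, not Spark's implementation: the function names are hypothetical, partitions are ordinary lists, and memory spilling and garbage collection are left out.

```python
from collections import defaultdict
from functools import reduce

def map_side_combine(partition, func):
    """Map task: within one partition, merge the values that share a key."""
    combined = {}
    for key, value in partition:
        combined[key] = func(combined[key], value) if key in combined else value
    return combined

def shuffle(combined_partitions, num_reducers):
    """Shuffle: route every key to exactly one reducer bucket by hashing it."""
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for combined in combined_partitions:
        for key, value in combined.items():
            buckets[hash(key) % num_reducers][key].append(value)
    return buckets

def reduce_side(bucket, func):
    """Reduce task: merge the partial results that arrived for each key."""
    return {key: reduce(func, values) for key, values in bucket.items()}

# Two "partitions" of (word, 1) pairs; keys are spread across both.
partitions = [
    [("a", 1), ("b", 1), ("a", 1)],
    [("b", 1), ("c", 1), ("a", 1)],
]
add = lambda x, y: x + y

combined = [map_side_combine(p, add) for p in partitions]
buckets = shuffle(combined, num_reducers=2)

word_counts = {}
for bucket in buckets:
    word_counts.update(reduce_side(bucket, add))

print(word_counts)
```

The point of the map-side step is that only one partial value per key and partition crosses the network, which is why reduceByKey() usually moves less data than grouping all values first.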
PySpark RDD triggers shuffle and repartition for several operations like join(), but not for every operation.
Shuffle partition size & Performance
Depending on your dataset size, number of cores, and memory, PySpark shuffling can benefit or harm your jobs. When you are dealing with a small amount of data, you should typically reduce the number of shuffle partitions; otherwise you will end up with many partitions that each hold only a few records, which results in running many tasks with very little data to process.
On the other hand, when you have a lot of data and too few partitions, you get a small number of long-running tasks, and sometimes you may also get an out-of-memory error.
Getting the shuffle partition count right is always tricky and takes many runs with different values to reach an optimized number. This is one of the key properties to look at when you have performance issues in PySpark jobs.
PySpark RDD Persistence Tutorial
PySpark Cache and Persist are optimization techniques to improve the performance of the RDD jobs that are iterative and interactive. In this PySpark RDD Tutorial section, I will explain how to use persist() and cache() methods on RDD with examples.
Though PySpark can run computations up to 100 times faster than traditional MapReduce jobs, if you have not designed your jobs to reuse repeated computations, you will see performance degrade when you are dealing with billions or trillions of records. Hence, we need to look at the computations and use optimization techniques to improve performance.
Using the cache() and persist() methods, PySpark provides an optimization mechanism to store the intermediate computation of an RDD so it can be reused in subsequent actions.
When you persist or cache an RDD, each worker node stores its partitioned data in memory or on disk and reuses it in other actions on that RDD. Spark’s persisted data on nodes is fault-tolerant, meaning that if any partition is lost, it will automatically be recomputed using the original transformations that created it.
Advantages of Persisting RDD
- Cost efficient – PySpark computations are very expensive, so reusing them saves cost.
- Time efficient – Reusing the repeated computations saves lots of time.
- Execution time – Saves execution time of the job which allows us to perform more jobs on the same cluster.
The cache() method by default saves the RDD computation at storage level MEMORY_ONLY, meaning it will store the data in the JVM heap as unserialized objects.
The cache() method in the RDD class internally calls the persist() method with the MEMORY_ONLY storage level. Let’s look at an example.
cachedRdd = rdd.cache()
The PySpark persist() method is used to store the RDD at one of the storage levels, such as MEMORY_AND_DISK_2 and more.
PySpark persist() takes an optional StorageLevel argument. Called without an argument, it saves the RDD at the MEMORY_ONLY storage level; passing a StorageLevel stores it at a different level.
import pyspark
rddPersist = rdd.persist(pyspark.StorageLevel.MEMORY_ONLY)
print(rddPersist.collect())
PySpark automatically monitors every persist() and cache() call you make, checks usage on each node, and drops persisted data that is not used, following a least-recently-used (LRU) algorithm. You can also remove it manually using the unpersist() method. unpersist() marks the RDD as non-persistent and removes all blocks for it from memory and disk.
rddPersist2 = rddPersist.unpersist()
unpersist(blocking=True), with the boolean argument set to True, blocks until all blocks are deleted.
Persistence Storage Levels
All the storage levels PySpark supports are defined in the org.apache.spark.storage.StorageLevel class, which PySpark exposes as pyspark.StorageLevel. The storage level defines how and where to store the RDD.
MEMORY_ONLY – This is the default behavior of the RDD cache() method and stores the RDD as deserialized objects in JVM memory. When there is not enough memory available, some partitions are not saved and are recomputed as and when required. This takes more storage but runs faster, as it takes few CPU cycles to read from memory.
MEMORY_ONLY_SER – This is the same as MEMORY_ONLY, with the difference that it stores the RDD as serialized objects in JVM memory. It takes less memory (space-efficient) than MEMORY_ONLY because it saves objects in serialized form, and it takes a few more CPU cycles to deserialize.
MEMORY_ONLY_2 – Same as the MEMORY_ONLY storage level, but replicates each partition to two cluster nodes.
MEMORY_ONLY_SER_2 – Same as the MEMORY_ONLY_SER storage level, but replicates each partition to two cluster nodes.
MEMORY_AND_DISK – In this storage level, the RDD is stored in JVM memory as deserialized objects. When the required storage is greater than the available memory, it stores some of the excess partitions on disk and reads the data from disk when required. It is slower because I/O is involved.
MEMORY_AND_DISK_SER – This is the same as the MEMORY_AND_DISK storage level, with the difference that it serializes the RDD objects in memory, and on disk when space is not available.
MEMORY_AND_DISK_2 – Same as the MEMORY_AND_DISK storage level, but replicates each partition to two cluster nodes.
MEMORY_AND_DISK_SER_2 – Same as the MEMORY_AND_DISK_SER storage level, but replicates each partition to two cluster nodes.
DISK_ONLY – In this storage level, the RDD is stored only on disk, and the computation time is high because I/O is involved.
DISK_ONLY_2 – Same as the DISK_ONLY storage level, but replicates each partition to two cluster nodes.
PySpark Shared Variables Tutorial
In this section of the PySpark RDD tutorial, let’s learn what the different types of PySpark shared variables are and how they are used in PySpark transformations.
When PySpark executes a transformation using operations such as reduce(), it runs the transformation on a remote node using variables that are shipped with the tasks. These variables are not sent back to the PySpark driver, so there is no way to reuse and share variables across tasks. PySpark shared variables solve this problem with the two techniques below. PySpark provides two types of shared variables.
- Broadcast variables (read-only shared variable)
- Accumulator variables (updatable shared variables)
Broadcast read-only Variables
Broadcast variables are read-only shared variables that are cached and available on all nodes in a cluster so that tasks can access and use them. Instead of sending this data along with every task, PySpark distributes broadcast variables to the machines using efficient broadcast algorithms to reduce communication costs.
One of the best use cases for PySpark RDD broadcast is lookup data, for example zip code, state, or country lookups.
When you run a PySpark RDD job that has the Broadcast variables defined and used, PySpark does the following.
- PySpark breaks the job into stages separated by distributed shuffling; actions are executed within a stage.
- Later, stages are also broken into tasks.
- PySpark broadcasts the common (reusable) data needed by tasks within each stage.
- The broadcast data is cached in serialized format and deserialized before each task executes.
The PySpark broadcast variable is created using the broadcast(v) method of the SparkContext class. This method takes the argument v that you want to broadcast.
sc = spark.sparkContext
broadcastVar = sc.broadcast([0, 1, 2, 3])
broadcastVar.value
Note that broadcast variables are not sent to executors with sc.broadcast(variable) call instead, they will be sent to executors when they are first used.
Refer to PySpark RDD Broadcast shared variable for more detailed example.
PySpark accumulators are another type of shared variable. They are only “added” to through an associative and commutative operation and are used to implement counters (similar to MapReduce counters) or sums.
PySpark by default supports creating an accumulator of any numeric type and provides the capability to add custom accumulator types. Programmers can create following accumulators
- named accumulators
- unnamed accumulators
When you create a named accumulator, you can see them on PySpark web UI under the “Accumulator” tab. On this tab, you will see two tables; the first table “accumulable” – consists of all named accumulator variables and their values. And on the second table “Tasks” – value for each accumulator modified by a task.
Unnamed accumulators, by contrast, are not shown on the PySpark web UI. For all practical purposes, it is advisable to use named accumulators.
Accumulator variables are created using the accumulator() method of the SparkContext class.
sc = spark.sparkContext
accum = sc.accumulator(0)
sc.parallelize([1, 2, 3]).foreach(lambda x: accum.add(x))
print(accum.value)
PySpark by default provides accumulator support for integer and floating-point values. The accumulator() method is present in the SparkContext class and returns an Accumulator object.
Advanced API – DataFrame & DataSet
Creating RDD from DataFrame and vice-versa
Though there are more advanced APIs built on top of RDDs, we often need to convert a DataFrame to an RDD or an RDD to a DataFrame. Below are several examples.
# These examples assume rdd holds two-field tuples
# Convert RDD to DataFrame
dfFromRDD1 = rdd.toDF()
# Convert RDD to DataFrame with column names
dfFromRDD2 = rdd.toDF(["col1","col2"])
# Convert RDD to DataFrame using createDataFrame()
df = spark.createDataFrame(rdd).toDF("col1","col2")
# Convert DataFrame to RDD
rdd = df.rdd