Different Ways to Create PySpark RDD

In PySpark, Resilient Distributed Datasets (RDDs) are the fundamental data structure: immutable, fault-tolerant, distributed collections of objects that can be operated on in parallel across the nodes of a cluster. They are called resilient because Spark can recompute lost partitions from lineage when a node fails.

RDDs can be created in several ways. Here are some examples of how to create RDDs in PySpark.

Since Spark 2.0, a SparkSession is the entry point for any PySpark application, and the SparkContext used to create RDDs is available from it. Below is an example of how to create a SparkSession.


# Import
from pyspark.sql import SparkSession

# Create spark session
spark = SparkSession \
    .builder \
    .appName("SparkByExamples.com") \
    .getOrCreate()

1. Create PySpark RDD from a List using parallelize():

RDDs are commonly created by parallelizing a collection, that is, taking an existing collection in the driver program (a Python list, for example) and passing it to the SparkContext's parallelize() method. This approach is mainly useful for testing and prototyping rather than production, because the entire dataset used to create the RDD must be available on the driver node.


# Create PySpark RDD from Parallelize
rdd = spark.sparkContext.parallelize([1,2,3,4,5,6])
print(rdd.collect())

# Output:
# [1, 2, 3, 4, 5, 6]

Here,

spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6]) creates an RDD from a Python list. The parallelize method distributes the data across the nodes in the Spark cluster, creating a parallel collection.
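If you want to see how the data is split, getNumPartitions() reports the partition count; a minimal sketch (the optional second argument to parallelize() sets the number of partitions, and the variable name rdd6 is just illustrative):


# Check how the parallelized data is partitioned
rdd6 = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6], 3)
print(rdd6.getNumPartitions())

# Output:
# 3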

rdd.collect() retrieves all the elements of the RDD to the driver program (in this case, the Python script), allowing them to be printed. Use the collect() action cautiously: it brings all the data to the driver, which can lead to out-of-memory issues on large datasets.
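For a quick look at a few elements without pulling the whole dataset, take(n) returns only the first n elements to the driver; a short sketch using the rdd created above:


# Retrieve only the first three elements instead of the whole RDD
print(rdd.take(3))

# Output:
# [1, 2, 3]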

2. Create RDD from a Tuple:

To create an RDD from tuples in PySpark, again use the parallelize() method, this time with a list of tuples as its argument, for example: rdd_from_tuple = spark.sparkContext.parallelize([('Java', 20000), ('Python', 10000), ('Scala', 30000)]). This distributes the tuple elements across the Spark cluster for parallel processing.

The example below creates an RDD from a sequence of key-value pairs.


# Create PySpark RDD from Tuple
data = [("Java", 20000),("Python", 10000),("Scala", 30000)]
rdd = spark.sparkContext.parallelize(data)
print(rdd.collect())

# Output:
# [('Java', 20000), ('Python', 10000), ('Scala', 30000)]

3. From External Data (Text File):

In most real-world applications, RDDs are created from files. Here, you will see how to create an RDD by reading data from a file using the textFile() function.


# Create RDD from Text file
rddFile = spark.sparkContext.textFile("path/to/textfile.txt")

If you want to read the entire content of a file as a single record, use the wholeTextFiles() method on sparkContext.
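A minimal sketch, reusing the placeholder path from above; each element of the resulting RDD is a (filename, content) pair:


# Create RDD where each element is a (filename, content) pair
rddWholeFile = spark.sparkContext.wholeTextFiles("path/to/textfile.txt")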

4. Using range Function:

Python's built-in range() function is widely used; hence, let's see how to use it to create an RDD.


# Create RDD from range function
rddRange = spark.sparkContext.parallelize(range(1, 6))

5. From Existing RDD:

Another common way to create an RDD is from an existing RDD. You can use transformations like map(), flatMap(), and filter() to create a new RDD from an existing one.


# Create RDD from another RDD
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5]) 
new_rdd = rdd.map(lambda x: x * 2)

Here,

The code new_rdd = rdd.map(lambda x: x * 2) creates a new RDD (new_rdd) by applying a transformation using the map operation on an existing RDD (rdd). The lambda function lambda x: x * 2 is applied to each element x in rdd, doubling each value in the resulting new_rdd.
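Similarly, filter() creates a new RDD containing only the elements that satisfy a predicate; a short sketch on the same rdd:


# Create RDD from another RDD using filter()
even_rdd = rdd.filter(lambda x: x % 2 == 0)
print(even_rdd.collect())

# Output:
# [2, 4]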

6. From JSON Data:

If you have JSON data, you can parse it and use the result to create an RDD.


# Import JSON
import json

# Create RDD from JSON
json_data = '{"name": "Kumar", "age": 39, "city": "New York"}' 
rdd_json = spark.sparkContext.parallelize([json.loads(json_data)])

The input to parallelize() is a Python dictionary obtained by loading JSON data (json_data) using the json.loads() method.
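Since each element of rdd_json is a regular Python dictionary, its fields can be accessed with ordinary RDD transformations; a minimal sketch:


# Access fields of the parsed JSON record
print(rdd_json.map(lambda d: (d["name"], d["age"])).collect())

# Output:
# [('Kumar', 39)]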

Conclusion

These examples cover various ways to create RDDs in PySpark: from lists, tuples (key-value pairs), external files, the range() function, existing RDDs, and JSON data. Choose the method that best suits your data and processing requirements.

Happy Learning !!
