PySpark Tutorial

In this PySpark tutorial, you'll learn the fundamentals of Spark, how to create distributed data processing pipelines, and how to use its versatile libraries to transform and analyze large datasets efficiently, with examples. I will also explain what PySpark is, its features, advantages, modules, and packages, and how to use RDDs and DataFrames, with simple examples drawn from my working experience in Python.


All examples in this PySpark (Spark with Python) tutorial are basic, simple, and easy to practice for beginners who want to learn PySpark and advance their careers in Big Data, Machine Learning, Data Science, and Artificial Intelligence.

Note: If you can’t locate the PySpark examples you need on this beginner’s tutorial page, I suggest utilizing the Search option in the menu bar. This website offers numerous articles in Spark, Scala, PySpark, and Python for learning purposes.

If you are working with a smaller dataset and don't have a Spark cluster, but still want benefits similar to a Spark DataFrame, you can use Python pandas DataFrames. The main difference is that a pandas DataFrame is not distributed and runs on a single node.


PySpark Introduction

PySpark Tutorial – PySpark is the Python API for Apache Spark, which lets you run Python applications using Spark's capabilities. With PySpark, applications run in parallel on a distributed cluster (multiple nodes).

In other words, PySpark is a Python API for Spark, an analytical processing engine for large-scale distributed data processing and machine learning tasks.

What is Apache Spark?

Apache Spark (hereafter referred to as Spark) is an open-source, unified analytics engine for large-scale data processing. Spark is designed to be fast, flexible, and easy to use, which makes it a popular choice for processing large datasets. It processes billions or even trillions of records on distributed clusters, and for some in-memory workloads it has been reported to run up to 100 times faster than Hadoop MapReduce.

Spark can run on a single machine or on multiple machines (a cluster). It was created to address the limitations of MapReduce by processing data in memory. Spark can keep data in an in-memory cache and reuse it, which speeds up machine learning algorithms that repeatedly apply a function to the same dataset. This lowers latency and makes Spark many times faster than MapReduce, especially for machine learning and interactive analytics. Spark can also process real-time data streams.

Spark is also a multi-language engine that provides APIs (Application Programming Interfaces) and libraries for several programming languages, including Java, Scala, Python, and R, so developers can work with Spark in the language they are most comfortable with.

  1. Scala: Spark’s primary and native language is Scala. Many of Spark’s core components are written in Scala, and it provides the most extensive API for Spark.
  2. Java: Spark provides a Java API that allows developers to use Spark within Java applications. Java developers can access most of Spark’s functionality through this API.
  3. Python: Spark offers a Python API, called PySpark, which is popular among data scientists and developers who prefer Python for data analysis and machine learning tasks. PySpark provides a Pythonic way to interact with Spark.
  4. R: Spark also offers an R API, enabling R users to work with Spark data and perform distributed data analysis using their familiar R language.

Who uses PySpark?

PySpark is widely used in the data science and machine learning community because many popular data science libraries, such as NumPy and TensorFlow, are available in Python, and because it processes large datasets efficiently. Organizations such as Walmart, Trivago, Sanofi, and Runtastic have used PySpark.

Additionally, for development, you can use the Anaconda distribution (widely used in the Machine Learning community). The Anaconda distribution is a comprehensive platform for data science and machine learning tasks. It includes a collection of popular open-source software packages and libraries, such as Python, Jupyter Notebooks, NumPy, pandas, scikit-learn, and many others.

In practice, PySpark is used heavily by machine learning engineers and data scientists, thanks to Python's extensive machine learning libraries. This PySpark tutorial for beginners covers a broad range of the framework's concepts.


Which Versions Does PySpark Support?

PySpark 3.5 supports Python 3.8 and newer, R 3.5 and newer, Java 8, 11, and 17, and Scala 2.12 and 2.13. Note that, starting with Spark 3.5.0, support for Java 8 versions prior to 8u371 is deprecated.

Language | Supported Versions
-------- | ------------------------------------------------
Python   | 3.8 and newer
Java     | 8, 11, and 17 (Java 8 prior to 8u371 is deprecated)
Scala    | 2.12 and 2.13
R        | 3.5 and newer

PySpark 3.5 supported versions

PySpark Features & Advantages

The following are the main features of PySpark.

  • Python API: PySpark provides a Python API for interacting with Spark, enabling Python developers to leverage Spark’s distributed computing capabilities.
  • Distributed Computing: PySpark utilizes Spark’s distributed computing framework to process large-scale data across a cluster of machines, enabling parallel execution of tasks.
  • Fault Tolerance: PySpark handles failures automatically through resilient distributed datasets (RDDs): each RDD records the lineage of transformations that produced it, so lost partitions can be recomputed.
  • Lazy Evaluation: PySpark employs lazy evaluation, meaning transformations on data are not executed immediately but rather stored as a directed acyclic graph (DAG) of computations until an action is triggered.
  • Integration with Python Ecosystem: PySpark seamlessly integrates with the Python ecosystem, allowing users to leverage popular Python libraries such as pandas, NumPy, and scikit-learn for data manipulation and machine learning tasks.
  • Interactive Data Analysis: PySpark is well-suited for interactive data analysis and exploration, thanks to its integration with Jupyter Notebooks and interactive Python shells.
  • Machine Learning: PySpark includes MLlib, Spark’s scalable machine learning library, which provides a wide range of machine learning algorithms for classification, regression, clustering, and more.
  • Stream Processing: PySpark supports stream processing through Structured Streaming (and the older DStream-based Spark Streaming API), enabling real-time processing and analysis of continuous data streams.
  • SQL Support: PySpark allows users to perform SQL queries on distributed datasets using Spark SQL, providing a familiar interface for working with structured data.

PySpark Advantages

The most important advantages of using PySpark include:

  • Scalability: PySpark harnesses the power of distributed computing, enabling processing of large-scale datasets across clusters of machines, thus accommodating growing data needs.
  • Performance: By leveraging in-memory computing and parallel processing, PySpark achieves high performance, enabling faster data processing and analysis compared to traditional single-node processing.
  • Ease of Use: PySpark provides a user-friendly Python API, making it accessible to Python developers familiar with the language syntax and ecosystem. It also integrates well with popular Python libraries for data analysis and machine learning.
  • Fault Tolerance: PySpark automatically handles fault tolerance through resilient distributed datasets (RDDs), ensuring data reliability and recovery from failures without manual intervention.
  • Unified Platform: PySpark offers a unified platform for various data processing tasks, including batch processing, interactive data analysis, streaming processing, and machine learning, simplifying development and deployment workflows.
  • Real-time Processing: With Spark Streaming and Structured Streaming, PySpark enables real-time processing of data streams, facilitating timely insights and responses to changing data.
  • Machine Learning Capabilities: PySpark includes MLlib, Spark’s machine learning library, providing scalable implementations of popular machine learning algorithms, allowing for large-scale model training and deployment.
  • Community and Ecosystem: PySpark benefits from a vibrant community and ecosystem, offering extensive documentation, tutorials, and third-party packages, as well as continuous development and support from the community.

PySpark Architecture

PySpark architecture consists of a driver program that coordinates the application and interacts with a cluster manager to acquire resources. The cluster manager launches executors (JVM processes) on worker nodes, and the driver schedules tasks on those executors. SparkSession provides a unified entry point for Spark functionality and wraps the SparkContext, which manages the connection to the cluster and the execution environment. On top of this, the DataFrame API provides a high-level abstraction for data manipulation, so large-scale data can be processed in parallel across nodes.


Cluster Manager Types

Cluster managers in PySpark allocate resources, such as the executors that run tasks, across the nodes of a distributed computing environment. Here's a brief overview of the different types:

  • Standalone: The standalone cluster manager is a simple, standalone solution bundled with Spark that manages resources for Spark applications. It’s suitable for small to medium-sized clusters and doesn’t require additional software installation.
  • Mesos: Mesos is a distributed systems kernel that abstracts CPU, memory, storage, and other compute resources across a cluster. PySpark can use Mesos as a cluster manager, allowing resources to be shared among multiple frameworks such as Spark and Hadoop. Note that Mesos support has been deprecated since Spark 3.2.
  • Hadoop YARN (Yet Another Resource Negotiator): YARN is Hadoop’s resource management layer, responsible for managing and scheduling resources across a Hadoop cluster. PySpark can run on YARN, enabling seamless integration with existing Hadoop ecosystems and leveraging YARN’s resource management capabilities.
  • Kubernetes: Kubernetes is a container orchestration platform that automates deployment, scaling, and management of containerized applications. PySpark can run on Kubernetes, enabling dynamic resource allocation and efficient utilization of resources in containerized environments.

local – "local" is a special value for the master parameter when initializing a SparkContext or SparkSession. With local as the master, Spark runs in local mode: everything runs inside a single JVM (Java Virtual Machine) on the machine where your Python script executes. "local" uses one worker thread, "local[N]" uses N threads, and "local[*]" uses one thread per CPU core. This mode is mainly used for development, testing, and debugging.

Each cluster manager type offers unique features and benefits, catering to different deployment scenarios and infrastructure requirements. The choice of cluster manager depends on factors such as scalability, resource isolation, integration with existing infrastructure, and ease of management.

PySpark Modules & Packages

In Apache Spark, the PySpark module enables Python developers to interact with Spark and use its distributed computing capabilities. It provides a Python API that exposes Spark's functionality, allowing users to write Spark applications in Python.

  • PySpark RDD (pyspark.RDD)
  • PySpark DataFrame and SQL (pyspark.sql)
  • PySpark Streaming (pyspark.streaming)
  • PySpark MLlib (pyspark.ml, pyspark.mllib)
  • GraphFrames (graphframes, an external package that is not bundled with Spark)
  • PySpark Resource (pyspark.resource), new in PySpark 3.0

Besides these, you can find third-party libraries at https://spark-packages.org/, a community index of third-party Spark packages.

Download & Install PySpark

Follow the below steps to install PySpark on the Anaconda distribution on Windows.


Install Python or Anaconda distribution

I recommend using Anaconda to install Python, as it is popular in the machine learning and data science community. Follow the instructions to install the Anaconda distribution and Jupyter Notebook. Alternatively, you can install Python from Python.org.

Install Java

PySpark is built on top of Spark, which is implemented in Scala and runs on the Java Virtual Machine (JVM). Hence, you need to install a Java version that is compatible with your Spark version. You can download and install Java from Oracle, or OpenJDK from openlogic.com.

After installation, set the JAVA_HOME and PATH variables. I installed Java at c:\apps\Java\openjdk17, so I set the paths as shown below.


# Set Java environment variables (JDK 17)
JAVA_HOME = c:\apps\Java\openjdk17
PATH = %PATH%;c:\apps\Java\openjdk17\bin

Install Apache Spark

Go to the Spark Download page, choose the Spark version you want to use, and then choose the package type. The download link in step 3 updates to match the selected Spark version; click it to download Spark.


Once the download completes, extract the archive using 7-Zip or another extraction tool, and copy the extracted folder spark-3.5.0-bin-hadoop3 to c:\apps.

Now, set the following environment variables.


# Set Windows environment variables
SPARK_HOME  = C:\apps\spark-3.5.0-bin-hadoop3
HADOOP_HOME = C:\apps\spark-3.5.0-bin-hadoop3
PATH = %PATH%;C:\apps\spark-3.5.0-bin-hadoop3\bin
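Mismatched folder names in these variables are an easy mistake to make. The helper below is a hypothetical sketch (check_spark_env is not part of Spark) that checks a mapping of environment variables: it reports unset variables and whether JAVA_HOME\bin and SPARK_HOME\bin appear on PATH, comparing Windows paths case-insensitively.

```python
import os
from pathlib import PureWindowsPath

# Hypothetical helper (not part of Spark): check the variables set above
REQUIRED_VARS = ("JAVA_HOME", "SPARK_HOME", "HADOOP_HOME")


def check_spark_env(env=None):
    """Return a list of problems; an empty list means the settings look consistent."""
    env = dict(os.environ if env is None else env)
    problems = [f"{name} is not set" for name in REQUIRED_VARS if not env.get(name)]

    # Normalize PATH entries: forward slashes, lower case (Windows paths are case-insensitive)
    path_entries = {
        PureWindowsPath(entry).as_posix().lower().rstrip("/")
        for entry in env.get("PATH", "").split(";")
        if entry
    }
    # Both JAVA_HOME\bin and SPARK_HOME\bin should be on PATH
    for name in ("JAVA_HOME", "SPARK_HOME"):
        home = env.get(name)
        if home and PureWindowsPath(home, "bin").as_posix().lower() not in path_entries:
            problems.append(f"{name}\\bin is not on PATH")
    return problems


# Example: SPARK_HOME points at 3.5.0, but PATH still points at a different Spark folder
example_env = {
    "JAVA_HOME": r"c:\apps\Java\openjdk17",
    "SPARK_HOME": r"C:\apps\spark-3.5.0-bin-hadoop3",
    "HADOOP_HOME": r"C:\apps\spark-3.5.0-bin-hadoop3",
    "PATH": r"C:\Windows;c:\apps\Java\openjdk17\bin;C:\apps\spark-3.0.5-bin-hadoop3\bin",
}
print(check_spark_env(example_env))   # ['SPARK_HOME\\bin is not on PATH']
```

Called with no argument, check_spark_env() inspects the current process environment instead of the example mapping.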

Setup winutils.exe

Download the winutils.exe that matches your Hadoop version from the winutils GitHub repository (https://github.com/steveloughran/winutils) and place it in the %SPARK_HOME%\bin directory, which is also %HADOOP_HOME%\bin with the settings above. Winutils builds differ for each Hadoop version, so make sure you download the right one.

Hadoop is not natively supported on Windows, but Spark uses Hadoop's libraries to access file systems, and on Windows those libraries call winutils.exe for operations such as checking and setting file permissions. Without winutils, Spark may encounter errors or fail to perform file system operations on Windows.

Validate PySpark Install

To validate that PySpark is installed correctly, open a command prompt and run the pyspark command to start the PySpark shell.

The PySpark shell is an interactive environment for running PySpark code. It is a CLI tool that provides a Python interpreter with access to Spark functionalities, enabling users to execute Spark commands, perform data manipulations, and analyze results interactively.


# Run the PySpark shell (Linux/Mac)
$SPARK_HOME/bin/pyspark
# On Windows: %SPARK_HOME%\bin\pyspark

You should see something like this below.

PySpark shell

The PySpark shell also starts the Spark web UI, which by default is available at http://localhost:4040.

Note: If port 4040 is already in use, Spark tries the next ports in turn (4041, 4042, and so on). In that case, open the port reported in the shell's startup messages instead.

Spark Web UI

The Spark Web UI is a graphical user interface provided by Apache Spark for monitoring and debugging Spark applications. It displays information about job progress, task execution, resource utilization, and system metrics, allowing users to optimize performance, diagnose issues, and gain insights into their Spark jobs.

PySpark Web UI

Access Spark History Server

The Spark History Server stores information about completed Spark applications (spark-submit, spark-shell), including logs, metrics, and event timelines. It allows users to view detailed information about past job executions, such as tasks, stages, and configurations, through a web-based user interface.

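It serves as a central record of past jobs, which helps when you need to review, reproduce, or troubleshoot them. Before you start the history server, add the settings below to spark-defaults.conf in %SPARK_HOME%\conf (you can create this file from spark-defaults.conf.template). Applications write their event logs to spark.eventLog.dir, and the history server reads them from spark.history.fs.logDirectory, so point both settings at the same directory and make sure it exists.


spark.eventLog.enabled true
spark.eventLog.dir file:///c:/logs/path
spark.history.fs.logDirectory file:///c:/logs/path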

Now, start the Spark history server on Linux or Mac by running the following script.


# Start history server on linux/mac
$SPARK_HOME/sbin/start-history-server.sh

The start-history-server.sh script does not run on Windows. Instead, start the history server by launching its class directly:


# Start history server on Windows
%SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.history.HistoryServer

You can access the Spark History server by accessing http://localhost:18080/

Spark History Server

It lists the applications you ran with event logging enabled. Click an application ID to see its details.

Spyder IDE & Jupyter Notebook

To write PySpark applications, you would need an IDE. There are dozens of IDEs to work with, but I chose to use Spyder IDE and Jupyter Notebook. If you have not installed Spyder IDE and Jupyter Notebook along with the Anaconda distribution, do so before you proceed.

Now, set the following environment variable.


# Set environment variable (check %SPARK_HOME%\python\lib for the exact py4j file name)
PYTHONPATH = %SPARK_HOME%\python;%SPARK_HOME%\python\lib\py4j-0.10.9.7-src.zip;%PYTHONPATH%

Now open Spyder IDE, create a new file with the below simple PySpark program, and run it. You should see 5 in output.

PySpark application running on Spyder IDE

Now, let's start Jupyter Notebook and run PySpark statements there.

PySpark statements running on Jupyter Interface

PySpark RDD – Resilient Distributed Dataset

PySpark RDD (Resilient Distributed Dataset) is a fundamental data structure of PySpark: a fault-tolerant, immutable, distributed collection of objects. Because RDDs are immutable, they cannot be changed once created; any transformation on an RDD produces a new RDD. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

RDD Creation 

To create an RDD, first create a SparkSession, which is the entry point to a PySpark application. A SparkSession is built with SparkSession.builder (ending in getOrCreate()); from an existing session, newSession() creates another one.

A SparkSession internally creates a SparkContext, available as spark.sparkContext. You can create multiple SparkSession objects, but there is only one SparkContext per JVM. If you want a new SparkContext, stop the existing one (using stop()) before creating another.

Let’s create an RDD from a text file using textFile() function of the SparkContext.


# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate() 

# Create RDD from external Data source
rdd2 = spark.sparkContext.textFile("/path/test.txt")

Once you have an RDD, you can perform transformations and actions on it. Spark runs these operations in parallel across the RDD's partitions.

RDD Operations

You can perform two types of operations on an RDD: transformations and actions.

RDD transformations in PySpark are lazy operations and they execute only when an action is called on RDD.

Common transformations include map, filter, flatMap, groupByKey, reduceByKey, join, union, sortByKey, distinct, sample, mapPartitions, and aggregateByKey. These functions transform RDDs by applying computations in a distributed manner across a cluster of machines, and each returns a new RDD.

RDD actions in PySpark trigger computation and return results to the Spark driver (or write them to storage). Key actions include collect, count, take, reduce, foreach, first, takeOrdered, takeSample, countByKey, saveAsTextFile, saveAsSequenceFile, saveAsObjectFile, foreachPartition, collectAsMap, aggregate, and fold. These actions start execution and materialize RDD data. As a rule of thumb, any RDD operation that returns something other than an RDD is an action.

PySpark DataFrame

A DataFrame is a distributed dataset organized into rows and named columns. It is similar to a relational database table or an R/Python data frame, but Spark optimizes the queries you run on it with its Catalyst query optimizer.

If you come from a Python background, you probably already know what a pandas DataFrame is. A PySpark DataFrame is similar, except that it is distributed across the cluster (its data is stored on different machines) and operations on it execute in parallel on all of those machines, whereas a pandas DataFrame stores and processes its data on a single machine.

If you are new to pandas, don't worry: you can learn about it from the Python Pandas Tutorial.

Is PySpark faster than pandas?

PySpark is a distributed computing framework well-suited for processing large-scale datasets that exceed the memory capacity of a single machine. It can leverage parallel processing across a cluster of machines, enabling faster computations on massive datasets.

On the other hand, pandas, being a single-machine library, is optimized for smaller to medium-sized datasets that can fit into memory. It typically performs well for data manipulation and analysis tasks on small to medium datasets.

To learn more, read Pandas DataFrame vs PySpark Differences with Examples.

Creating PySpark DataFrame

Using a list is one of the simplest ways to create a PySpark DataFrame, and an existing RDD can easily be converted to a DataFrame as well. Use createDataFrame() on the SparkSession to create a DataFrame.


# Create DataFrame
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)

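Since a DataFrame is a structured format with column names and data types, you can print its schema using df.printSchema().

To display the DataFrame, use df.show(), which shows the first 20 rows by default and truncates values longer than 20 characters. The output below was produced with df.show(truncate=False), which prints the values in full.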


# Output:
+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|James    |          |Smith   |1991-04-01|M     |3000  |
|Michael  |Rose      |        |2000-05-19|M     |4000  |
|Robert   |          |Williams|1978-09-05|M     |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |
+---------+----------+--------+----------+------+------+

DataFrame Operations

Similar to RDDs, DataFrames support two kinds of operations: transformations, which are lazy and return a new DataFrame, and actions, which trigger execution.

Using External Data Sources

In real-world applications, DataFrames are created from external sources, such as files on the local file system, HDFS, S3, or Azure storage, HBase, MySQL tables, etc. For example, to read a CSV file, use the following.


# Create DataFrame from CSV file (header row used for column names, types inferred)
df = spark.read.csv("/tmp/resources/zipcodes.csv", header=True, inferSchema=True)
df.printSchema()


Supported file formats

PySpark provides APIs to read and write several file formats:

  • Text Files (.txt)
  • CSV Files (.csv)
  • TSV Files (.tsv), read and written as CSV with a tab separator
  • Avro Files (.avro), through the external spark-avro module
  • JSON Files (.json)
  • Parquet (.parquet)
  • ORC Files (.orc)
  • XML Files (through the spark-xml package before Spark 4.0) and many other formats


PySpark SQL

PySpark SQL is a Spark module that provides a higher-level abstraction for working with structured data and lets you query it with SQL. Once you have created a DataFrame, you can interact with the data using ANSI SQL syntax.

PySpark SQL enables you to write SQL queries against structured data, leveraging standard SQL syntax and semantics. This familiarity with SQL allows users with SQL proficiency to transition to Spark for data processing tasks easily.

First, create a temporary table or view on the DataFrame so SQL queries can reference it. Once created, it can be queried anywhere in the SparkSession using sql().

These tables and views are scoped to the SparkSession that created them. Once the SparkSession is terminated, either by closing the Spark application or ending the Spark session explicitly, the temporary views are removed from memory.

To run SQL queries, use sql() method of the SparkSession object. Note that this function returns a DataFrame.


# Create temporary table
df.createOrReplaceTempView("PERSON_DATA")

# Run SQL query
df2 = spark.sql("SELECT * from PERSON_DATA")
df2.printSchema()
df2.show()

Use a GROUP BY clause to run aggregate queries; ORDER BY makes the row order of the result deterministic.


# Using GROUP BY
groupDF = spark.sql("SELECT gender, count(*) FROM PERSON_DATA GROUP BY gender ORDER BY gender")
groupDF.show()

This yields the below output


# Output:
+------+--------+
|gender|count(1)|
+------+--------+
|     F|       2|
|     M|       3|
+------+--------+

PySpark Streaming Tutorial

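PySpark Streaming Tutorial for Beginners – Spark streaming processes real-time data from sources such as file system folders (including S3 and HDFS), TCP sockets, and Kafka; connectors for other systems, such as Amazon Kinesis, are available separately. The processed data can be pushed to databases, Kafka, live dashboards, etc. The examples below use Structured Streaming (spark.readStream and writeStream).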

Streaming from TCP Socket

To create a Spark Streaming application that reads data from a TCP socket, you can use the readStream.format("socket") from Spark session object. Use the option() to specify the TCP host and port number to read.


# Spark streaming from socket (parentheses allow the method chain to span lines)
df = (spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9090")
      .load())

Spark reads the data from the socket and represents it in a “value” column of DataFrame. df.printSchema() outputs


# Output:
root
 |-- value: string (nullable = true)

After processing, you can write the streaming DataFrame to the console. In production, you would typically write it to Kafka, a database, etc.


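# Count words in each received line, then write the running counts to the console
from pyspark.sql.functions import explode, split
words = df.select(explode(split(df.value, " ")).alias("word"))
word_counts = words.groupBy("word").count()

query = (word_counts.writeStream
      .format("console")
      .outputMode("complete")
      .start())
query.awaitTermination()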

Streaming from Kafka

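Using Structured Streaming, you can read from and write to Kafka topics. Kafka delivers each record's key and value as binary, so your query converts them from or to formats such as text, CSV, Avro, or JSON. The Kafka source and sink require the spark-sql-kafka-0-10 package (for example, added with --packages).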


# Stream from Kafka
df = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "192.168.1.100:9092")
        .option("subscribe", "json_topic")
        .option("startingOffsets", "earliest")  # read the topic from the beginning
        .load())

Write a message to another topic in Kafka using writeStream()


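# Stream write to Kafka
# Assumes df has an `id` column, e.g. after parsing the JSON in Kafka's `value` column
query = (df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
   .writeStream
   .format("kafka")
   .outputMode("append")
   .option("kafka.bootstrap.servers", "192.168.1.100:9092")
   .option("topic", "json_data_topic")
   .option("checkpointLocation", "/tmp/checkpoints/json_data_topic")  # required by the Kafka sink
   .start())
query.awaitTermination()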

PySpark MLlib

PySpark MLlib is Apache Spark’s scalable machine learning library, offering a suite of algorithms and tools for building, training, and deploying machine learning models. It provides implementations of popular algorithms for classification, regression, clustering, collaborative filtering, and more.

MLlib is designed for distributed computing, allowing it to handle large-scale datasets across clusters efficiently. It offers both high-level APIs for ease of use and low-level APIs for fine-grained control over model training and evaluation.

MLlib seamlessly integrates with Spark’s ecosystem, enabling end-to-end machine learning workflows, including data preprocessing, feature engineering, model training, and deployment in production environments.

Here’s a simple example using Spark MLlib in Python to train a linear regression model:


# Import
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Create SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

# Sample training data: one numeric input column "x" and a "label" column
data = [(1.0, 2.0), (2.0, 3.0), (3.0, 4.0), (4.0, 5.0), (5.0, 6.0)]
df = spark.createDataFrame(data, ["x", "label"])

# MLlib estimators expect a single vector column; assemble "x" into "features"
assembler = VectorAssembler(inputCols=["x"], outputCol="features")

# Transform the DataFrame with the feature vector assembler
df = assembler.transform(df)

# Create a LinearRegression model
lr = LinearRegression(featuresCol="features", labelCol="label")

# Fit the model to the training data
model = lr.fit(df)

# Print the coefficients and intercept of the model
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

# Stop the SparkSession
spark.stop()

This is a simple example to demonstrate the usage of Spark MLlib for linear regression. In practice, you would typically use larger datasets and more complex models for real-world machine learning tasks.

PySpark GraphFrames

GraphFrames is a separate package (not bundled with Spark) that supports graphs built on DataFrames. Spark's built-in graph library, GraphX, is based on RDDs and has no Python API.

Spark GraphFrames is a graph processing library built on top of Apache Spark, designed for processing large-scale graph data. It provides high-level APIs for constructing, querying, and analyzing graphs in a distributed manner.

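To use GraphFrames, install the graphframes Python library and also make the GraphFrames JAR available to Spark, for example by passing the GraphFrames package that matches your Spark and Scala versions to --packages (or setting it in spark.jars.packages).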


# Install graphframes
pip install graphframes

Example


# Import
from graphframes import GraphFrame

# Define vertices DataFrame
vertices = spark.createDataFrame([
    ("a", "Raman", 34),
    ("b", "Bob", 36),
    ("c", "Naveen", 30),
    ("d", "Kumar", 29)
], ["id", "name", "age"])

# Define edges DataFrame
edges = spark.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "d", "friend"),
    ("d", "a", "follow")
], ["src", "dst", "relationship"])

# Create a GraphFrame
g = GraphFrame(vertices, edges)

# Query the graph
g.vertices.show()
g.edges.show()

# Find the shortest path between two vertices
results = g.shortestPaths(landmarks=["a", "d"])
results.select("id", "distances").show()

# Stop the SparkSession
spark.stop()

Conclusion

PySpark is a powerful open-source data processing framework that provides high-speed, distributed computing for big data analytics. In this tutorial, I hope you have learned the fundamentals of PySpark, how to create distributed data processing pipelines, and how to use its versatile libraries to transform and analyze large datasets efficiently. Every example on this site has been tested in our development environment and is available in the PySpark Examples GitHub project for reference.
