PySpark Tutorial For Beginners (Spark 3.5 with Python)
In this PySpark tutorial, you’ll learn the fundamentals of Spark, how to create distributed data processing pipelines, and how to leverage its versatile libraries to transform and analyze large datasets efficiently with examples. I will also explain what PySpark is, its features, advantages, modules, and packages, and how to use RDD & DataFrame with simple and easy examples from my working experience in Python code.
All examples provided in this PySpark (Spark with Python) tutorial are basic, simple, and easy to practice for beginners who are enthusiastic to learn PySpark and advance their careers in Big Data, Machine Learning, Data Science, and Artificial intelligence.
Note: In case you can’t find the PySpark examples you are looking for on this beginner’s tutorial page, I would recommend using the Search option from the menu bar to find your tutorial and sample example code. There are hundreds of tutorials in Spark, Scala, PySpark, and Python on this website you can learn from.
If you are working with a smaller Dataset and don’t have a Spark cluster, but still want to get benefits similar to Spark DataFrame, you can use Python Pandas DataFrames. The main difference is Pandas DataFrame is not distributed and runs on a single node.
PySpark Tutorial Table of Contents –
- Cluster Manager Types
- Modules and Packages
- Installation on Windows
- Spyder IDE & Jupyter Notebook
PySpark Tutorial Introduction
PySpark is the Python API for Apache Spark; it lets you run Python applications using Apache Spark capabilities. Using PySpark, we can run applications in parallel on a distributed cluster (multiple nodes).
In other words, PySpark is a Python API for Spark, an analytical processing engine for large-scale, distributed data processing and machine learning applications.
What is Apache Spark?
Apache Spark (hereafter referred to as Spark) is an open-source unified analytics engine used for large-scale data processing. Spark is designed to be fast, flexible, and easy to use, making it a popular choice for processing large-scale data sets. Spark runs operations on billions or trillions of records on distributed clusters, and for in-memory workloads it has been reported to be up to 100 times faster than Hadoop MapReduce.
Spark can run on a single-node machine or on multi-node machines (a cluster). It was created to address the limitations of MapReduce by doing in-memory processing. Spark reuses data by using an in-memory cache to speed up machine learning algorithms that repeatedly call a function on the same dataset. This lowers the latency, making Spark multiple times faster than MapReduce, especially when doing machine learning and interactive analytics. Apache Spark can also process real-time streaming data.
It is also a multi-language engine, that provides APIs (Application Programming Interfaces) and libraries for several programming languages like Java, Scala, Python, and R, allowing developers to work with Spark using the language they are most comfortable with.
- Scala: Spark’s primary and native language is Scala. Many of Spark’s core components are written in Scala, and it provides the most extensive API for Spark.
- Java: Spark provides a Java API that allows developers to use Spark within Java applications. Java developers can access most of Spark’s functionality through this API.
- Python: Spark offers a Python API, called PySpark, which is popular among data scientists and developers who prefer Python for data analysis and machine learning tasks. PySpark provides a Pythonic way to interact with Spark.
- R: Spark also offers an R API, enabling R users to work with Spark data and perform distributed data analysis using their familiar R language.
Who uses PySpark?
PySpark is widely used in the Data Science and Machine Learning community because many widely used data science libraries, including NumPy and TensorFlow, are available in Python. It is also used for its efficient processing of large datasets. PySpark has been used by many organizations like Walmart, Trivago, Sanofi, Runtastic, and many more.
Additionally, for development, you can use the Anaconda distribution (widely used in the Machine Learning community), which comes with a lot of useful tools like Spyder IDE and Jupyter Notebook to run PySpark applications.
In practice, PySpark is used a lot in the machine learning and data science community, thanks to Python's vast collection of machine learning libraries. In this PySpark tutorial for beginners, I explain several topics that cover the main concepts of this framework.
What are the Features of PySpark?
The following are the main features of PySpark.
- In-memory computation
- Distributed processing using parallelize
- Can be used with many cluster managers (Spark, Yarn, Mesos, etc.)
- Lazy evaluation
- Cache & persistence
- Built-in optimization when using DataFrames
- Supports ANSI SQL
Advantages of PySpark
- PySpark is a general-purpose, in-memory, distributed processing engine that allows you to process data efficiently in a distributed fashion.
- Applications running on PySpark can be up to 100x faster than traditional disk-based systems such as Hadoop MapReduce, mainly for in-memory workloads.
- You will get great benefits from using PySpark for data ingestion pipelines.
- Using PySpark we can process data from Hadoop HDFS, AWS S3, and many file systems.
- PySpark is also used to process real-time data using Streaming and Kafka.
- Using PySpark streaming, you can also stream files from the file system and from a socket.
- PySpark natively has machine learning and graph libraries.
What Version of Python PySpark Supports
PySpark 3.5 is compatible with Python 3.8 and newer, R 3.5 and newer, Java versions 8, 11, and 17, and Scala versions 2.12 and 2.13. However, it’s important to note that support for Java 8 versions prior to 8u371 has been deprecated starting from Spark 3.5.0.
|Java|8, 11, and 17 (Java 8 versions prior to 8u371 are deprecated)|
|Scala|2.12 and 2.13|
Apache Spark works in a master-slave architecture where the master is called the “Driver” and the slaves are called “Workers”. When you run a Spark application, the Spark Driver creates a context that is the entry point to your application; all operations (transformations and actions) are executed on worker nodes, and the resources are managed by the Cluster Manager.
For additional learning on this topic, I would recommend reading the following.
- What is Spark Job
- What is the Spark Stage? Explained
- What is Spark Executor
- What is Apache Spark Driver?
- What is DAG in Spark or PySpark
- What is a Lineage Graph in Spark?
- How to Submit a Spark Job via Rest API?
Cluster Manager Types
As of writing this Spark with Python (PySpark) tutorial for beginners, Spark supports the following cluster managers:
- Standalone – a simple cluster manager included with Spark that makes it easy to set up a cluster.
- Apache Mesos – Mesos is a cluster manager that can also run Hadoop MapReduce and PySpark applications.
- Hadoop YARN – the resource manager in Hadoop 2. This is mostly used as a cluster manager.
- Kubernetes – an open-source system for automating deployment, scaling, and management of containerized applications.
local – which is not really a cluster manager, but I still wanted to mention it because we use “local” for master() in order to run Spark on your laptop/computer.
PySpark Modules & Packages
- PySpark RDD (pyspark.RDD)
- PySpark DataFrame and SQL (pyspark.sql)
- PySpark Streaming (pyspark.streaming)
- PySpark MLlib (pyspark.ml, pyspark.mllib)
- PySpark GraphFrames (GraphFrames)
- PySpark Resource (pyspark.resource) It’s new in PySpark 3.0
Besides these, if you want to use third-party libraries, you can find them at https://spark-packages.org/ . This page is kind of a repository of all Spark third-party libraries.
How to do PySpark Installation
In order to run the PySpark examples mentioned in this beginner tutorial, you need to have Python, Spark, and the tools they require installed on your computer. Since most developers use Windows for development, I will explain how to install PySpark on Windows.
Install Python or Anaconda distribution
Download and install either Python from Python.org or Anaconda distribution which includes Python, Spyder IDE, and Jupyter Notebook. I would recommend using Anaconda as it’s popular and used by the Machine Learning and data science community. Follow the instructions to Install Anaconda Distribution and Jupyter Notebook.
Related: PySpark Install on Mac
Install Java 8
To run a PySpark application, you need Java 8 or a later version, so download Java from Oracle and install it on your system.
Post installation, set the JAVA_HOME and PATH environment variables.
JAVA_HOME = C:\Program Files\Java\jdk1.8.0_201
PATH = %PATH%;C:\Program Files\Java\jdk1.8.0_201\bin
Install Apache Spark
Download Apache Spark by accessing the Spark Download page and selecting the link from “Download Spark (point 3)”. If you want to use a different version of Spark & Hadoop, select the one you wanted from dropdowns, and the link on point 3 changes to the selected version and provides you with an updated link to download.
After download, untar the binary using 7zip or any extraction tool and copy the extracted folder spark-3.5.0-bin-hadoop3 to C:\apps.
Now set the following environment variables.
SPARK_HOME = C:\apps\spark-3.5.0-bin-hadoop3
HADOOP_HOME = C:\apps\spark-3.5.0-bin-hadoop3
PATH = %PATH%;C:\apps\spark-3.5.0-bin-hadoop3\bin
Download the winutils.exe file from winutils, and copy it to the %SPARK_HOME%\bin folder. Winutils are different for each Hadoop version, hence download the right version from https://github.com/steveloughran/winutils
Now open the command prompt and type the pyspark command to run the PySpark shell.
You should see something like this below.
The PySpark shell also creates a Spark context web UI, which by default you can access at http://localhost:4040. If that port is already in use, Spark tries the next one (4041, 4042, and so on).
Spark Web UI
Apache Spark provides a suite of Web UIs (Jobs, Stages, Tasks, Storage, Environment, Executors, and SQL) to monitor the status of your Spark application, resource consumption of the Spark cluster, and Spark configurations. On Spark Web UI, you can see how the operations are executed.
Spark History Server
The Spark History Server keeps a log of all Spark applications you submit with spark-submit or spark-shell. Before you start it, set the following configuration in spark-defaults.conf.
spark.eventLog.enabled true
spark.history.fs.logDirectory file:///c:/logs/path
Now, start the Spark History Server on Linux or Mac by running the start-history-server.sh script from the sbin directory of your Spark installation.
If you are running Spark on Windows, you can start the History Server with the spark-class.cmd launcher from the bin directory, passing it the class org.apache.spark.deploy.history.HistoryServer.
By clicking on each App ID, you will get the details of the application in PySpark web UI.
Spyder IDE & Jupyter Notebook
To write PySpark applications, you need an IDE. There are dozens of IDEs to work with, and I chose to use Spyder IDE and Jupyter Notebook. If you have not installed Spyder IDE and Jupyter Notebook along with the Anaconda distribution, install these before you proceed.
Now, set the following environment variable.
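PYTHONPATH => %SPARK_HOME%/python;%SPARK_HOME%/python/lib/py4j-0.10.9-src.zip;%PYTHONPATH%
The py4j file name varies with the Spark version, so check %SPARK_HOME%/python/lib for the exact name in your installation.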
Now open Spyder IDE, create a new file with the below simple PySpark program, and run it. You should see 5 in output.
Now let’s start the Jupyter Notebook
PySpark RDD – Resilient Distributed Dataset
In this section of the PySpark tutorial, I will introduce the RDD and explain how to create them and use its transformation and action operations with examples. Here is the full article on PySpark RDD in case you want to learn more about it and get your fundamentals strong.
A PySpark RDD (Resilient Distributed Dataset) is the fundamental data structure of PySpark: a fault-tolerant, immutable, distributed collection of objects, which means once you create an RDD you cannot change it. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.
In order to create an RDD, first, you need to create a SparkSession, which is the entry point to a PySpark application. A SparkSession can be created using the builder pattern (SparkSession.builder) or the newSession() method of an existing SparkSession.
A SparkSession internally creates a sparkContext variable of type SparkContext. You can create multiple SparkSession objects but only one SparkContext per JVM. If you want to create a new SparkContext, you should stop the existing SparkContext (using stop()) before creating a new one.
# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("SparkByExamples.com") \
    .getOrCreate()
SparkContext has several functions to use with RDDs. For example, its parallelize() method is used to create an RDD from a list.
# Create RDD from parallelize
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd = spark.sparkContext.parallelize(dataList)
An RDD can also be created from a text file using the textFile() function of the SparkContext.
# Create RDD from external Data source
rdd2 = spark.sparkContext.textFile("/path/test.txt")
Once you have an RDD, you can perform transformation and action operations. Any operation you perform on RDD runs in parallel.
On PySpark RDD, you can perform two kinds of operations.
RDD transformations – Transformations are lazy operations. When you run a transformation (for example, an update), instead of updating the current RDD, the operation returns another RDD.
RDD actions – operations that trigger computation and return RDD values to the driver.
Transformations on a Spark RDD return another RDD, and transformations are lazy, meaning they don’t execute until you call an action on the RDD. Some transformations on RDDs are map(), filter(), and sortByKey(); each returns a new RDD instead of updating the current one.
RDD Action operation returns the values from an RDD to a driver node. In other words, any RDD function that returns non RDD[T] is considered as an action.
Some actions on RDDs are count(), collect(), reduce(), and more.
PySpark DataFrame Tutorial for Beginners
DataFrame definition is very well explained by Databricks hence I do not want to define it again and confuse you. Below is the definition I took from Databricks.
DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.– Databricks
If you are coming from a Python background, I would assume you already know what a Pandas DataFrame is. A PySpark DataFrame is mostly similar to a Pandas DataFrame, with the exception that PySpark DataFrames are distributed in the cluster (meaning the data in a DataFrame is stored on different machines in a cluster) and any operation in PySpark executes in parallel on all machines, whereas a Pandas DataFrame stores and operates on a single machine.
If you have no Python background, I would recommend you learn some basics of Python before you proceed with this Spark tutorial. For now, just know that data in PySpark DataFrames are stored in different machines in a cluster.
Is PySpark faster than pandas?
Because it executes in parallel on all cores of multiple machines, PySpark can run operations on large datasets faster than Pandas, which runs on a single node. For data that fits comfortably in the memory of one machine, Pandas is often the faster choice because it avoids Spark's job scheduling and serialization overhead. To know more, read Pandas DataFrame vs PySpark Differences with Examples.
The simplest way to create a DataFrame is from a Python list of data. DataFrame can also be created from an RDD and by reading files from several sources.
Using the createDataFrame() function of the SparkSession, you can create a DataFrame.
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]
columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
Since a DataFrame is a structured format that contains column names and types, we can get the schema of the DataFrame using df.printSchema().
df.show() shows the first 20 rows of the DataFrame.
+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|James    |          |Smith   |1991-04-01|M     |3000  |
|Michael  |Rose      |        |2000-05-19|M     |4000  |
|Robert   |          |Williams|1978-09-05|M     |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |
+---------+----------+--------+----------+------+------+
Like RDD, DataFrame also has operations like Transformations and Actions.
- PySpark – Ways to Rename column on DataFrame
- PySpark withColumn() usage with Examples
- PySpark – How to Filter data from DataFrame
- PySpark orderBy() and sort() explained
- PySpark explode array and map columns to rows
- PySpark – explode nested array into rows
DataFrame from external data sources
In real-world applications, DataFrames are created from external sources like files from the local system, HDFS, S3, Azure, HBase, MySQL tables, etc. Below is an example of how to read a CSV file from a local system.
df = spark.read.csv("/tmp/resources/zipcodes.csv")
df.printSchema()
Following are some resources to learn how to read and write to external data sources.
- PySpark Read CSV file into DataFrame
- PySpark Write DataFrame to CSV file
- PySpark – Read & Write Parquet File
- PySpark – Read & Write JSON file
- PySpark – Read Hive Table
- PySpark – Read JDBC in Parallel
- PySpark – Read and Write SQL Server
- PySpark – Read and Write MySQL
- PySpark – Read JDBC Table
Supported file formats
DataFrame has a rich set of APIs that supports reading and writing several file formats
- xml and many more
In this section of the PySpark Tutorial for Beginners, you will find several Spark examples written in Python that help in your projects.
- Different ways to Create DataFrame in PySpark
- PySpark Groupby Explained with Examples
- PySpark Aggregate Functions with Examples
- PySpark Joins Explained with Examples
PySpark SQL Tutorial
PySpark SQL is one of the most used PySpark modules for processing structured, columnar data. Once you have created a DataFrame, you can interact with the data using SQL syntax.
In other words, Spark SQL brings native raw SQL queries to Spark, meaning you can run traditional ANSI SQL on a Spark DataFrame. In the later section of this PySpark SQL tutorial, you will learn in detail how to use SQL.
In order to use SQL, first create a temporary view on the DataFrame using the createOrReplaceTempView() function. Once created, this view can be accessed throughout the SparkSession using sql(), and it is dropped when the SparkSession that created it ends.
Use the sql() method of the SparkSession object to run the query; this method returns a new DataFrame.
df.createOrReplaceTempView("PERSON_DATA")
df2 = spark.sql("SELECT * from PERSON_DATA")
df2.printSchema()
df2.show()
Let’s see another PySpark example using GROUP BY.
groupDF = spark.sql("SELECT gender, count(*) from PERSON_DATA group by gender")
groupDF.show()
This yields the below output
+------+--------+
|gender|count(1)|
+------+--------+
|     F|       2|
|     M|       3|
+------+--------+
Similarly, you can run any traditional SQL queries on DataFrame using PySpark SQL.
PySpark Streaming Tutorial
PySpark Streaming is a scalable, high-throughput, fault-tolerant stream processing system that supports both batch and streaming workloads. It is used to process real-time data from sources like file system folders, TCP sockets, S3, Kafka, Flume, Twitter, and Amazon Kinesis, to name a few. The processed data can be pushed to databases, Kafka, live dashboards, etc.
Streaming from TCP Socket
Use readStream.format("socket") on the SparkSession object to read data from a socket, and provide the host and port options you want to stream data from.
df = (spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9090")
      .load())
Spark reads the data from the socket and represents it in a “value” column of DataFrame.
root
 |-- value: string (nullable = true)
After processing, you can stream the DataFrame to the console. In production, you would typically stream it to Kafka, a database, etc.
# `count` is the aggregated streaming DataFrame produced by your processing step
query = (count.writeStream
         .format("console")
         .outputMode("complete")
         .start())
query.awaitTermination()
Streaming from Kafka
Using Spark Streaming, we can read from a Kafka topic and write to a Kafka topic in TEXT, CSV, AVRO, and JSON formats.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.100:9092")
      .option("subscribe", "json_topic")
      .option("startingOffsets", "earliest")  # read from the beginning of the topic
      .load())
The PySpark example below writes messages to another Kafka topic using writeStream.
(df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
   .writeStream
   .format("kafka")
   .outputMode("append")
   .option("kafka.bootstrap.servers", "192.168.1.100:9092")
   .option("topic", "json_data_topic")
   .option("checkpointLocation", "/tmp/checkpoint")  # required by the Kafka sink; this path is a placeholder
   .start()
   .awaitTermination())
In this section, I will cover PySpark examples by using MLlib library.
GraphFrames is a separately distributed package (it is not bundled with Spark) that supports graphs on DataFrames. Spark itself ships the GraphX library, which runs on RDDs, loses all DataFrame capabilities, and has no Python API.
Difference between GraphX and GraphFrame
GraphX works on RDDs whereas GraphFrames works with DataFrames.
What are the key features and improvements released in PySpark 3.5.0
Following are some of the key features and improvements in PySpark 3.5
- Spark Connect: This release extends the general availability of Spark Connect with support for Scala and Go clients, distributed training and inference support, and enhanced compatibility for Structured streaming.
- PySpark and SQL Functionality: New functionality has been introduced in PySpark and SQL, including the SQL IDENTIFIER clause, named argument support for SQL function calls, SQL function support for HyperLogLog approximate aggregations, and Python user-defined table functions.
- Distributed Training with DeepSpeed: The release simplifies distributed training with DeepSpeed, making it more accessible.
- Structured Streaming: It introduces watermark propagation among operators and dropDuplicatesWithinWatermark operations in Structured Streaming, enhancing its capabilities.
- English SDK: Apache Spark for English SDK integrates the extensive expertise of Generative AI in Apache Spark.
PySpark Tutorial Conclusion
PySpark is a powerful open-source data processing framework that provides high-speed, distributed computing for big data analytics. In this tutorial, I hope you have learned the fundamentals of Spark, how to create distributed data processing pipelines, and leverage its versatile libraries to transform and analyze large datasets efficiently. Every sample example explained in this PySpark Tutorial for Beginners is tested in our development environment and is available at PySpark Examples Github project for reference.
- Apache Spark 3.5 Tutorial with Scala examples
- Apache Spark 3.5.0 features
- Spark Streaming with Kafka Example
- PySpark DataFrame Tutorial with Examples
- Spark DataFrame Tutorial with Examples