
PySpark SQL is one of the most important and widely used modules for structured data processing. It allows developers to seamlessly integrate SQL queries with Spark programs, making it easier to work with structured data using the familiar SQL language. PySpark SQL provides a DataFrame API for manipulating data in a distributed and fault-tolerant manner.

PySpark is an Apache Spark library written in Python that lets you run Python applications using Apache Spark capabilities. With PySpark, applications run in parallel on a distributed cluster (multiple nodes).

Related: PySpark SQL Functions

1. PySpark SQL Introduction

PySpark SQL Tutorial – The pyspark.sql module in PySpark is used to perform SQL-like operations on data held in memory. You can query the data either through the programmatic DataFrame API or with ANSI SQL queries similar to an RDBMS. You can also mix both, for example, applying the API to the result of an SQL query.

Following are the important classes from the SQL module.

  • pyspark.sql.SparkSession – SparkSession is the main entry point for DataFrame and SQL functionality. It is responsible for coordinating the execution of SQL queries and DataFrame operations. SparkSession can be created using the SparkSession.builder API. It encapsulates the functionality of the older SQLContext and HiveContext.
  • pyspark.sql.DataFrame – DataFrame is a distributed collection of data organized into named columns. DataFrames can be created from various sources like CSV, JSON, Parquet, Hive, etc., and they can be transformed using a rich set of high-level operations.
  • pyspark.sql.Column – A column expression in a DataFrame. It can be used to reference, manipulate, and transform columns.
  • pyspark.sql.Row – A row of data in a DataFrame. Rows are used to store and manipulate data in a distributed and structured way. Each Row object can be thought of as a record or a tuple with named fields, similar to a row in a relational database table.
  • pyspark.sql.GroupedData – An object type returned by DataFrame.groupBy(). This class provides methods for calculating summary statistics, aggregating data, and applying various functions to grouped data.
  • pyspark.sql.DataFrameNaFunctions – Methods for handling missing data (null values). This class is specifically designed to handle operations related to missing data and provides functionalities for filling, dropping, and replacing null values in a PySpark DataFrame.
  • pyspark.sql.DataFrameStatFunctions – This class is part of the PySpark SQL module and is designed to facilitate the computation of summary statistics on numerical columns in a DataFrame. It offers methods for calculating various descriptive statistics, correlation, covariance, and more.
  • pyspark.sql.functions – List of standard built-in functions.
  • pyspark.sql.types – Available SQL data types in PySpark.
  • pyspark.sql.Window – Used to work with window functions.

Regardless of which approach you use, you first have to create a SparkSession, which is the entry point to a PySpark application.


# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()
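
To illustrate a few of the classes listed above, the short sketch below builds a small DataFrame from Row objects and applies a built-in function and a window function. This is only a minimal sketch; the data and column names are made up for illustration, and it reuses the spark session created above.


# Illustrative sketch (assumes the 'spark' session created above; data is made up)
from pyspark.sql import Row, Window
from pyspark.sql import functions as F

# pyspark.sql.Row - rows with named fields
data = [Row(name="James", dept="Sales", salary=3000),
        Row(name="Anna", dept="Sales", salary=4100),
        Row(name="Robert", dept="IT", salary=4000)]
emp_df = spark.createDataFrame(data)

# pyspark.sql.functions - standard built-in functions
emp_df.select(F.upper(F.col("name")).alias("name_upper"), "salary").show()

# pyspark.sql.Window - rank salaries within each department
w = Window.partitionBy("dept").orderBy(F.desc("salary"))
emp_df.withColumn("rank", F.rank().over(w)).show()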

2. PySpark SQL DataFrame API

The PySpark DataFrame definition is very well explained by Databricks, so I do not want to define it again and confuse you. Below is the definition from Databricks.

DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.– Databricks

If you are coming from a Python background, you probably already know what a Pandas DataFrame is. A PySpark DataFrame is mostly similar to a Pandas DataFrame, with the exception that PySpark DataFrames are distributed across the cluster (meaning the data in a DataFrame is stored on different machines), and operations on them execute in parallel on all machines, whereas a Pandas DataFrame stores and operates on data on a single machine.

If you have no Python background, for now just know that data in a PySpark DataFrame is stored on different machines in a cluster. When you run PySpark on a local laptop, it simply runs on your laptop.
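
As a rough, hypothetical illustration of this relationship, the sketch below moves a small made-up dataset between Pandas and PySpark; it assumes Pandas is installed alongside PySpark and reuses the spark session created earlier.


# Hypothetical example: moving between Pandas and PySpark DataFrames
import pandas as pd

# Single-machine Pandas DataFrame (made-up data)
pandas_df = pd.DataFrame({"name": ["James", "Anna"], "age": [30, 25]})

# Distribute the data as a PySpark DataFrame across the cluster
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()

# Collect the distributed data back into a single-machine Pandas DataFrame
print(spark_df.toPandas())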

3. Running SQL Queries in PySpark

PySpark SQL is one of the most used PySpark modules for processing structured, columnar data. Once you have created a DataFrame, you can interact with the data using SQL syntax.

In other words, Spark SQL brings native raw SQL queries to Spark, meaning you can run traditional ANSI SQL on a Spark DataFrame. In this SQL tutorial, you will learn in detail how to use SELECT, WHERE, GROUP BY, JOIN, UNION, etc.

In order to use SQL, first create a temporary view on the DataFrame using the createOrReplaceTempView() function. Once created, this view can be accessed throughout the SparkSession using sql(), and it is dropped when the SparkSession terminates.

Use the sql() method of the SparkSession object to run the query; this method returns a new DataFrame.
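
As a minimal sketch of this pattern (assuming a DataFrame named df already exists; the view name here is hypothetical), you register the view once and then query it with sql():


# Minimal sketch: register a temporary view and query it with SQL
df.createOrReplaceTempView("MY_TABLE")   # view name is hypothetical

# sql() returns a new DataFrame, so you can keep using the DataFrame API on the result
result_df = spark.sql("SELECT * FROM MY_TABLE")
result_df.show()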

4. PySpark SQL Examples

4.1 Create SQL View

Create a DataFrame from a CSV file. You can find this CSV file at the GitHub project.


# Read CSV file into table
df = spark.read.option("header",True) \
          .csv("/Users/admin/simple-zipcodes.csv")
df.printSchema()
df.show()

Running this prints the DataFrame schema and shows its rows.


To use ANSI SQL queries similar to an RDBMS, you need to create a temporary view by reading the data from the CSV file. You can find this CSV file at the GitHub project.


# Read CSV file into table
spark.read.option("header",True) \
          .csv("/Users/admin/simple-zipcodes.csv") \
          .createOrReplaceTempView("Zipcodes")

4.2 PySpark SQL to Select Columns

The select() function of DataFrame API is used to select the specific columns from the DataFrame.


# DataFrame API Select query
df.select("country","city","zipcode","state") \
     .show(5)

In SQL, you can achieve the same using a SELECT ... FROM statement, as shown below.


# SQL Select query
spark.sql("SELECT country, city, zipcode, state FROM ZIPCODES") \
     .show(5)

Both of the above examples yield the same output.


4.3 Filter Rows

To filter rows, you can use the where() function from the DataFrame API.


# DataFrame API where()
df.select("country","city","zipcode","state") \
  .where("state == 'AZ'") \
  .show(5)

Similarly, in SQL you can use the WHERE clause as follows.


# SQL where
spark.sql(""" SELECT  country, city, zipcode, state FROM ZIPCODES 
          WHERE state = 'AZ' """) \
     .show(5)

This yields the same output as the DataFrame API example above.

4.4 Sorting

To sort rows on a specific column, use the orderBy() function of the DataFrame API.


# sorting
df.select("country","city","zipcode","state") \
  .where("state in ('PR','AZ','FL')") \
  .orderBy("state") \
  .show(10)

In SQL, you can achieve sorting by using the ORDER BY clause.


# SQL ORDER BY
spark.sql(""" SELECT  country, city, zipcode, state FROM ZIPCODES 
          WHERE state in ('PR','AZ','FL') order by state """) \
     .show(10)

4.5 Grouping

The groupBy().count() functions are used to group the DataFrame by a column and count the rows in each group.


# grouping
df.groupBy("state").count() \
  .show()

You can achieve the same grouping in PySpark SQL by using the GROUP BY clause.


# SQL GROUP BY clause
spark.sql(""" SELECT state, count(*) as count FROM ZIPCODES 
          GROUP BY state""") \
     .show()

4.6 SQL Join Operations

Similarly, if you have two tables (or DataFrames), you can perform join operations in PySpark, as shown in the sketch below.
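
Below is a minimal join sketch; it assumes a second, hypothetical dataset that maps each state code to a state name, registered as a temporary view named STATES.


# Hypothetical second dataset mapping state codes to state names
states_df = spark.createDataFrame(
    [("AZ", "Arizona"), ("FL", "Florida"), ("PR", "Puerto Rico")],
    ["state", "state_name"])
states_df.createOrReplaceTempView("STATES")

# DataFrame API join
df.join(states_df, on="state", how="inner") \
  .select("city", "zipcode", "state", "state_name") \
  .show(5)

# Equivalent SQL join on the temporary views
spark.sql(""" SELECT z.city, z.zipcode, z.state, s.state_name 
          FROM ZIPCODES z JOIN STATES s ON z.state = s.state """) \
     .show(5)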

4.7 Union

For unions, refer to the PySpark union examples; a minimal sketch is shown below.
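
The sketch below assumes a second, hypothetical CSV file with the same columns as the zipcodes file; the file path and the second view name are made up.


# Hypothetical second DataFrame with the same schema as df (path is made up)
df2 = spark.read.option("header", True) \
           .csv("/Users/admin/another-zipcodes.csv")

# DataFrame API union (keeps duplicates; chain .distinct() to remove them)
df.union(df2).show(5)

# Equivalent SQL UNION ALL on temporary views
df2.createOrReplaceTempView("ZIPCODES2")
spark.sql(""" SELECT country, city, zipcode, state FROM ZIPCODES 
          UNION ALL 
          SELECT country, city, zipcode, state FROM ZIPCODES2 """) \
     .show(5)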

5. Complete Example


# Import
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com') \
                    .getOrCreate()
                    
# Create DataFrame
df = spark.read \
          .option("header",True) \
          .csv("/Users/admin/simple-zipcodes.csv")
df.printSchema()
df.show()

# Create SQL table
spark.read \
          .option("header",True) \
          .csv("/Users/admin/simple-zipcodes.csv") \
          .createOrReplaceTempView("Zipcodes")
          
# Select query
df.select("country","city","zipcode","state") \
     .show(5)

spark.sql("SELECT  country, city, zipcode, state FROM ZIPCODES") \
     .show(5)
     
# where
df.select("country","city","zipcode","state") \
  .where("state == 'AZ'") \
  .show(5)

spark.sql(""" SELECT  country, city, zipcode, state FROM ZIPCODES 
          WHERE state = 'AZ' """) \
     .show(5)
     
# sorting
df.select("country","city","zipcode","state") \
  .where("state in ('PR','AZ','FL')") \
  .orderBy("state") \
  .show(10)
  
spark.sql(""" SELECT  country, city, zipcode, state FROM ZIPCODES 
          WHERE state in ('PR','AZ','FL') order by state """) \
     .show(10)

# grouping
df.groupBy("state").count() \
  .show()

spark.sql(""" SELECT state, count(*) as count FROM ZIPCODES 
          GROUP BY state""") \
     .show()

6. Conclusion

In this article, you have learned what the PySpark SQL module is, its advantages, the important classes of the module, and how to run SQL-like operations on a DataFrame and on temporary views.


Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, he has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive, and Machine Learning. Naveen's journey in the field of data engineering has been one of continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with data as he comes across them. Follow Naveen @ LinkedIn and Medium