Spark SQL is one of the most widely used Spark modules and is designed for structured data processing. It lets you query structured data using either SQL or the DataFrame API.
1. Spark SQL Introduction
Spark SQL (the org.apache.spark.sql package) is the Spark module for performing SQL-like operations on structured data. You can query the data through the programming API or with ANSI SQL queries, as in an RDBMS. You can also mix both, for example by applying the API to the result of an SQL query.
Following are the important classes from the SQL module.
- org.apache.spark.sql.SparkSession – SparkSession is the main entry point for DataFrame and SQL functionality.
- org.apache.spark.sql.DataFrame – DataFrame is a distributed collection of data organized into named columns (in Scala, an alias for Dataset[Row]).
- org.apache.spark.sql.Column – A column expression in a DataFrame.
- org.apache.spark.sql.Row – A row of data in a DataFrame.
- org.apache.spark.sql.RelationalGroupedDataset – The object type returned by DataFrame.groupBy() (called GroupedData in PySpark).
- org.apache.spark.sql.DataFrameNaFunctions – Methods for handling missing data (null values).
- org.apache.spark.sql.DataFrameStatFunctions – Methods for statistics functionality.
- org.apache.spark.sql.functions – List of standard built-in functions.
- org.apache.spark.sql.types – Available SQL data types in Spark.
- org.apache.spark.sql.expressions.Window – Used to work with window functions.
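To make the list above more concrete, here is a minimal sketch that touches several of these classes in one place. The sample values, the app name, and the variable names are made up for illustration and are not part of the original article.

```scala
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SqlClassesSketch") // hypothetical app name
  .getOrCreate()

// types: describe the schema explicitly
val schema = StructType(Seq(
  StructField("state", StringType, nullable = true),
  StructField("zipcode", IntegerType, nullable = true)
))

// Row: one record each; made-up sample values, one with a missing zipcode
val rows = Seq(Row("AZ", 85001), Row("AZ", null), Row("FL", 33101))

// DataFrame: rows plus schema
val df: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

// DataFrameNaFunctions (reached through df.na): replace null zipcodes with 0
val filled = df.na.fill(0L, Seq("zipcode"))

// Column and functions: build a reusable column expression
val isArizona: Column = col("state") === "AZ"

// Window: number the rows within each state, ordered by zipcode
val byState = Window.partitionBy("state").orderBy("zipcode")
val numbered = filled.withColumn("rn", row_number().over(byState))

// groupBy() returns the intermediate grouped object; count() turns it back into a DataFrame
val perState = filled.groupBy("state").count()

numbered.filter(isArizona).show()
perState.show()
```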
Regardless of which approach you use, you have to create a SparkSession, which is the entry point to a Spark application.
// Import SparkSession
import org.apache.spark.sql.SparkSession
// Create SparkSession
val spark = SparkSession.builder()
  .master("local")
  .appName("SparkByExamples.com")
  .getOrCreate()
2. Spark SQL DataFrame API
The Spark DataFrame definition is very well explained by Databricks, so I will not define it again and risk confusing you. Below is the definition from Databricks.
DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. – Databricks
If you come from a Python background, you probably already know the pandas DataFrame. A Spark DataFrame is similar, except that it is distributed across the cluster (its data is stored on different machines) and Spark operations execute in parallel on those machines, whereas a pandas DataFrame is stored and processed on a single machine.
If you have no Python background, for now just know that the data in a Spark DataFrame is stored on different machines in a cluster. When you run Spark on a local laptop, everything runs on that laptop.
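As a rough illustration of the "distributed" part, the sketch below builds a small DataFrame from a local collection and splits it into two partitions. It assumes a local run where two threads stand in for cluster machines; the sample rows and the app name are made up.

```scala
import org.apache.spark.sql.SparkSession

// "local[2]" runs Spark with two worker threads on this machine,
// standing in for a cluster in this sketch.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("DataFramePartitionsSketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._

// Made-up sample rows; not taken from the article's CSV file.
val df = Seq(
  ("US", "PHOENIX", "85001", "AZ"),
  ("US", "MESA", "85201", "AZ"),
  ("US", "MIAMI", "33101", "FL"),
  ("US", "SAN JUAN", "00901", "PR")
).toDF("country", "city", "zipcode", "state")

// Split the rows into two partitions; on a real cluster,
// partitions can live on different machines.
val spread = df.repartition(2)
val numPartitions = spread.rdd.getNumPartitions
println(s"partitions: $numPartitions")

// The count is computed per partition in parallel and then combined.
val rowCount = spread.count()
println(s"rows: $rowCount")
```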
3. Running SQL Queries in Spark
Spark SQL is one of the most used Spark modules for processing structured, columnar data. Once you have created a DataFrame, you can interact with the data using SQL syntax.
In other words, Spark SQL brings native SQL queries to Spark, meaning you can run traditional ANSI SQL on a Spark DataFrame. The SQL tutorial covers using SQL in detail.
To use SQL, first create a temporary view on the DataFrame using the createOrReplaceTempView() function. Once created, this view can be accessed throughout the SparkSession using sql(), and it is dropped when the session that created it ends. Use the sql() method of the SparkSession object to run a query; this method returns a new DataFrame.
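The flow described above (register a view, query it with sql(), keep working with the returned DataFrame) can be sketched as follows. The view name ZipcodesSketch, the sample rows, and the app name are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("TempViewSketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._

// Made-up sample rows
val zipDF = Seq(
  ("PHOENIX", "85001", "AZ"),
  ("MESA", "85201", "AZ"),
  ("MIAMI", "33101", "FL")
).toDF("city", "zipcode", "state")

// Register the DataFrame under a name that SQL can reference
zipDF.createOrReplaceTempView("ZipcodesSketch")

// sql() returns a new DataFrame ...
val sqlResult = spark.sql("SELECT city, state FROM ZipcodesSketch WHERE state = 'AZ'")

// ... so the DataFrame API can be applied to the SQL result
val cities = sqlResult.select(col("city")).orderBy("city")
val cityNames = cities.as[String].collect().toSeq
println(cityNames.mkString(", "))

// The view can also be dropped explicitly before the session ends;
// dropTempView returns true when a view was dropped.
val dropped = spark.catalog.dropTempView("ZipcodesSketch")
```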
4. Spark SQL Examples
4.1 Create SQL View
Create a DataFrame from a CSV file. You can find this CSV file in the GitHub project.
// Read CSV file into a DataFrame
val df = spark.read.option("header", true)
  .csv("/Users/admin/simple-zipcodes.csv")
df.printSchema()
df.show()
This prints the schema of the DataFrame followed by its first rows.
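With only the header option set, every CSV column is read as a string. The sketch below compares that with the reader's inferSchema option. So that it does not depend on a local path, it writes a tiny CSV to a temporary file first; the file contents and the app name are made up and are not the article's simple-zipcodes.csv.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("CsvReadSketch") // hypothetical app name
  .getOrCreate()

// Write a tiny, made-up CSV to a temp file
val csvPath = Files.createTempFile("zipcodes-sketch", ".csv")
val csvText = "country,city,zipcode,state\nUS,PHOENIX,85001,AZ\nUS,MIAMI,33101,FL\n"
Files.write(csvPath, csvText.getBytes(StandardCharsets.UTF_8))

// header only: column names come from the first line, all columns are strings
val asStrings = spark.read
  .option("header", true)
  .csv(csvPath.toString)

// header + inferSchema: Spark scans the data to pick column types
val inferred = spark.read
  .option("header", true)
  .option("inferSchema", true)
  .csv(csvPath.toString)

val zipTypeAsRead = asStrings.schema("zipcode").dataType
val zipTypeInferred = inferred.schema("zipcode").dataType

asStrings.printSchema()
inferred.printSchema()
```

Schema inference costs an extra pass over the data, so for large files an explicit schema is often the better choice.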
To run ANSI SQL queries similar to those in an RDBMS, you need to register the data as a temporary view. The example below reads the same CSV file and creates a view named Zipcodes.
// Read CSV file into a temporary view
spark.read.option("header", true)
  .csv("/Users/admin/simple-zipcodes.csv")
  .createOrReplaceTempView("Zipcodes")
4.2 Spark SQL to Select Columns
The select() function of the DataFrame API selects specific columns from a DataFrame.
// DataFrame API Select query
df.select("country", "city", "zipcode", "state")
  .show(5)
In SQL, you can achieve the same using the SELECT ... FROM clause, as shown below.
// SQL Select query
spark.sql("SELECT country, city, zipcode, state FROM ZIPCODES")
  .show(5)
Both examples produce the same result.
4.3 Filter Rows
To filter rows, you can use the where() function of the DataFrame API.
// DataFrame API where()
df.select("country", "city", "zipcode", "state")
  .where("state == 'AZ'")
  .show(5)
Similarly, in SQL you can use the WHERE clause as follows.
// SQL where
spark.sql("""
  SELECT country, city, zipcode, state
  FROM ZIPCODES
  WHERE state = 'AZ'
  """)
  .show(5)
Both examples return only the rows where state is AZ.
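Besides a SQL string, where() also accepts a Column expression, which lets the compiler check the structure of the condition. The following is a small sketch on made-up rows; the DataFrame contents and the app name are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("FilterSketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._

// Made-up sample rows
val df = Seq(
  ("PHOENIX", "85001", "AZ"),
  ("MESA", "85201", "AZ"),
  ("MIAMI", "33101", "FL")
).toDF("city", "zipcode", "state")

// Condition given as a SQL expression string
val byString = df.where("state = 'AZ'")

// The same condition given as a Column expression
val byColumn = df.where(col("state") === "AZ")

// Column expressions combine with && and ||; filter() is an alias of where()
val combined = df.filter(col("state") === "AZ" && col("zipcode").startsWith("850"))

val stringCount = byString.count()
val columnCount = byColumn.count()
val combinedCount = combined.count()
combined.show()
```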
4.4 Sorting
To sort rows on a specific column, use the orderBy() function of the DataFrame API.
// sorting
df.select("country", "city", "zipcode", "state")
  .where("state in ('PR','AZ','FL')")
  .orderBy("state")
  .show(10)
In SQL, you can achieve sorting by using the ORDER BY clause.
// SQL ORDER BY
spark.sql("""
  SELECT country, city, zipcode, state
  FROM ZIPCODES
  WHERE state IN ('PR','AZ','FL')
  ORDER BY state
  """)
  .show(10)
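Sorting on several columns with mixed directions works the same way in both styles. Here is a hedged sketch on made-up rows; the view name ZipcodesSortSketch and the app name are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SortSketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._

// Made-up sample rows
val df = Seq(
  ("PHOENIX", "AZ"),
  ("MIAMI", "FL"),
  ("MESA", "AZ"),
  ("SAN JUAN", "PR")
).toDF("city", "state")

// DataFrame API: state descending, then city ascending
val apiSorted = df.orderBy(col("state").desc, col("city").asc)

// SQL: the same ordering with ORDER BY
df.createOrReplaceTempView("ZipcodesSortSketch")
val sqlSorted = spark.sql(
  "SELECT city, state FROM ZipcodesSortSketch ORDER BY state DESC, city ASC")

val apiRows = apiSorted.collect().toSeq
val sqlRows = sqlSorted.collect().toSeq
apiSorted.show()
```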
4.5 Grouping
The groupBy().count() call groups the DataFrame by a column and counts the rows in each group.
// grouping
df.groupBy("state").count()
  .show()
You can achieve group by in Spark SQL by using the GROUP BY clause.
// SQL GROUP BY clause
spark.sql("""
  SELECT state, count(*) AS count
  FROM ZIPCODES
  GROUP BY state
  """)
  .show()
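A common follow-up to grouping is keeping only groups that meet a condition, which SQL expresses with HAVING. The sketch below shows one way to write that in both styles; the sample rows, the view name ZipcodesGroupSketch, and the column alias cnt are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, lit}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("GroupSketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._

// Made-up sample rows: two for AZ, one for FL
val df = Seq(
  ("PHOENIX", "AZ"),
  ("MESA", "AZ"),
  ("MIAMI", "FL")
).toDF("city", "state")

// DataFrame API: aggregate with agg(), then filter on the aggregate
val apiCounts = df.groupBy("state")
  .agg(count(lit(1)).as("cnt"))
  .where(col("cnt") > 1)

// SQL: GROUP BY with HAVING
df.createOrReplaceTempView("ZipcodesGroupSketch")
val sqlCounts = spark.sql("""
  SELECT state, count(*) AS cnt
  FROM ZipcodesGroupSketch
  GROUP BY state
  HAVING count(*) > 1
  """)

val apiResult = apiCounts.collect().map(r => (r.getString(0), r.getLong(1))).toSeq
val sqlResult = sqlCounts.collect().map(r => (r.getString(0), r.getLong(1))).toSeq
apiCounts.show()
```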
4.6 SQL Join Operations
Similarly, if you have two tables, you can perform join operations in Spark. Here is an example, assuming empDF and deptDF are existing employee and department DataFrames.
// Join DataFrames
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")
  .show(false)
In Spark SQL, you can do it as shown below.
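// Register the DataFrames as temporary views so SQL can reference them
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
// SQL Join
spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
  .show(false)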
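The join snippets above rely on empDF and deptDF, which this article does not define. A self-contained sketch with made-up employee and department rows could look like this; only the column names emp_dept_id and dept_id come from the examples above, everything else is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("JoinSketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._

// Made-up sample data; Jones points at a department that does not exist
val empDF = Seq(
  ("Smith", 10),
  ("Rose", 20),
  ("Jones", 50)
).toDF("name", "emp_dept_id")

val deptDF = Seq(
  ("Finance", 10),
  ("Marketing", 20)
).toDF("dept_name", "dept_id")

// DataFrame API inner join
val apiJoined = empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")

// SQL inner join over temporary views of the same DataFrames
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
val sqlJoined = spark.sql(
  "SELECT * FROM EMP e JOIN DEPT d ON e.emp_dept_id = d.dept_id")

val apiNames = apiJoined.select("name").as[String].collect().toSet
val sqlNames = sqlJoined.select("name").as[String].collect().toSet
apiJoined.show(false)
```

Because the join is an inner join, the unmatched employee row is dropped in both versions.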
For unions refer to Spark union examples.
In this article, you have learned what the Spark SQL module is, its advantages, the important classes in the module, and how to run SQL-like operations on DataFrames and on temporary tables.