In this Snowflake data warehouse article, I will explain how to read a Snowflake table into a Spark DataFrame and cover the different connection properties, using the Scala language.
Pre-requisites
- Snowflake data warehouse account
- Basic understanding of Spark and an IDE to run Spark programs
If you are reading this tutorial, I believe you already know what the Snowflake database is. In case you are not aware, in simple terms the Snowflake database is a purely cloud-based data storage and analytics data warehouse provided as Software-as-a-Service (SaaS).
Snowflake architected and designed an entirely new SQL database engine to work with cloud infrastructure without deviating from ANSI SQL standards; hence, it is easy to learn if you have a SQL background.
If you want to create a Snowflake table and insert some data first, you can do this either from the Snowflake web console or by following Writing Spark DataFrame to Snowflake table.
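For reference, below is a minimal sketch of the write path; it assumes a DataFrame df already exists and reuses the sfOptions connection map defined later in this article. See the linked article for the full walkthrough.

import org.apache.spark.sql.SaveMode

// Minimal write sketch: creates/overwrites the EMPLOYEE table in Snowflake.
// Assumes `df` is an existing DataFrame and `sfOptions` is the connection
// options map defined later in this article.
df.write
  .format("net.snowflake.spark.snowflake")
  .options(sfOptions)
  .option("dbtable", "EMPLOYEE")
  .mode(SaveMode.Overwrite)
  .save()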
Maven Dependency
<dependency>
    <groupId>net.snowflake</groupId>
    <artifactId>spark-snowflake_2.11</artifactId>
    <version>2.5.9-spark_2.4</version>
</dependency>
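If your project uses sbt instead of Maven, the equivalent dependency (same artifact and version) in build.sbt would be:

// build.sbt -- Snowflake Spark connector for Scala 2.11 / Spark 2.4
libraryDependencies += "net.snowflake" % "spark-snowflake_2.11" % "2.5.9-spark_2.4"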
Spark Connection parameters
To establish a connection from Spark to Snowflake, we need to provide the following connection properties using Spark options.
- sfURL : URL of your account, e.g. https://oea82.us-east-1.snowflakecomputing.com/
- sfAccount : Your account name; you can get this from the URL, e.g. "oea82"
- sfUser : Snowflake user name, typically your login user
- sfPassword : user password (see the credentials note after this list)
- sfWarehouse : Snowflake data warehouse name
- sfDatabase : Snowflake database name
- sfSchema : database schema where your table belongs
- sfRole : Snowflake user role
- and more
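Hardcoding sfPassword as in the examples below is fine for learning, but a safer sketch reads credentials from environment variables. The variable names SNOWFLAKE_USER and SNOWFLAKE_PASSWORD here are just placeholders, not anything the connector requires:

// A minimal sketch that pulls credentials from environment variables
// instead of hardcoding them. SNOWFLAKE_USER and SNOWFLAKE_PASSWORD are
// placeholder names; sys.env(...) throws if a variable is not set.
val sfOptions = Map(
  "sfURL" -> "https://oea82.us-east-1.snowflakecomputing.com/",
  "sfAccount" -> "oea82",
  "sfUser" -> sys.env("SNOWFLAKE_USER"),
  "sfPassword" -> sys.env("SNOWFLAKE_PASSWORD"),
  "sfDatabase" -> "EMP",
  "sfSchema" -> "PUBLIC",
  "sfRole" -> "ACCOUNTADMIN"
)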
Read Snowflake table into Spark DataFrame
Use the read() method of SparkSession (which returns a DataFrameReader object) along with the following:
- Use format() to specify the data source name: either snowflake or net.snowflake.spark.snowflake.
- Use option() to specify the connection parameters discussed above, such as URL, account, username, password, database name, schema, role, and more.
- Use the dbtable option to specify the Snowflake table name you want to read from, or use the query option to execute a specific SQL query.
package com.sparkbyexamples.spark

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadEmpFromSnowflake extends App {

  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  // Snowflake connection properties
  val sfOptions = Map(
    "sfURL" -> "https://oea82.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea82",
    "sfUser" -> "user",
    "sfPassword" -> "#############",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )

  // Read the EMPLOYEE table into a DataFrame
  val df: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake") // or just use "snowflake"
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .load()

  df.show(false)
}
This yields the below output.
+-------+----------+------+
|NAME |DEPARTMENT|SALARY|
+-------+----------+------+
|James |Sales |3000 |
|Michael|Sales |4600 |
|Robert |Sales |4100 |
|Maria |Finance |3000 |
|Raman |Finance |3000 |
|Scott |Finance |3300 |
|Jen |Finance |3900 |
|Jeff |Marketing |3000 |
|Kumar |Marketing |2000 |
+-------+----------+------+
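Once loaded, the DataFrame behaves like any other; for example, you can select columns and filter rows, and the connector's pushdown optimization will typically translate these operations into SQL executed inside Snowflake:

// Column pruning and row filtering on the Snowflake-backed DataFrame.
// With pushdown enabled, these operations typically run inside Snowflake
// rather than in Spark.
df.select("NAME", "SALARY")
  .filter(df("SALARY") > 3000)
  .show(false)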
So far we have read the entire Snowflake table into a Spark DataFrame using the dbtable option; the below example uses the query option to execute a group by aggregate SQL query.
val df1: DataFrame = spark.read
  .format("net.snowflake.spark.snowflake")
  .options(sfOptions)
  .option("query", "select department, sum(salary) as total_salary from EMPLOYEE group by department")
  .load()

df1.show(false)
This yields the below output.
+----------+------------+
|DEPARTMENT|TOTAL_SALARY|
+----------+------------+
|Sales |11700 |
|Finance |13200 |
|Marketing |5000 |
+----------+------------+
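The same aggregation can also be expressed with the DataFrame API instead of a SQL string; here is a sketch of the equivalent, operating on the df loaded earlier:

import org.apache.spark.sql.functions.sum

// Equivalent group-by aggregation using the DataFrame API on the
// previously loaded `df`, instead of the `query` option.
df.groupBy("DEPARTMENT")
  .agg(sum("SALARY").alias("TOTAL_SALARY"))
  .show(false)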
Complete example
package com.sparkbyexamples.spark

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadEmpFromSnowflake extends App {

  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  // Snowflake connection properties
  val sfOptions = Map(
    "sfURL" -> "https://oea81082.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea81082",
    "sfUser" -> "sfusername",
    "sfPassword" -> "#####1",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )

  // Read the entire table using the dbtable option
  val df: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake")
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .load()

  df.show(false)

  // Execute an aggregate query using the query option
  val df1: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake")
    .options(sfOptions)
    .option("query", "select department, sum(salary) as total_salary from EMPLOYEE group by department")
    .load()

  df1.show(false)
}
This Spark Snowflake connector Scala example is also available at the GitHub project ReadEmpFromSnowflake.
Conclusion
In this tutorial, you have learned how to read a Snowflake table into a Spark DataFrame, and also learned the different options used to connect to a Snowflake table.
Happy Learning !!