Read Snowflake table into Spark DataFrame

In this Snowflake data warehouse article, I will explain how to read a Snowflake table into Spark DataFrame and learn different connection properties with the Scala language.

Pre-requisites

  • Snowflake data warehouse account
  • Basic understanding in Spark and IDE to run Spark programs

If you are reading this tutorial, I believe you already know what is Snowflake database, in case if you are not aware, in simple terms Snowflake database is a purely cloud-based data storage and analytics Data Warehouse provided as a Software-as-a-Service (SaaS).

Snowflake database is architecture and designed an entirely new SQL database engine to work with cloud infrastructure wit out distracting from ANSI SQL standards hence, this is easy to learn if you have SQL background.

If you do want to create a Snowflake table and insert some data, you can do this either from Snowflake web console or by following Writing Spark DataFrame to Snowflake table

Maven Dependency

<dependency>
     <groupId>net.snowflake</groupId>
     <artifactId>spark-snowflake_2.11</artifactId>
     <version>2.5.9-spark_2.4</version>
</dependency>

Spark Connection parameters

To establish a connection from Spark to Snowflake, we need to provide the following connection properties using Spark options.

  • sfURL : URL of your account for e.g https://oea82.us-east-1.snowflakecomputing.com/
  • sfAccount : You account name, you can get this from URL for e.g “oea82”
  • sfUser : Snowflake user name, typically your login user
  • sfPassword : user password
  • sfWarehouse : Snowflake Data warehouse name
  • sfDatabase : Snowflake Database name
  • sfSchema : Database schema where your table belongs
  • sfRole : Snowflake user role
  • and more

Read Snowflake table into Spark DataFrame

By using the read() method (which is DataFrameReader object) of the SparkSession and using below methods

Use format() to specify the data source name either snowflake or net.snowflake.spark.snowflake

Use Option() to specify the above-discussed connection parameters like URL, account, username, password, database name, schema, role and more.

Use dbtable option to specify the Snowflake table name you wanted to read from or use query option to execute the specific query.


package com.sparkbyexamples.spark

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadEmpFromSnowflake extends App{

  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate();

  var sfOptions = Map(
    "sfURL" -> "https://oea82.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea82",
    "sfUser" -> "user",
    "sfPassword" -> "#############",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )

  val df: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake") // or just use "snowflake"
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .load()

  df.show(false)
}

This yields below output


+-------+----------+------+
|NAME   |DEPARTMENT|SALARY|
+-------+----------+------+
|James  |Sales     |3000  |
|Michael|Sales     |4600  |
|Robert |Sales     |4100  |
|Maria  |Finance   |3000  |
|Raman  |Finance   |3000  |
|Scott  |Finance   |3300  |
|Jen    |Finance   |3900  |
|Jeff   |Marketing |3000  |
|Kumar  |Marketing |2000  |
+-------+----------+------+

Above example demonstrates reading the entire table from the Snowflake table using dbtable option and creating a Spark DataFrame, below example uses a query option to execute a group by aggregate SQL query.


  val df1: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake")
    .options(sfOptions)
    .option("query", "select department, sum(salary) as total_salary from EMPLOYEE group by department")
    .load()
df1.show(false)

This yields the below output.


+----------+------------+
|DEPARTMENT|TOTAL_SALARY|
+----------+------------+
|Sales     |11700       |
|Finance   |13200       |
|Marketing |5000        |
+----------+------------+

This Spark Snowflake connector scala example is also available at GitHub project ReadEmpFromSnowflake

Complete example


package com.sparkbyexamples.spark

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadEmpFromSnowflake extends App{

  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate();

  var sfOptions = Map(
    "sfURL" -> "https://oea81082.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea81082",
    "sfUser" -> "nnkumar13",
    "sfPassword" -> "#####1",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )

  val df: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake")
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .load()

  df.show(false)

  val df1: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake")
    .options(sfOptions)
    .option("query", "select department, sum(salary) as total_salary from EMPLOYEE group by department")
    .load()

  df1.show(false)
}

This Spark Snowflake connector scala example is also available at GitHub project ReadEmpFromSnowflake

Confluence

In this tutorial, you have learned how to read a Snowflake table and write it to Spark DataFrame and also learned different options to use to connect to Snowflake table.

Happy Learning !!

NNK

SparkByExamples.com is a Big Data and Spark examples community page, all examples are simple and easy to understand and well tested in our development environment Read more ..

Leave a Reply

You are currently viewing Read Snowflake table into Spark DataFrame
Photo by Andreea Radu on Unsplash