Site icon Spark By {Examples}

Read Snowflake table into Spark DataFrame

spark dataframe read snowflake table

Photo by Andreea Radu on Unsplash

In this Snowflake data warehouse article, I will explain how to read a Snowflake table into Spark DataFrame and learn different connection properties with the Scala language.

Pre-requisites

If you are reading this tutorial, I believe you already know what is Snowflake database, in case if you are not aware, in simple terms Snowflake database is a purely cloud-based data storage and analytics Data Warehouse provided as a Software-as-a-Service (SaaS).

Snowflake database is architecture and designed an entirely new SQL database engine to work with cloud infrastructure wit out distracting from ANSI SQL standards hence, this is easy to learn if you have SQL background.

If you do want to create a Snowflake table and insert some data, you can do this either from Snowflake web console or by following Writing Spark DataFrame to Snowflake table

Maven Dependency

<dependency>
     <groupId>net.snowflake</groupId>
     <artifactId>spark-snowflake_2.11</artifactId>
     <version>2.5.9-spark_2.4</version>
</dependency>

Spark Connection parameters

To establish a connection from Spark to Snowflake, we need to provide the following connection properties using Spark options.

Read Snowflake table into Spark DataFrame

By using the read() method (which is DataFrameReader object) of the SparkSession and using below methods

Use format() to specify the data source name either snowflake or net.snowflake.spark.snowflake

Use Option() to specify the above-discussed connection parameters like URL, account, username, password, database name, schema, role and more.

Use dbtable option to specify the Snowflake table name you wanted to read from or use query option to execute the specific query.


package com.sparkbyexamples.spark

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadEmpFromSnowflake extends App{

  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate();

  var sfOptions = Map(
    "sfURL" -> "https://oea82.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea82",
    "sfUser" -> "user",
    "sfPassword" -> "#############",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )

  val df: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake") // or just use "snowflake"
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .load()

  df.show(false)
}

This yields below output


+-------+----------+------+
|NAME   |DEPARTMENT|SALARY|
+-------+----------+------+
|James  |Sales     |3000  |
|Michael|Sales     |4600  |
|Robert |Sales     |4100  |
|Maria  |Finance   |3000  |
|Raman  |Finance   |3000  |
|Scott  |Finance   |3300  |
|Jen    |Finance   |3900  |
|Jeff   |Marketing |3000  |
|Kumar  |Marketing |2000  |
+-------+----------+------+

Above example demonstrates reading the entire table from the Snowflake table using dbtable option and creating a Spark DataFrame, below example uses a query option to execute a group by aggregate SQL query.


  val df1: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake")
    .options(sfOptions)
    .option("query", "select department, sum(salary) as total_salary from EMPLOYEE group by department")
    .load()
df1.show(false)

This yields the below output.


+----------+------------+
|DEPARTMENT|TOTAL_SALARY|
+----------+------------+
|Sales     |11700       |
|Finance   |13200       |
|Marketing |5000        |
+----------+------------+

This Spark Snowflake connector scala example is also available at GitHub project ReadEmpFromSnowflake

Complete example


package com.sparkbyexamples.spark

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadEmpFromSnowflake extends App{

  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate();

  var sfOptions = Map(
    "sfURL" -> "https://oea81082.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea81082",
    "sfUser" -> "sfusername",
    "sfPassword" -> "#####1",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )

  val df: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake")
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .load()

  df.show(false)

  val df1: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake")
    .options(sfOptions)
    .option("query", "select department, sum(salary) as total_salary from EMPLOYEE group by department")
    .load()

  df1.show(false)
}

This Spark Snowflake connector scala example is also available at GitHub project ReadEmpFromSnowflake

Confluence

In this tutorial, you have learned how to read a Snowflake table and write it to Spark DataFrame and also learned different options to use to connect to Snowflake table.

Happy Learning !!

Exit mobile version