Spark Write DataFrame to Snowflake table

In this Snowflake tutorial, I will explain how to create a Snowflake database, write a Spark DataFrame to a Snowflake table, and use the different Snowflake options and saving modes, using the Scala language.

Pre-requisites

  • Snowflake data warehouse account
  • Basic understanding of Spark and an IDE to run Spark programs

If you are reading this tutorial, I assume you already know what Snowflake is. In case you do not, in simple terms, Snowflake is a purely cloud-based data storage and analytics data warehouse provided as Software-as-a-Service (SaaS).

Snowflake architected and designed an entirely new SQL database engine to work with cloud infrastructure without deviating from ANSI SQL standards; hence, it is easy to learn if you have a SQL background.

Maven Dependency

<dependency>
     <groupId>net.snowflake</groupId>
     <artifactId>spark-snowflake_2.11</artifactId>
     <version>2.5.9-spark_2.4</version>
</dependency>

Create a Snowflake Database & table

To create a database, log on to the Snowflake web console, select Databases from the top menu, select the “create a new database” option, enter the database name on the form, and finally select the “Finish” button.

To create a table, you can use either the Snowflake web console or the program below.


  import java.sql.DriverManager
  import java.util.Properties

  // Connection properties for the Snowflake JDBC driver
  val properties = new Properties()
  properties.put("user", "user")
  properties.put("password", "#########")
  properties.put("account", "oea82")
  properties.put("warehouse", "mywh")
  properties.put("db", "EMP")
  properties.put("schema", "public")
  properties.put("role", "ACCOUNTADMIN")

  // JDBC connection string
  val jdbcUrl = "jdbc:snowflake://oea82.us-east-1.snowflakecomputing.com/"
  val connection = DriverManager.getConnection(jdbcUrl, properties)
  val statement = connection.createStatement()
  try {
    // Create the target table, replacing it if it already exists
    statement.executeUpdate("create or replace table EMPLOYEE(name VARCHAR, department VARCHAR, salary number)")
  } finally {
    // Release the JDBC resources even if the statement fails
    statement.close()
    connection.close()
  }

This Spark with Snowflake example is also available in the GitHub project for reference.

Create a Spark DataFrame

First, let’s create a Spark DataFrame, which we will later write to a Snowflake table. Here, “spark” is a SparkSession object.


  import spark.implicits._
  val simpleData = Seq(("James","Sales",3000),
    ("Michael","Sales",4600),
    ("Robert","Sales",4100),
    ("Maria","Finance",3000),
    ("Raman","Finance",3000),
    ("Scott","Finance",3300),
    ("Jen","Finance",3900),
    ("Jeff","Marketing",3000),
    ("Kumar","Marketing",2000)
  )
  val df = simpleData.toDF("name","department","salary")
  df.show()

Spark Connection parameters

To establish a connection from Spark to your Snowflake account and database, we need to provide the following connection properties using Spark options.

  • sfURL : URL of your account, e.g. https://oea82.us-east-1.snowflakecomputing.com/
  • sfAccount : Your account name, which you can get from the URL, e.g. “oea82”
  • sfUser : Snowflake user name, typically your login user
  • sfPassword : user password
  • sfWarehouse : Snowflake Data Warehouse name
  • sfDatabase : Snowflake Database name
  • sfSchema : Database schema where your table belongs
  • sfRole : Snowflake user role
  • and more
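
As a minimal sketch of how these properties could be assembled and checked before handing them to Spark, the helper below builds the options map and derives sfAccount from the URL. The object name SnowflakeOptions, its methods, and the choice of which keys to treat as mandatory are assumptions made for illustration; they are not part of the Snowflake connector.

```scala
import java.net.URI

// Hypothetical helper (not part of the Snowflake connector).
object SnowflakeOptions {
  // Keys this sketch treats as mandatory; an assumption for illustration.
  val requiredKeys: Seq[String] =
    Seq("sfURL", "sfUser", "sfPassword", "sfDatabase", "sfSchema")

  // Take the first label of the URL host as the account name,
  // e.g. "oea82" from https://oea82.us-east-1.snowflakecomputing.com/
  def accountFromUrl(url: String): String = {
    // Add a scheme when it is missing so that URI can parse the host
    val withScheme = if (url.contains("://")) url else s"https://$url"
    new URI(withScheme).getHost.split('.').head
  }

  // Return the options with sfAccount filled in,
  // or the list of mandatory keys that are missing or empty.
  def build(settings: Map[String, String]): Either[Seq[String], Map[String, String]] = {
    val missing = requiredKeys.filterNot(k => settings.get(k).exists(_.nonEmpty))
    if (missing.nonEmpty) Left(missing)
    else {
      val account = settings.getOrElse("sfAccount", accountFromUrl(settings("sfURL")))
      Right(settings + ("sfAccount" -> account))
    }
  }
}
```

A Right result can be passed to the options() method of the DataFrameWriter, while a Left result lists the keys that still need a value. Reading the password from an environment variable or a secrets store instead of hard-coding it is a reasonable design choice here.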

Write Spark DataFrame to Snowflake table

By using the write method of the DataFrame (which returns a DataFrameWriter object) and the operations below, you can write the Spark DataFrame to a Snowflake table.

Use format() to specify the data source name, either snowflake or net.snowflake.spark.snowflake.

Use option() or options() to specify the connection parameters discussed above, such as URL, account, username, password, database name, schema, role and more.

Use the dbtable option to specify the name of the Snowflake table you want to write to.

Use mode() to specify whether to overwrite, append, or ignore the write if the table is already present.


  val sfOptions = Map(
    "sfURL" -> "https://oea82.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea82",
    "sfUser" -> "user",
    "sfPassword" -> "####################",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )

  df.write
    .format("snowflake")
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .mode(SaveMode.Overwrite)
    .save()

When column names are different

When the column names of the Spark DataFrame schema and the Snowflake table do not match, use the columnmap option, whose value is a single string literal.


.option("columnmap", "Map(col_2 -> col_b, col_3 -> col_a)")
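
Writing this literal by hand is error-prone when many columns differ. The sketch below renders it from a sequence of pairs, assuming each pair goes from the DataFrame column name to the Snowflake column name; ColumnMap and render are hypothetical names introduced for illustration, not connector APIs.

```scala
// Hypothetical helper (not part of the Snowflake connector).
object ColumnMap {
  // Render (DataFrame column -> Snowflake column) pairs as the single
  // string literal that the columnmap option expects.
  def render(pairs: Seq[(String, String)]): String =
    pairs
      .map { case (from, to) => s"$from -> $to" }
      .mkString("Map(", ", ", ")")
}
```

For example, ColumnMap.render(Seq("col_2" -> "col_b", "col_3" -> "col_a")) yields the same literal shown above, which can then be passed as .option("columnmap", ...).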

Saving Modes

The Spark DataFrameWriter provides the mode() method to specify the save mode; its argument is either one of the strings below or a constant from the SaveMode class.

overwrite – Overwrites the existing table; alternatively, you can use SaveMode.Overwrite.

append – Adds the data to the existing table; alternatively, you can use SaveMode.Append.

ignore – Skips the write operation when the table already exists; alternatively, you can use SaveMode.Ignore.

errorifexists or error – This is the default option; it returns an error when the table already exists. Alternatively, you can use SaveMode.ErrorIfExists.

Spark Write DataFrame to Snowflake table complete example


package com.sparkbyexamples.spark

import org.apache.spark.sql.{SaveMode, SparkSession}

object WriteEmpDataFrameToSnowflake extends App {

  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._

  val simpleData = Seq(("James","Sales",3000),
    ("Michael","Sales",4600),
    ("Robert","Sales",4100),
    ("Maria","Finance",3000),
    ("Raman","Finance",3000),
    ("Scott","Finance",3300),
    ("Jen","Finance",3900),
    ("Jeff","Marketing",3000),
    ("Kumar","Marketing",2000)
  )
  val df = simpleData.toDF("name","department","salary")
  df.show()

  val sfOptions = Map(
    "sfURL" -> "https://oea82.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea82",
    "sfUser" -> "user",
    "sfPassword" -> "####################",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )

  df.write
    .format("snowflake")
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .mode(SaveMode.Overwrite)
    .save()

}

This Spark Snowflake connector Scala example is also available in the GitHub project as WriteEmpDataFrameToSnowflake.scala for reference.

Conclusion

In this tutorial, you have learned how to create a Snowflake database and table, how to write a Spark DataFrame to a Snowflake table, and which saving modes are available.

Happy Learning !!