Spark with SQL Server – Read and Write Table

Can we connect to SQL Server (mssql) from Spark and read the table into Spark DataFrame and write the DataFrame to the SQL table?

In order to connect to SQL Server (mssql) from Apache Spark, you need the following details. Make sure you have them before you read from or write to SQL Server. In this article I use the Apache Spark connector for SQL Server, whose data source name is com.microsoft.sqlserver.jdbc.spark.

  • Driver (connector) to use (covered below)
  • SQL Server address & port
  • Database name
  • Table name
  • User name
  • Password
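It can help to keep the details listed above in one value so the read and write examples don't repeat them. The following is a minimal sketch. The case class SqlServerConn and its field names are hypothetical. The URL follows the jdbc:sqlserver://host:port;databaseName=db; form used later in this article, and 1433 is SQL Server's default TCP port.

```scala
// Hypothetical helper holding the connection details listed above.
final case class SqlServerConn(
    host: String,     // SQL Server address
    port: Int,        // SQL Server port (1433 is the SQL Server default)
    database: String, // database name
    table: String,    // table name
    user: String,     // user name
    password: String  // password
) {
  // Builds a URL of the form jdbc:sqlserver://<host>:<port>;databaseName=<database>;
  def url: String = s"jdbc:sqlserver://$host:$port;databaseName=$database;"

  // Option keys used by the read/write examples in this article
  def options: Map[String, String] = Map(
    "url"      -> url,
    "dbtable"  -> table,
    "user"     -> user,
    "password" -> password
  )
}
```

The map can then be passed in one call, for example spark.read.format("com.microsoft.sqlserver.jdbc.spark").options(conn.options).load(), since DataFrameReader and DataFrameWriter both accept a Map of options.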

Steps to connect Spark to SQL Server and read and write a table:

  • Step 1 – Identify the Spark SQL connector version to use
  • Step 2 – Add the dependency
  • Step 3 – Create SparkSession & DataFrame
  • Step 4 – Save Spark DataFrame to SQL Server table
  • Step 5 – Read SQL table into Spark DataFrame

1. Spark Connector for SQL Server (mssql)

The Apache Spark connector for SQL Server works with both:

  • SQL Server on-prem
  • Azure SQL

The following connectors are available. Choose the one that matches the Spark version you are using.

Spark version   groupId               artifactId                   version
Spark 2.4.x     com.microsoft.azure   spark-mssql-connector        1.0.2
Spark 3.0.x     com.microsoft.azure   spark-mssql-connector_2.12   1.1.0
Spark 3.1.x     com.microsoft.azure   spark-mssql-connector_2.12   1.2.0

Spark versions and their compatible SQL Server connectors

You can find all available Spark connectors for Microsoft SQL Server (mssql) at https://search.maven.org/search?q=spark-mssql-connector
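If you generate builds or spark-submit commands programmatically, the compatibility table above can be encoded as a small lookup. This is a sketch only. The object MssqlConnectorVersions and its methods are hypothetical, and versions not listed in the table return None, so you should check Maven Central for those.

```scala
// Hypothetical lookup encoding the Spark / connector compatibility table.
object MssqlConnectorVersions {
  final case class Coordinates(groupId: String, artifactId: String, version: String) {
    // groupId:artifactId:version, the form accepted by spark-submit --packages
    def mavenString: String = s"$groupId:$artifactId:$version"
  }

  def forSpark(sparkVersion: String): Option[Coordinates] =
    // Keep only "major.minor", e.g. "3.0.1" -> "3.0"
    sparkVersion.split('.').take(2).mkString(".") match {
      case "2.4" => Some(Coordinates("com.microsoft.azure", "spark-mssql-connector", "1.0.2"))
      case "3.0" => Some(Coordinates("com.microsoft.azure", "spark-mssql-connector_2.12", "1.1.0"))
      case "3.1" => Some(Coordinates("com.microsoft.azure", "spark-mssql-connector_2.12", "1.2.0"))
      case _     => None // not covered by the table above
    }
}
```

For example, MssqlConnectorVersions.forSpark("3.0.1").map(_.mavenString) yields the coordinate string you would pass to spark-submit --packages.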

2. Maven Dependency

If you are using Maven, add the dependency below to your pom.xml. In this example I am using Spark 3.0 with Scala 2.12, so, per the table above, I chose the following dependency.


<dependency>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>spark-mssql-connector_2.12</artifactId>
  <version>1.1.0</version>
</dependency>
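If you build with sbt instead of Maven, the equivalent build.sbt fragment might look roughly like this. It is a sketch that assumes scalaVersion is set to a 2.12.x release and that Spark 3.0.x is used, per the table above. In sbt, %% appends the Scala binary version, so the connector resolves to spark-mssql-connector_2.12. The Spark version 3.0.1 is an assumption; use the one your cluster runs.

```scala
// build.sbt (sketch): Spark itself is usually provided by the cluster at runtime
libraryDependencies ++= Seq(
  "org.apache.spark"    %% "spark-sql"             % "3.0.1" % "provided", // assumed Spark version
  "com.microsoft.azure" %% "spark-mssql-connector" % "1.1.0"
)
```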

3. Create Spark Session & DataFrame

Creating a SparkSession is the first step in working with Spark, so let's create one and construct a sample DataFrame with the columns id, name, age and gender. The following sections use this DataFrame to write to a SQL Server table and read it back.


import org.apache.spark.sql.SparkSession

// Create SparkSession in spark 2.x or later
val spark = SparkSession.builder().master("local[*]")
    .appName("SparkByExamples.com")
    .getOrCreate()

import spark.implicits._

// Create DataFrame
val sampleDF = Seq((1, "James",30,"M"),
    (2, "Ann",40,"F"), (3, "Jeff",41,"M"),
    (4, "Jennifer",20,"F")
  ).toDF("id", "name","age","gender")

4. Write Spark DataFrame to SQL Server Table

When writing a DataFrame to a SQL table, you need to provide the SQL Server connection details. Pass the connector's data source name, com.microsoft.sqlserver.jdbc.spark, to format(). This data source is provided by the connector dependency.

Some points to note while writing:

  • By default, mode("overwrite") drops the table if it already exists and re-creates it without indexes.
  • Use option("truncate","true") to truncate the existing table instead, which retains its indexes.
  • By default, the connector uses the READ_COMMITTED isolation level. You can change it with option("mssqlIsolationLevel", "READ_UNCOMMITTED").

// Write to SQL Table
sampleDF.write
  .format("com.microsoft.sqlserver.jdbc.spark")
  .mode("overwrite")
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;")
  .option("dbtable", "employee")
  .option("user", "replace_user_name")
  .option("password", "replace_password")
  .save()

Here, mode("overwrite") means that if the table already exists, its rows are replaced with the rows from the DataFrame. Unless truncate is set to true, overwrite does this by dropping the existing table and re-creating it.
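The write options covered in this section can also be collected into a single map and passed with DataFrameWriter.options(...). The following is a minimal sketch. The object WriteOptions and its parameters are hypothetical, while the option keys truncate and mssqlIsolationLevel are the ones described above.

```scala
// Hypothetical helper combining connection options with the write-time options above.
object WriteOptions {
  def forOverwrite(
      base: Map[String, String],             // url, dbtable, user, password
      keepIndexes: Boolean = true,           // truncate instead of drop + re-create
      isolationLevel: Option[String] = None  // e.g. Some("READ_UNCOMMITTED")
  ): Map[String, String] = {
    val truncate =
      if (keepIndexes) Map("truncate" -> "true") else Map.empty[String, String]
    val isolation = isolationLevel
      .map(level => Map("mssqlIsolationLevel" -> level))
      .getOrElse(Map.empty[String, String]) // connector default is READ_COMMITTED
    base ++ truncate ++ isolation
  }
}
```

For example, sampleDF.write.format("com.microsoft.sqlserver.jdbc.spark").mode("overwrite").options(WriteOptions.forOverwrite(base)).save() overwrites the table while keeping its indexes, where base holds url, dbtable, user and password.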

5. Read SQL Server Table to Spark DataFrame

As with writing, when reading you need to provide the data source format and the SQL Server connection details. The example below reads the table employee from the database emp into a DataFrame.


// Read from SQL Table
val df = spark.read
  .format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;")
  .option("dbtable", "employee")
  .option("user", "replace_user_name")
  .option("password", "replace_password")
  .load()

df.show()

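This prints the rows written to the employee table in the previous section. SQL Server does not guarantee row order without an ORDER BY, so the rows may appear in any order.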

6. Select Specific Columns to Read

The example above reads the entire table into a Spark DataFrame. If you only need specific columns (or rows), pass a query through the dbtable option instead of a table name. Spark places the dbtable value in the FROM clause of the SQL it generates, so the query must be wrapped in parentheses and given an alias.


// Read selected columns from SQL Table
val df = spark.read
  .format("com.microsoft.sqlserver.jdbc.spark")
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;")
  .option("dbtable", "(select id, age from employee where gender = 'M') as emp_m")
  .option("user", "replace_user_name")
  .option("password", "replace_password")
  .load()

df.show()
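Wrapping ad-hoc queries by hand is easy to get wrong, for example by leaving a trailing semicolon that breaks the generated subquery. The following is a small sketch of a helper that produces a dbtable value from a query. The object JdbcQueries, the method asDbtable and the default alias q are hypothetical.

```scala
// Hypothetical helper: turns a SELECT statement into a dbtable-compatible subquery.
object JdbcQueries {
  def asDbtable(query: String, alias: String = "q"): String = {
    // A trailing ';' is not allowed inside a derived table, so drop it
    val cleaned = query.trim.stripSuffix(";").trim
    s"($cleaned) AS $alias"
  }
}
```

For example, .option("dbtable", JdbcQueries.asDbtable("select id, age from employee where gender = 'M'", "emp_m")) produces the same dbtable value as the example above, apart from the keyword case of AS.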

7. Append Table

Use mode("append") on the DataFrame writer (sampleDF.write.mode("append")) to append the DataFrame's rows to an existing SQL Server table.


// Write to SQL Table
sampleDF.write
  .format("com.microsoft.sqlserver.jdbc.spark")
  .mode("append")
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;")
  .option("dbtable", "employee")
  .option("user", "replace_user_name")
  .option("password", "replace_password")
  .save()

8. Complete Example

The following is a complete example of connecting to SQL Server or Azure SQL from Spark and reading and writing a table.


import org.apache.spark.sql.SparkSession

object ReadSQLTable extends App{

  // Create SparkSession
  val spark = SparkSession.builder().master("local[*]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq((1, "James",30,"M"),
    (2, "Ann",40,"F"), (3, "Jeff",41,"M"),
    (4, "Jennifer",20,"F")
  ).toDF("id", "name","age","gender")

  // Write to SQL Table
  sampleDF.write
    .format("com.microsoft.sqlserver.jdbc.spark")
    .mode("overwrite")
    .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;")
    .option("dbtable", "employee")
    .option("user", "replace_user_name")
    .option("password", "replace_password")
    .save()

  // Read from SQL Table
  val df = spark.read
    .format("com.microsoft.sqlserver.jdbc.spark")
    .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;")
    .option("dbtable", "employee")
    .option("user", "replace_user_name")
    .option("password", "replace_password")
    .load()

  // Show sample records from data frame
  df.show(5)
}
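The complete example hard-codes the user name and password. A common alternative is to read them from environment variables. The following is a minimal sketch that assumes hypothetical variable names MSSQL_USER and MSSQL_PASSWORD. It takes the environment as a parameter, so you can pass sys.env in the application or a plain Map in tests.

```scala
// Hypothetical helper: reads credentials from an environment-like map.
// MSSQL_USER and MSSQL_PASSWORD are assumed names, not connector settings.
object Credentials {
  def fromEnv(env: Map[String, String]): Either[String, Map[String, String]] =
    (env.get("MSSQL_USER"), env.get("MSSQL_PASSWORD")) match {
      // Both present: return them under the option keys the connector expects
      case (Some(user), Some(password)) => Right(Map("user" -> user, "password" -> password))
      case _                            => Left("MSSQL_USER and MSSQL_PASSWORD must both be set")
    }
}
```

In the application you could write val creds = Credentials.fromEnv(sys.env).fold(msg => sys.error(msg), identity) and then add .options(creds) to the reader or writer in place of the two hard-coded option calls.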

Conclusion

In this article, you have learned how to connect to SQL Server from Spark, write a DataFrame to a SQL table, and read the table into a DataFrame, with examples.
