Spark Read and Write MySQL Database Table

How do you connect to a MySQL database server from Spark, read a table into a Spark DataFrame, and write the DataFrame back to a table?

Steps to connect Spark to MySQL Server and read and write a table:

  • Step 1 – Identify the Spark MySQL Connector version to use
  • Step 2 – Add the dependency
  • Step 3 – Create SparkSession & Dataframe
  • Step 4 – Save Spark DataFrame to MySQL Database Table
  • Step 5 – Read MySQL Table to Spark Dataframe

In order to connect to a MySQL server from Apache Spark, you need the following details. Make sure you have them before you read from or write to the server.

  • MySQL server address & port
  • Database name
  • Table name
  • User name
  • Password
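The checklist above can be captured in a small case class so the examples below stay consistent. This is a minimal sketch; all values are placeholders you would replace with your own server's details:

```scala
// Connection details gathered in one place. All values are placeholders.
case class MySqlConn(host: String, port: Int, database: String,
                     user: String, password: String) {
  // JDBC URL in the form the read/write examples below expect
  def url: String = s"jdbc:mysql://$host:$port/$database"
}

val conn = MySqlConn("localhost", 3306, "database_name", "root", "root")
println(conn.url) // jdbc:mysql://localhost:3306/database_name
```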

1. Spark Connector for MySQL Server

In order to connect to MySQL Server from Spark, you need the JDBC connector and the driver class. The connector I am using in this article is mysql-connector-java and the driver class is com.mysql.cj.jdbc.Driver.

MySQL provides connectors for each MySQL server version, hence choose the connector version that matches your server version.

2. Maven Dependency

If you are using Maven, add the below dependencies to your pom.xml. For this example, I am using Spark 3.2.1 with Scala 2.13, hence I chose the following dependencies.


<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.13</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.13</artifactId>
        <version>3.2.1</version>
    </dependency>
</dependencies>

If you get the error java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver when you run this code, you probably don’t have the MySQL jar in the classpath. Follow the below steps in IntelliJ to resolve this.

  • Copy the MySQL jar somewhere on your system (you can get it from the .m2 repository).
  • Go to File > Project Structure.
  • Select Modules in the left panel and select the Dependencies tab.
  • Select the + icon and choose the JARs or Directories option.
  • Select your JAR file (or the directory containing it).
  • Click on the OK button.
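If your project uses sbt rather than Maven, the equivalent dependencies (same Spark 3.2.1 / Scala 2.13 versions as above) would look roughly like this in build.sbt; the scalaVersion patch number shown is an example:

```scala
// build.sbt — sbt equivalent of the Maven dependencies above
scalaVersion := "2.13.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.2.1",
  "org.apache.spark" %% "spark-sql"  % "3.2.1",
  "mysql" % "mysql-connector-java" % "8.0.13"
)
```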

3. Create Spark Session & DataFrame

Creating a SparkSession is a basic step to work with Spark hence, first, let’s create a SparkSession and construct a sample DataFrame with columns id, name, age and gender. In the below sections, I will use this DataFrame to write into a MySQL table and read from it.


import org.apache.spark.sql.SparkSession

// Create SparkSession in spark 2.x or later
val spark = SparkSession.builder().master("local[*]")
    .appName("SparkByExamples.com")
    .getOrCreate()

import spark.implicits._

// Create DataFrame
val sampleDF = Seq((1, "James",30,"M"),
    (2, "Ann",40,"F"), (3, "Jeff",41,"M"),
    (4, "Jennifer",20,"F")
  ).toDF("id", "name","age","gender")

4. Write Spark DataFrame to MySQL Database Table

You would be required to provide the MySQL server & database details while writing a Spark DataFrame to the table. Use format("jdbc") and specify the driver class with the driver option; this class is provided by the MySQL connector dependency.

Some points to note while writing:

  • mode("overwrite") drops the table if it already exists and re-creates a new one without indexes.
  • Use option("truncate","true") to truncate the table instead of dropping it, which retains the table definition and its indexes.
  • The Spark JDBC writer uses the READ_UNCOMMITTED isolation level by default. You can change it using option("isolationLevel", "READ_COMMITTED").

// Write to MySQL Table
sampleDF.write
  .format("jdbc")
  .mode("overwrite")
  .option("driver","com.mysql.cj.jdbc.Driver")
  .option("url", "jdbc:mysql://localhost:3306/database_name")
  .option("dbtable", "employee")
  .option("user", "root")
  .option("password", "root")
  .save()
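The options used above, together with the truncate and isolation-level options discussed earlier, can be gathered into a plain map and passed to the writer in one call via options(). A sketch with placeholder credentials:

```scala
// JDBC write options collected into a map. All values are placeholders.
val writeOptions = Map(
  "driver"         -> "com.mysql.cj.jdbc.Driver",
  "url"            -> "jdbc:mysql://localhost:3306/database_name",
  "dbtable"        -> "employee",
  "user"           -> "root",
  "password"       -> "root",
  "truncate"       -> "true",             // keep table definition and indexes on overwrite
  "isolationLevel" -> "READ_UNCOMMITTED"  // Spark JDBC writer's default level
)

// Usage (requires a SparkSession and the sampleDF from above):
// sampleDF.write.format("jdbc").mode("overwrite").options(writeOptions).save()
```

Keeping the options in one map avoids repeating the connection details across the write and read examples that follow.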

Please check the database and confirm the table is created. If you get a database not exists error, create the database prior to running this code.


5. Read MySQL Database Table to Spark DataFrame

Similar to writing, reading also requires the driver class and the MySQL connection details. In the below example, I am reading the table employee from the database into a DataFrame.


// Read from MySQL Table
val df = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/database_name")
    .option("dbtable", "employee")
    .option("user", "root")
    .option("password", "root")
    .load()

Running df.show() yields output similar to the following (row order from a JDBC read is not guaranteed):

+---+--------+---+------+
| id|    name|age|gender|
+---+--------+---+------+
|  1|   James| 30|     M|
|  2|     Ann| 40|     F|
|  3|    Jeff| 41|     M|
|  4|Jennifer| 20|     F|
+---+--------+---+------+

6. Select Specific Columns to Read

The above example reads the entire table into the DataFrame. Sometimes you may not need the whole table; to select specific columns, pass a subquery to the dbtable option. Note that the subquery must be enclosed in parentheses and given an alias.


// Read specific columns from MySQL Table
val df = spark.read
  .format("jdbc")
  .option("driver","com.mysql.cj.jdbc.Driver")
  .option("url", "jdbc:mysql://localhost:3306/database_name")
  .option("dbtable", "(select id,age from employee where gender='M') as emp")
  .option("user", "replace_user_name")
  .option("password", "replace_password")
  .load()

df.show()

Alternatively, you can use the query option, which takes the query directly without the parentheses and alias. Note that you can use either the dbtable or the query option, but not both at a time. Also, when using the query option, you can’t use the partitionColumn option.


// Using the query option
val df = spark.read
  .format("jdbc")
  .option("driver","com.mysql.cj.jdbc.Driver")
  .option("url", "jdbc:mysql://localhost:3306/database_name")
  .option("query", "select id,age from employee where gender='M'")
  .option("user", "replace_user_name")
  .option("password", "replace_password")
  .load()

7. Read MySQL Table in Parallel

Use the numPartitions option together with partitionColumn, lowerBound and upperBound to read a MySQL table in parallel; numPartitions also caps the maximum number of concurrent JDBC connections. The partitionColumn must be a numeric, date, or timestamp column and cannot be combined with the query option, so the example below uses dbtable. It reads the table with 5 partitions. I have also used the fetchsize option, which controls how many rows are fetched per round trip; it defaults to the JDBC driver's own fetch size, and increasing it can improve performance on large reads.


// Using numPartitions with partition bounds
val df = spark.read
  .format("jdbc")
  .option("driver","com.mysql.cj.jdbc.Driver")
  .option("url", "jdbc:mysql://localhost:3306/database_name")
  .option("dbtable", "employee")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "100")
  .option("numPartitions", "5")
  .option("fetchsize", "20")
  .option("user", "root")
  .option("password", "root")
  .load()
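To see what the bounds do, here is a simplified sketch of how a range on the partition column is split into per-partition WHERE clauses. This illustrates the idea only; it is not Spark's exact boundary arithmetic:

```scala
// Simplified illustration of range partitioning on the partition column.
// One WHERE clause is generated per partition; the first and last
// partitions are open-ended so rows outside the bounds are not lost.
def partitionPredicates(column: String, lower: Long, upper: Long,
                        numPartitions: Int): Seq[String] = {
  val stride = (upper - lower) / numPartitions
  (0 until numPartitions).map { i =>
    val lo = lower + i * stride
    if (i == 0) s"$column < ${lo + stride} OR $column IS NULL"
    else if (i == numPartitions - 1) s"$column >= $lo"
    else s"$column >= $lo AND $column < ${lo + stride}"
  }
}

val preds = partitionPredicates("id", 1L, 101L, 5)
// e.g. the middle partition reads: id >= 41 AND id < 61
```

Each predicate becomes a separate JDBC query, which is why a skewed partition column leads to unevenly sized partitions.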

8. Append Table

Use mode("append") to append the rows to the existing MySQL database table.


// Append to MySQL Table
sampleDF.write
  .format("jdbc")
  .mode("append")
  .option("driver","com.mysql.cj.jdbc.Driver")
  .option("url", "jdbc:mysql://localhost:3306/database_name")
  .option("dbtable", "employee")
  .option("user", "root")
  .option("password", "root")
  .save()

9. Spark Shell MySQL Connector

Sometimes you may need to connect to MySQL from the Spark shell to test some queries interactively. You can achieve this by providing the MySQL connector library to spark-shell with the --jars option. Once the shell starts, you can run your queries.


bin/spark-shell \
      --master local[*] \
      --jars /path/to/mysql/connector/mysql-connector-java-8.0.13.jar

10. Spark Submit MySQL Connector

Similarly, you also need to add the MySQL connector jar to the --jars option of spark-submit. If you have multiple jars, refer to how to add multiple jars to spark-submit.


bin/spark-submit \
      --master yarn \
      --jars /path/to/mysql/connector/mysql-connector-java-8.0.13.jar \
      --class your.main.Class \
      your-application.jar

11. Complete Example

Following is the complete example of how to connect to MySQL server from Spark and read/write the table.


import org.apache.spark.sql.SparkSession

object ReadMysqlServerTable extends App{

  // Create SparkSession
  val spark = SparkSession.builder().master("local[*]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  import spark.implicits._

  // Create DataFrame
  val sampleDF = Seq((1, "James",30,"M"),
    (2, "Ann",40,"F"), (3, "Jeff",41,"M"),
    (4, "Jennifer",20,"F")
  ).toDF("id", "name","age","gender")

  // Write to SQL Table
  sampleDF.write
    .format("jdbc")
    .mode("overwrite")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/database_name")
    .option("dbtable", "employee")
    .option("user", "root")
    .option("password", "root")
    .save()

  // Read from SQL Table
  val df = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/database_name")
    .option("dbtable", "employee")
    .option("user", "root")
    .option("password", "root")
    .load()

  // Show sample records from data frame
  df.show(5)
}

As explained in section 2, if you get the error java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver when you run this code, you probably don’t have the MySQL jar in the classpath; follow the IntelliJ steps above to add it.

Conclusion

In this article, you have learned how to connect to a MySQL server from Spark and write the DataFrame to a table and read the table into DataFrame with examples.
