Spark JDBC Parallel Read

  • Post author: NNK
  • Post category: Apache Spark
  • Post last modified: December 13, 2022

By using the Spark jdbc() method with the numPartitions option, you can read a database table in parallel. This option applies to both reading and writing. The Apache Spark documentation describes numPartitions as follows:

The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.

spark.apache.org

In my previous article, I explained the different options of Spark Read JDBC. In this article, I will explain how to load a JDBC table in parallel by connecting to a MySQL database. I have a database emp and a table employee with columns id, name, age, and gender. For a complete example with MySQL, refer to How to Use MySQL to Read and Write Spark DataFrame.


1. Parallel Read JDBC in Spark

I will use the jdbc() method with the numPartitions option. This option also sets the maximum number of concurrent JDBC connections to use. The below example sets numPartitions to 5. Note, however, that on its own numPartitions only caps the connection count; for Spark to actually split the read into multiple partitions, you must also supply the partitioning options covered in section 4 (partitionColumn, lowerBound, and upperBound).


import org.apache.spark.sql.SparkSession
import java.util.Properties

// Create SparkSession in spark 2.x or later
val spark = SparkSession.builder().master("local[*]")
    .appName("SparkByExamples.com")
    .getOrCreate()

// Connection Properties
val connProp = new Properties()
connProp.setProperty("driver", "com.mysql.cj.jdbc.Driver")
connProp.setProperty("user", "root")
// Property values must be strings; a non-String value would be ignored
connProp.setProperty("numPartitions", "5")
connProp.setProperty("password", "root")

// Read JDBC table in parallel
val df = spark.read
  .jdbc("jdbc:mysql://localhost:3306/emp", "employee", connProp)

// show DataFrame
df.show()

Yields below output.

[Screenshot: df.show() output of the employee table]

2. Another Approach to Read JDBC in Parallel

Alternatively, you can use spark.read.format("jdbc").load() to read the table. With this approach, you provide the database details through the option() method.


// Read Table in Parallel
val df = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/emp")
    .option("dbtable","employee")
    .option("numPartitions",5)
    .option("user", "root")
    .option("password", "root")
    .load()

You can also select specific columns with a where condition by using the query option. Note that you can use either the dbtable or the query option, but not both at a time. Also, when using the query option, you can’t use the partitionColumn option.


// Select columns with where clause
val df = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/emp")
    .option("query","select id,age from employee where gender='M'")
    .option("numPartitions",5)
    .option("user", "root")
    .option("password", "root")
    .load()

3. Using fetchsize with numPartitions to Read

The fetchsize option specifies how many rows to fetch per round trip to the database, which can significantly affect the performance of JDBC drivers. Spark passes 0 by default, which leaves the driver's own default in effect; Oracle's driver, for example, defaults to just 10 rows. Do not set this to a very large number, as it can cause memory pressure on the executors.


// Using fetchsize
val df = spark.read
    .format("jdbc")
    .option("driver","com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/emp")
    .option("query","select id,age from employee where gender='M'")
    .option("numPartitions",5)
    .option("fetchsize", 20)
    .option("user", "root")
    .option("password", "root")
    .load()

4. Other JDBC Options to read in Parallel

Note that if you specify any one of the options in the below table, you must specify all of them, along with numPartitions.

They describe how to partition the table when reading it in parallel from multiple workers. partitionColumn must be a numeric, date, or timestamp column from the table in question. Note that lowerBound and upperBound are used only to decide the partition stride, not to filter rows; all rows of the table are read.

JDBC Option       Description
partitionColumn   Column used to split the read; must be numeric, date, or timestamp
lowerBound        Lower bound used to compute the partition stride
upperBound        Upper bound used to compute the partition stride
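Putting these options together gives an actual parallel read. The sketch below assumes the employee id values span roughly 1 to 100 (in practice, query MIN(id) and MAX(id) first to pick the bounds); Spark splits the range into numPartitions strides of about (upperBound - lowerBound) / numPartitions values each, and each partition reads over its own connection.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]")
    .appName("SparkByExamples.com")
    .getOrCreate()

// With stride (101 - 1) / 5 = 20, Spark issues roughly these 5 queries,
// one per partition, each over its own JDBC connection:
//   WHERE id < 21 (or id IS NULL)
//   WHERE id >= 21 AND id < 41 ... WHERE id >= 81
val df = spark.read
    .format("jdbc")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/emp")
    .option("dbtable", "employee")
    .option("partitionColumn", "id")  // numeric, date, or timestamp column
    .option("lowerBound", 1)          // assumed MIN(id); does not filter rows
    .option("upperBound", 101)        // assumed MAX(id); does not filter rows
    .option("numPartitions", 5)
    .option("user", "root")
    .option("password", "root")
    .load()

// Confirm the read was split into 5 partitions
println(df.rdd.getNumPartitions)
```

The exact predicates are illustrative; Spark's own stride computation handles rounding and NULLs. Also note that if id values are skewed, the partitions will be uneven even with correct bounds.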

Conclusion

In this article, you have learned how to read the table in parallel by using numPartitions option of Spark jdbc(). This property also determines the maximum number of concurrent JDBC connections to use.

NNK
