PySpark Query Database Table using JDBC

  • Post author:
  • Post category:PySpark
  • Post last modified:December 13, 2022

How to perform a SQL query on a database table by using JDBC in PySpark? In order to query the database table using jdbc() you need to have a database server running, the database java connector, and connection details.

By using an option dbtable or query with jdbc() method you can do the SQL query on the database table into PySpark DataFrame.

Steps to query the database table using JDBC

  • Step 1 – Identify the Database Java Connector version to use
  • Step 2 – Add the dependency
  • Step 3 – Query JDBC Table to PySpark Dataframe

1. PySpark Query JDBC Database Table

To query a database table using jdbc() method, you would need the following.

  • Server IP or Host name and Port,
  • Database name,
  • Table name,
  • User and Password.

JDBC is a Java standard to connect to any database as long as you provide the right JDBC connector jar in the classpath and provide a JDBC driver using the JDBC API. PySpark also leverages the same JDBC standard when using jdbc() method.

The connector I am using in this article is mysql-connector-java-<version>.jar and the driver I am using com.mysql.jdbc.Driver

MySQL provides connectors for each server version hence, please choose the right version based pm server version you use. Download the mysql-connector-java-8.0.13.jar and keep it in your current directory.

2 PySpark Query JDBC Table Example

I have MySQL database emp and table employee with columns id, name, age and gender.

pyspark query table jdbc

I will use this JDBC table to run SQL queries and store the output in PySpark DataFrame. The below example extracts the complete table into DataFrame


# Imports
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
           .appName('SparkByExamples.com') \
           .config("spark.jars", "mysql-connector-java-8.0.13.jar") \
           .getOrCreate()

# Query table using jdbc()
df = spark.read \
    .jdbc("jdbc:mysql://localhost:3306/emp", "employee", \
          properties={"user": "root", "password": "root", "driver":"com.mysql.cj.jdbc.Driver"})

# show DataFrame
df.show()

Yields below output. For more JDBC properties refer to https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

pyspark query jdbc table

Alternatively, you can also use the DataFrameReader.format("jdbc").load() to query the table. When you use this, you need to provide the database details with the option() method.


# Query from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

3. SQL Query Specific Columns of JDBC Table

In the above example, it extracts the entire JDBC table into PySpark DataFrame. Sometimes you may be required to query specific columns with where condition. You can achieve this by using either dbtable or query options.


# Query from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("query", "select id,age from employee where gender='M'") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

df.show()

Alternatively, you can also use the query option. Note that you can use either dbtable or query option but not both at a time. Also, when using the query option, you can’t use partitionColumn option.


# Using query
df = spark.read \
  .format("jdbc") \
  ......
  ......
  .option("query", "select id,age from employee where gender='M'") \
  .......
  .......
  .load()

4. Query JDBC Table Parallel

Use option numPartitions to query JDBC table in parallel. This property also determines the maximum number of concurrent JDBC connections to use. The below example creates the DataFrame with 5 partitions. I have also used the fetchsize option which is used to specify how many rows to fetch at a time, by default it is 10.


# Using numPartitions
df = spark.read \
  .format("jdbc") \
  .option("query", "select id,age from employee where gender='M'") \
  .option("numPartitions",5) \
  .option("fetchsize", 20) \
  .......
  .......
  .load()

5. Complete Example

Following is the complete example of how to query a database table using jdbc() method in PySpark.


# Imports
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
           .appName('SparkByExamples.com') \
           .config("spark.jars", "mysql-connector-java-8.0.13.jar") \
           .getOrCreate()

# Query table using jdbc()
df = spark.read \
    .jdbc("jdbc:mysql://localhost:3306/emp", "employee",
          properties={"user": "root", "password": "root", "driver":"com.mysql.cj.jdbc.Driver"})

# show DataFrame
df.show()

# Query from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

# Query from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("query", "select id,age from employee where gender='M'") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

df.show()

Conclusion

In this article, you have learned how to SQL query a database table using jdbc() method in PySpark. Also, learned how to query the specific columns with where condition.

References

NNK

SparkByExamples.com is a Big Data and Spark examples community page, all examples are simple and easy to understand and well tested in our development environment Read more ..

Leave a Reply