PySpark Read JDBC Table to DataFrame

  • Post category: PySpark
  • Post last modified: December 11, 2022

pyspark.sql.DataFrameReader.jdbc() reads a JDBC table into a PySpark DataFrame. You call it as spark.read.jdbc(), where spark is your SparkSession, read is a property that returns a DataFrameReader object, and jdbc() is a method of that class.

In this article, I will explain the syntax of the jdbc() method and its variations, how to connect to a MySQL database, and how to read a JDBC table into a PySpark DataFrame using the MySQL connector.

Steps to use spark.read.jdbc():

  • Step 1 – Identify the JDBC Connector to use
  • Step 2 – Add the dependency
  • Step 3 – Create SparkSession with database dependency
  • Step 4 – Read JDBC Table to PySpark DataFrame

1. Syntax of PySpark jdbc()

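DataFrameReader provides three variants of the jdbc() method, and you can use whichever fits your need. The first reads the whole table. The second creates one partition per WHERE-clause predicate. The third splits the read on a numeric, date, or timestamp column.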


# Syntax of jdbc()
jdbc(self, url: str, table: str, *, properties: Optional[Dict[str, str]] = None)
jdbc(self, url: str, table: str, *, predicates: List[str], properties: Optional[Dict[str, str]] = None)
jdbc(self, url: str, table: str, column: str, lowerBound: Union[int, str], upperBound: Union[int, str], numPartitions: int, *, properties: Optional[Dict[str, str]] = None)
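The predicates variant is the least obvious of the three. According to the PySpark API docs, each string in predicates is used as a WHERE-clause expression and defines one partition of the resulting DataFrame. The sketch below builds such a list for the employee table used later in this article. Note that make_range_predicates, read_with_predicates, and the id boundaries are hypothetical illustrations and are not part of PySpark.

```python
from typing import Dict, List


def make_range_predicates(column: str, boundaries: List[int]) -> List[str]:
    """Build one WHERE-clause predicate per range (hypothetical helper).

    The first predicate also matches NULLs and values below boundaries[0],
    and the last matches everything >= boundaries[-1], so no row is dropped.
    """
    if not boundaries:
        raise ValueError("need at least one boundary")
    preds = [f"{column} < {boundaries[0]} OR {column} IS NULL"]
    for lo, hi in zip(boundaries, boundaries[1:]):
        preds.append(f"{column} >= {lo} AND {column} < {hi}")
    preds.append(f"{column} >= {boundaries[-1]}")
    return preds


def read_with_predicates(spark, url: str, properties: Dict[str, str]):
    # Assumed boundaries for illustration; each predicate becomes one partition.
    predicates = make_range_predicates("id", [100, 200, 300])
    return spark.read.jdbc(url, "employee", predicates=predicates,
                           properties=properties)


for p in make_range_predicates("id", [100, 200]):
    print(p)
```

Keep the predicates non-overlapping and covering every row. Otherwise, rows are duplicated or silently skipped.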

2. PySpark Read JDBC

To read a table with the jdbc() method, you need at minimum a driver class, the server IP, port, database name, table, user, and password. JDBC is a standard Java API that can connect to any database, provided the right JDBC connector JAR is on the classpath and you specify that connector's driver class. PySpark uses the same JDBC standard in the jdbc() method.

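The connector I am using in this article is mysql-connector-java-8.0.13.jar (MySQL Connector/J), and its driver class is com.mysql.cj.jdbc.Driver. The older class name com.mysql.jdbc.Driver comes from Connector/J 5.x and is deprecated in 8.x.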

Connector/J releases support specific MySQL server versions, so choose a connector version that matches the server version you use. Download mysql-connector-java-8.0.13.jar and keep it in your current directory.
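To keep the connection details from this section in one place, you can assemble the URL and the properties dictionary once and reuse them in every read. This is a minimal sketch assuming a MySQL server. The function name mysql_connection is hypothetical, and its default driver is the Connector/J 8.x class mentioned above.

```python
from typing import Dict, Tuple


def mysql_connection(host: str, port: int, database: str, user: str,
                     password: str,
                     driver: str = "com.mysql.cj.jdbc.Driver"
                     ) -> Tuple[str, Dict[str, str]]:
    """Return (url, properties) for spark.read.jdbc(url, table, properties=...).

    Hypothetical helper: the URL uses the jdbc:mysql://host:port/database
    form used throughout this article.
    """
    url = f"jdbc:mysql://{host}:{port}/{database}"
    properties = {"user": user, "password": password, "driver": driver}
    return url, properties


url, props = mysql_connection("localhost", 3306, "emp", "root", "root")
print(url)
# df = spark.read.jdbc(url, "employee", properties=props)
```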

3. PySpark Read jdbc() Example

I have a MySQL database emp with a table employee that has the columns id, name, age, and gender. I will read this table into a PySpark DataFrame using jdbc().


# Imports
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
           .appName('SparkByExamples.com') \
           .config("spark.jars", "mysql-connector-java-8.0.13.jar") \
           .getOrCreate()

# Read table using jdbc()
df = spark.read \
    .jdbc("jdbc:mysql://localhost:3306/emp", "employee", \
          properties={"user": "root", "password": "root", "driver":"com.mysql.cj.jdbc.Driver"})

# show DataFrame
df.show()

This yields the output below. For more JDBC properties, refer to https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

[Output of df.show(): rows of the employee table with columns id, name, age, and gender]

Alternatively, you can read the table with spark.read.format("jdbc").load(). In this case, you pass the database connection details through the option() method.


# Read from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()
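The two styles are interchangeable. The url and table arguments of jdbc() correspond to the url and dbtable options, and each entry of the properties dictionary becomes an option with the same name. The sketch below makes that mapping explicit with a hypothetical helper, jdbc_options. It passes the result to DataFrameReader.options(), which accepts options as keyword arguments.

```python
from typing import Dict


def jdbc_options(url: str, table: str, properties: Dict[str, str]) -> Dict[str, str]:
    """Map jdbc()-style arguments to format("jdbc") option names (hypothetical helper)."""
    # url -> "url", table -> "dbtable"; every properties entry keeps its own key.
    return {"url": url, "dbtable": table, **properties}


def read_table(spark, url: str, table: str, properties: Dict[str, str]):
    # Intended to behave like spark.read.jdbc(url, table, properties=properties).
    return (spark.read.format("jdbc")
            .options(**jdbc_options(url, table, properties))
            .load())


opts = jdbc_options(
    "jdbc:mysql://localhost:3306/emp",
    "employee",
    {"user": "root", "password": "root", "driver": "com.mysql.cj.jdbc.Driver"},
)
print(opts)
```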

4. Select Specific Columns to Read

The above examples read the entire table into a PySpark DataFrame. If you don't need the whole table, you can select specific columns (and rows) by passing a query to the dbtable option. Because Spark uses the dbtable value where a table name is expected, the query must be wrapped in parentheses and given an alias, which MySQL requires for derived tables.


# Read selected columns from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "(select id,age from employee where gender='M') as emp_subset") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

df.show()
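Why the parentheses and alias? Spark places the dbtable value into the FROM clause of the SQL it generates. For example, its default JDBC dialect looks up the schema with a query of the form SELECT * FROM <dbtable> WHERE 1=0. A bare SELECT statement in that position is invalid SQL, and MySQL additionally requires every derived table to have an alias. The hypothetical helpers below show the wrapping and the resulting schema query. The alias emp_subset is an arbitrary choice.

```python
def as_dbtable_subquery(sql: str, alias: str = "emp_subset") -> str:
    """Wrap a SELECT so it is valid where Spark expects a table name (hypothetical helper)."""
    # Drop a trailing semicolon, which is not allowed inside a subquery.
    return f"({sql.strip().rstrip(';')}) AS {alias}"


def schema_probe(dbtable: str) -> str:
    # Mirrors the shape of the schema query issued by Spark's default JDBC dialect.
    return f"SELECT * FROM {dbtable} WHERE 1=0"


sub = as_dbtable_subquery("select id,age from employee where gender='M'")
print(schema_probe(sub))
```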

Alternatively, you can use the query option and pass the SELECT statement as-is. Note that you can set either the dbtable or the query option, but not both at the same time. Also, the query option can't be combined with the partitionColumn option.


# Using query
df = spark.read \
  .format("jdbc") \
  ......
  ......
  .option("query", "select id,age from employee where gender='M'") \
  .......
  .......
  .load()
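Here is a complete version of the query read, filled in with the connection details used earlier in this article. Those details are an assumption, so adjust them for your server. The validate_jdbc_options helper is hypothetical. It only checks, on the client side, the two restrictions described above, which Spark also enforces when it loads the data.

```python
from typing import Dict


def validate_jdbc_options(opts: Dict[str, str]) -> None:
    """Raise ValueError for option combinations Spark rejects (hypothetical helper)."""
    if "dbtable" in opts and "query" in opts:
        raise ValueError("use either 'dbtable' or 'query', not both")
    if "query" in opts and "partitionColumn" in opts:
        raise ValueError("'partitionColumn' cannot be combined with 'query'")


# Assumed connection details, copied from the earlier examples.
QUERY_OPTIONS = {
    "url": "jdbc:mysql://localhost:3306/emp",
    "driver": "com.mysql.cj.jdbc.Driver",
    "user": "root",
    "password": "root",
    "query": "select id,age from employee where gender='M'",
}


def read_with_query(spark):
    validate_jdbc_options(QUERY_OPTIONS)
    # Spark wraps the query in its own aliased subquery, so no parentheses are needed.
    return spark.read.format("jdbc").options(**QUERY_OPTIONS).load()
```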

5. Read JDBC Table in Parallel

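Use the numPartitions option to read a JDBC table in parallel. It sets the maximum number of partitions, which also caps the number of concurrent JDBC connections. On the read path, however, numPartitions only splits the data when combined with partitionColumn, lowerBound, and upperBound, and those options can't be used with the query option. The example below combines numPartitions=5 with query, so Spark still reads the result as a single partition. It also sets fetchsize, which controls how many rows are fetched per database round trip. When fetchsize isn't set, the JDBC driver's own default applies (Oracle's driver, for example, fetches 10 rows at a time).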


# Using numPartitions
df = spark.read \
  .format("jdbc") \
  .option("query", "select id,age from employee where gender='M'") \
  .option("numPartitions",5) \
  .option("fetchsize", 20) \
  .......
  .......
  .load()
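To actually split the read, use the third jdbc() variant from section 1, or equivalently dbtable (not query) together with the partitionColumn, lowerBound, upperBound, and numPartitions options. The Spark documentation notes that the bounds only decide the partition stride and do not filter rows, so every row is still returned. partition_where_clauses is a simplified, hypothetical sketch of how the stride splits the id range. Spark also shifts the boundaries slightly to balance the first and last partitions, which this sketch omits. The bounds 0 and 1000 are assumed values for the id column.

```python
from typing import Dict, List, Optional


def partition_where_clauses(column: str, lower: int, upper: int,
                            num_partitions: int) -> List[Optional[str]]:
    """Simplified sketch of splitting a numeric partitionColumn range.

    None means "no WHERE clause". The first clause is open below (and also
    matches NULLs) and the last is open above, so the bounds never drop rows.
    """
    if lower > upper:
        raise ValueError("lowerBound must not exceed upperBound")
    if num_partitions <= 1 or lower == upper:
        return [None]  # a single partition that reads the whole table
    # Use fewer partitions when the range is narrower than num_partitions.
    num_partitions = min(num_partitions, upper - lower)
    stride = (upper - lower) // num_partitions
    clauses: List[Optional[str]] = []
    current = lower
    for i in range(num_partitions):
        lower_cond = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper_cond = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper_cond is None:
            clauses.append(lower_cond)
        elif lower_cond is None:
            clauses.append(f"{upper_cond} or {column} is null")
        else:
            clauses.append(f"{lower_cond} AND {upper_cond}")
    return clauses


def read_employee_partitioned(spark, url: str, properties: Dict[str, str]):
    # Assumed bounds: employee.id values roughly span 0..1000 (hypothetical).
    return spark.read.jdbc(url, "employee", column="id", lowerBound=0,
                           upperBound=1000, numPartitions=5,
                           properties=properties)


for clause in partition_where_clauses("id", 0, 1000, 5):
    print(clause)
```

Each clause corresponds to what one partition reads. With these assumed bounds, df.rdd.getNumPartitions() on the DataFrame returned by read_employee_partitioned() should report 5.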

6. Complete Example

The following is the complete example of reading a table in PySpark with both the jdbc() method and format("jdbc").


# Imports
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
           .appName('SparkByExamples.com') \
           .config("spark.jars", "mysql-connector-java-8.0.13.jar") \
           .getOrCreate()

# Read table using jdbc()
df = spark.read \
    .jdbc("jdbc:mysql://localhost:3306/emp", "employee",
          properties={"user": "root", "password": "root", "driver":"com.mysql.cj.jdbc.Driver"})

# show DataFrame
df.show()

# Read from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Conclusion

In this article, you have learned how to read a JDBC table into a PySpark DataFrame using the jdbc() method. You have also learned how to connect to a MySQL database and read a table with format("jdbc").
