PySpark Read and Write MySQL Database Table

In today’s data-driven world, data engineers and analysts must be able to efficiently interact with diverse data sources. PySpark, a powerful Python API for Apache Spark, offers seamless integration with various data storage systems, including relational databases like MySQL.

In this article, I will cover step-by-step instructions on how to connect to the MySQL database, read the table into a PySpark/Spark DataFrame, and write the DataFrame back to the MySQL table.

To connect to the MySQL server from PySpark, you need the following details. Make sure you have them before reading from or writing to the MySQL server (a placeholder sketch of these values follows the list).

  • MySQL server address & port
  • Database name
  • Table name
  • User name
  • Password
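
For reference, here is a minimal sketch that collects these details as Python variables, using the placeholder values assumed throughout this article (a local server, the emp database, and the employee table). Replace them with your own.


# Connection details (placeholder values, adjust for your environment)
driver = "com.mysql.cj.jdbc.Driver"       # JDBC driver class
url = "jdbc:mysql://localhost:3306/emp"   # server address, port, and database name
table = "employee"                        # table name
user = "root"                             # user name
password = "root"                         # password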

1. MySQL Connector for PySpark

You’ll need the MySQL connector (JDBC driver jar) to work with the MySQL database, so download it first. You also need the database details listed above: the driver class, server IP, port, database name, table name, and user credentials including the password.

PySpark uses JDBC, a Java standard, to facilitate connections to various databases. It incorporates the appropriate JDBC connector JAR in the classpath and utilizes a JDBC driver via the JDBC API.

In this article, I’m using the mysql-connector-java.jar connector and the com.mysql.cj.jdbc.Driver driver class. MySQL offers connectors tailored to each server version, so it’s crucial to select the appropriate version corresponding to your server.

Please download mysql-connector-java-8.0.13.jar (choose the Platform Independent package from https://dev.mysql.com/downloads/connector/j/) and ensure it’s stored in your current directory.
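
If you prefer not to download the jar manually and your cluster can reach Maven Central, you can instead let Spark resolve the connector at startup through the spark.jars.packages configuration. A minimal sketch using the same 8.0.13 version (the next section uses the local jar approach via spark.jars):


# Alternative: resolve the MySQL connector from Maven instead of a local jar
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.13") \
    .getOrCreate()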

2. Create SparkSession and Sample DataFrame

First, create a SparkSession, specifying the MySQL connector jar file through the spark.jars configuration. By specifying the jar file, Spark makes it available across the cluster nodes, enabling connectivity to the MySQL database.

Then construct a sample DataFrame with columns id, name, age, and gender.


# Imports
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
           .appName('SparkByExamples.com') \
           .config("spark.jars", "mysql-connector-java-8.0.13.jar")
           .getOrCreate()

# Create DataFrame 
columns = ["id", "name","age","gender"]
data = [(1, "James",30,"M"), (2, "Ann",40,"F"),
    (3, "Jeff",41,"M"),(4, "Jennifer",20,"F")]

sampleDF = spark.sparkContext.parallelize(data).toDF(columns)
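
Calling sampleDF.show() prints the sample data, which is a quick way to verify the DataFrame before writing it to MySQL:


# Preview the sample DataFrame
sampleDF.show()

# +---+--------+---+------+
# | id|    name|age|gender|
# +---+--------+---+------+
# |  1|   James| 30|     M|
# |  2|     Ann| 40|     F|
# |  3|    Jeff| 41|     M|
# |  4|Jennifer| 20|     F|
# +---+--------+---+------+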

3. Write PySpark DataFrame to MySQL Database Table

PySpark enables seamless data transfer from Spark DataFrames into MySQL tables, whether you are saving raw data or the results of transformations, aggregations, or analyses. By specifying the target MySQL table, the mode of operation (e.g., append, overwrite), and the connection properties, PySpark handles the data insertion process smoothly.

Use DataFrameWriter with format("jdbc") to write the DataFrame to a MySQL table.

Some points to note while writing:

  • To re-write an existing table, use mode("overwrite"). By default, this drops the table if it already exists and re-creates a new one without indexes.
  • To retain the indexes, use option("truncate","true"); see the sketch after the write example below.
  • By default, the JDBC writer uses the READ_UNCOMMITTED isolation level. To change it, use option("isolationLevel", "READ_COMMITTED").
  • The dbtable option specifies the name of the database table that you want to read data from or write data to.

# Write to MySQL Table
sampleDF.write \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/emp") \
  .option("dbtable", "employee") \
  .option("user", "root") \
  .option("password", "root") \
  .mode("overwrite") \
  .save()

As specified above, the overwrite mode first drops the table if it already exists in the database.
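
If you want to replace the data but keep the existing table definition and indexes, a hedged sketch that combines overwrite with the truncate option (and sets an explicit isolation level) could look like this:


# Overwrite the data but truncate instead of dropping the table (keeps indexes)
sampleDF.write \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/emp") \
  .option("dbtable", "employee") \
  .option("truncate", "true") \
  .option("isolationLevel", "READ_COMMITTED") \
  .option("user", "root") \
  .option("password", "root") \
  .mode("overwrite") \
  .save()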


4. Read MySQL Database Table to PySpark DataFrame

Using PySpark’s JDBC connector, you can easily fetch data from MySQL tables into Spark DataFrames. This allows for efficient parallelized processing of large datasets residing in MySQL databases. By specifying the JDBC URL, table name, and appropriate connection properties, PySpark can establish a connection to the MySQL server and ingest data with ease.

In the example below, I am reading the table employee from the database emp into a DataFrame.


# Read from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Call df.show() to display the contents of the table. Alternatively, you can also use spark.read.jdbc() in PySpark, as sketched below.
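
A minimal sketch of the same read using spark.read.jdbc(), assuming the same local server and credentials as above:


# Same read using spark.read.jdbc()
df = spark.read.jdbc(
    url="jdbc:mysql://localhost:3306/emp",
    table="employee",
    properties={"user": "root",
                "password": "root",
                "driver": "com.mysql.cj.jdbc.Driver"}
)
df.show()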


5. Select Specific Columns to Read

To read only specific columns from the table, pass a subquery to the dbtable option. Since the value of dbtable is placed in the FROM clause of the generated SQL, the query must be wrapped in parentheses and given an alias (the alias name emp_alias below is arbitrary).


# Read from SQL Table
df = spark.read \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/emp") \
  .option("dbtable", "(select id,age from employee where gender='M') emp_alias") \
  .option("user", "root") \
  .option("password", "root") \
  .load()

df.show()

Alternatively, you can also use the query option. Note that you can use either the dbtable or the query option, but not both at the same time. Also, when using the query option, you can’t use the partitionColumn option.


#Using query
df = spark.read \
  .format("jdbc") \
  ......
  ......
  .option("query", "select id,age from employee where gender='M'") \
  .......
  .......
  .load()
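
For completeness, a full version of the query-option read, assuming the same connection details used earlier, might look like this:


# Read using the query option (full sketch)
df = spark.read \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/emp") \
  .option("query", "select id,age from employee where gender='M'") \
  .option("user", "root") \
  .option("password", "root") \
  .load()

df.show()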

6. Read MySQL Table in Parallel

Utilize the numPartitions option to parallelize reading from a MySQL table. This setting also dictates the maximum number of concurrent JDBC connections. Note that numPartitions on its own only caps the number of connections; to actually split the read into multiple partitions, Spark also needs partitionColumn, lowerBound, and upperBound, which cannot be combined with the query option (see the sketch after the example below). Additionally, the fetchsize option specifies how many rows to fetch per round trip; the default depends on the JDBC driver (Oracle, for example, defaults to 10 rows), and increasing it can improve read performance.


# Using numPartitions
df = spark.read \
  .format("jdbc") \
  .option("query", "select id,age from employee where gender='M'") \
  .option("numPartitions",5) \
  .option("fetchsize", 20) \
  .......
  .......
  .load()
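
To actually partition the read, supply a numeric partitionColumn together with lowerBound and upperBound in addition to numPartitions. The sketch below uses the dbtable option (since partitionColumn cannot be combined with query) and bounds chosen as an assumption for the sample id values:


# Partitioned read on a numeric column (bounds are assumptions for the sample data)
df = spark.read \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/emp") \
  .option("dbtable", "employee") \
  .option("partitionColumn", "id") \
  .option("lowerBound", "1") \
  .option("upperBound", "5") \
  .option("numPartitions", "5") \
  .option("fetchsize", "20") \
  .option("user", "root") \
  .option("password", "root") \
  .load()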

7. Append Table

To append rows to an existing MySQL database table, write the DataFrame with mode("append").


# Write to SQL Table
sampleDF.write \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/emp") \
  .option("dbtable", "employee") \
  .option("user", "root") \
  .option("password", "root") \
  .mode("append") \
  .save()

8. PySpark Shell MySQL Connector

Sometimes you may need to connect to MySQL from the PySpark shell in interactive mode to test some queries. You can achieve this by providing the MySQL connector library to the pyspark shell through the --jars option. Once the shell has started, you can run your queries.


bin/pyspark \
      --master local[*] \
      --jars /path/to/mysql/connector/mysql-connector-java-8.0.13.jar
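
Once the shell is up, the SparkSession is already available as spark, so you can run the same reads interactively, for example:


# Inside the PySpark shell ('spark' is created for you)
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()
df.show()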

9. Spark Submit MySQL Connector

Similarly, you also need to add the MySQL connector jar to the --jars option of spark-submit. If you have multiple jars, refer to how to add multiple jars to spark-submit. The command looks like this:


bin/spark-submit \
      --master yarn \
      --jars /path/to/mysql/connector/mysql-connector-java-8.0.13.jar \
      .........
      .........
      .........

10. Complete Example

Following is the complete example of reading and writing DataFrame to MySQL table.


# Imports
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
           .appName('SparkByExamples.com') \
           .config("spark.jars", "mysql-connector-java-8.0.13.jar") \
           .getOrCreate()

# Create DataFrame 
columns = ["id", "name","age","gender"]
data = [(1, "James",30,"M"), (2, "Ann",40,"F"),
    (3, "Jeff",41,"M"),(4, "Jennifer",20,"F")]

sampleDF = spark.sparkContext.parallelize(data).toDF(columns)

# Write to MySQL Table
sampleDF.write \
  .format("jdbc") \
  .option("driver","com.mysql.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/database_name") \
  .option("dbtable", "employee") \
  .option("user", "root") \
  .option("password", "root") \
  .save()

# Read from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/database_name") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

df.show()
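
Since the read pulls back the rows written earlier, df.show() should display the sample data (the row order returned by MySQL may differ):


# Expected contents of the employee table
# +---+--------+---+------+
# | id|    name|age|gender|
# +---+--------+---+------+
# |  1|   James| 30|     M|
# |  2|     Ann| 40|     F|
# |  3|    Jeff| 41|     M|
# |  4|Jennifer| 20|     F|
# +---+--------+---+------+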

Conclusion

In conclusion, PySpark provides robust capabilities for seamlessly interacting with MySQL databases, offering efficient mechanisms for both reading from and writing to MySQL tables. Through the JDBC connector, PySpark facilitates parallelized data retrieval, enabling scalable and high-performance data processing.

Similarly, PySpark’s DataFrame API simplifies the process of writing data back to MySQL tables, offering flexibility and ease of use. With the ability to leverage parallel processing and distributed computing, PySpark empowers data engineers and analysts to harness the full potential of MySQL data within Spark-powered analytics workflows, facilitating informed decision-making and unlocking insights from diverse datasets.


