Dynamic way of doing ETL through PySpark

Post category: PySpark
Post last modified: March 27, 2024

Instead of writing ETL for each table separately, you can do it dynamically by combining a database (MySQL, PostgreSQL, SQL Server) with PySpark. For better understanding, I am breaking the code into steps.

Step 1

Create two tables in the database (I am using SQL Server) named TEST_DWH:

Table etl_metadata keeps the master data of the ETL (source and destination information).


CREATE TABLE [dbo].[etl_metadata](
    [id] [int] IDENTITY(1,1) NOT NULL,
    [source_type] [varchar](max) NULL,
    [source_info] [text] NULL,
    [destination_db] [varchar](max) NULL,
    [destination_schema] [varchar](max) NULL,
    [destination_table] [varchar](max) NULL,
    [etl_type] [varchar](max) NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
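For reference, one etl_metadata row describing a CSV load might look like the following sketch; every value here is a made-up example, not data from the article.

```python
# Hypothetical etl_metadata row for a CSV-to-staging-table load.
# All values are illustrative assumptions.
sample_metadata_row = {
    "id": 1,
    "source_type": "CSV",                     # 'CSV' or 'sqlserver'
    "source_info": r"C:\data\customers.csv",  # file path, or a SQL query
    "destination_db": "TEST_DWH",
    "destination_schema": "dbo",
    "destination_table": "stg_customers",
    "etl_type": "full",
}
```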

Table etl_metadata_schedule keeps the progress of the daily ETL schedule.


CREATE TABLE [dbo].[etl_metadata_schedule](
    [id] [int] NULL,
    [source_type] [varchar](max) NULL,
    [source_info] [text] NULL,
    [destination_db] [varchar](max) NULL,
    [destination_schema] [varchar](max) NULL,
    [destination_table] [varchar](max) NULL,
    [etl_type] [varchar](max) NULL,
    [status] [varchar](max) NULL,
    [status_description] [varchar](max) NULL,
    [started_at] [datetime] NULL,
    [completed_at] [datetime] NULL,
    [schedule_date] [datetime] NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]

Step 2: Write the ETL in Python using PySpark

Initiate the Python script with some variables to store the source and destination information.


"""
Created on Thu Mar 17 11:06:28 2022
@author: mustafa
"""
from pyspark.sql import SparkSession
import pyodbc
import pandas as pd
import findspark
findspark.init()
findspark.find()

spark = SparkSession \
    .builder \
    .appName("Python ETL script for TEST") \
    .master("local[*]")\
    .config("spark.driver.memory", "8g")\
    .config("spark.sql.ansi.enabled", True)\
    .config("spark.jars", r"C:\Drivers\sqljdbc42.jar") \
    .getOrCreate()

source_type = ''
source_info = ''
destination_db=''
destination_schema=''
destination_table = ''
etl_type = ''
query_string = '' 

Read the metadata from the database; in my case I am using SQL Server.


database = "TEST_DWH"
user = "user"
password  = "password"


query_string = """
SELECT a.*,
       ISNULL(b.status, 'Pending') AS status,
       NULL AS status_description,
       NULL AS started_at,
       NULL AS completed_at
FROM (SELECT *, GETDATE() AS schedule_date FROM dbo.etl_metadata) a
LEFT JOIN [dbo].[etl_metadata_schedule] b
    ON a.id = b.id
   AND CAST(b.schedule_date AS date) = CAST(GETDATE() AS date)
WHERE ISNULL(b.status, 'A') != 'completed'
"""

#Read ETL Meta Data
etl_meta_data_staging = spark.read\
    .format("jdbc") \
    .option("url", "jdbc:sqlserver://localhost:1433;databaseName="+database+";") \
    .option("query", query_string) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()  
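One subtle point: the databaseName part of the JDBC URL takes the plain database name, with no braces around it. A tiny helper (my own addition, not part of the original script) keeps the format consistent everywhere it is used:

```python
def sqlserver_jdbc_url(host, port, database):
    """Build a SQL Server JDBC URL; databaseName takes the bare name."""
    return f"jdbc:sqlserver://{host}:{port};databaseName={database};"

url = sqlserver_jdbc_url("localhost", 1433, "TEST_DWH")
# url -> "jdbc:sqlserver://localhost:1433;databaseName=TEST_DWH;"
```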

If you have a scenario where the ETL is re-run within the same day, the following code is useful; otherwise you may skip this chunk.


etl_meta_data_staging.filter("status == 'Pending'").show()

#Insert today's pending rows into the schedule table (rows already scheduled are skipped)
etl_meta_data_staging.filter("status == 'Pending'").write \
        .format("jdbc") \
        .option("url", "jdbc:sqlserver://localhost:1433;databaseName="+database+";") \
        .option("dbtable", "dbo.etl_metadata_schedule") \
        .option("user", user) \
        .option("password", password) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .mode("append")\
        .save() 
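The metadata query left-joins today's schedule entries onto the master table and keeps only the rows that are not yet completed. The same idea can be sketched in plain pandas with made-up rows, just to make the join logic visible:

```python
import pandas as pd

# Toy stand-ins for etl_metadata and today's etl_metadata_schedule rows
metadata = pd.DataFrame({"id": [1, 2, 3]})
schedule = pd.DataFrame({"id": [2], "status": ["completed"]})

# Left-join; rows with no schedule entry get status 'Pending',
# and anything not 'completed' still needs to run today.
merged = metadata.merge(schedule, on="id", how="left")
merged["status"] = merged["status"].fillna("Pending")
pending_ids = sorted(merged.loc[merged["status"] != "completed", "id"])
# pending_ids -> [1, 3]
```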

We now have schedule metadata in the database and have to maintain its status (Pending, Completed, or Error) together with the started_at and completed_at timestamps.


#SQL SERVER CONNECTION TO MAINTAIN ETL STATUS-------------#
conn = pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};"
                      "Server=localhost,1433;"
                      "Database="+database+";"
                      "UID="+user+";"
                      "PWD="+password+";")
cursor = conn.cursor()

Convert the schedule metadata to Pandas and sort it by id.


df_etl_meta_data_staging  = etl_meta_data_staging.toPandas()
df_etl_meta_data_staging = df_etl_meta_data_staging.sort_values('id')

Now loop over all the rows captured in Pandas, reading from each source (CSV or SQL Server) and inserting the data into SQL Server (the destination), all with PySpark.
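As a side note, the repeated boolean-mask lookups inside the loop can be replaced by iterating the rows directly with itertuples; here is a sketch on a toy frame whose columns mirror the metadata table (the data is invented):

```python
import pandas as pd

df = pd.DataFrame({
    "id": [2, 1],
    "source_type": ["CSV", "sqlserver"],
    "source_info": ["a.csv", "SELECT 1"],
})

# Sort once, then read each column as an attribute of the row object
rows = [(row.id, row.source_type, row.source_info)
        for row in df.sort_values("id").itertuples(index=False)]
# rows -> [(1, 'sqlserver', 'SELECT 1'), (2, 'CSV', 'a.csv')]
```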


for etl_id in df_etl_meta_data_staging['id']:
   status = 'In Progress'
   print("Starting for "+ str(etl_id))
   #---------------UPDATE In Progress Status---------------#
   cursor.execute(
       "UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule] "
       "SET [status]=?, [started_at]=CURRENT_TIMESTAMP WHERE id=?",
       status, int(etl_id))
   conn.commit()

   # load meta data into variables
   source_type = df_etl_meta_data_staging['source_type'][df_etl_meta_data_staging['id']==etl_id].values[0]
   source_info = df_etl_meta_data_staging['source_info'][df_etl_meta_data_staging['id']==etl_id].values[0]
   destination_db = df_etl_meta_data_staging['destination_db'][df_etl_meta_data_staging['id']==etl_id].values[0]
   destination_schema = df_etl_meta_data_staging['destination_schema'][df_etl_meta_data_staging['id']==etl_id].values[0]
   destination_table = df_etl_meta_data_staging['destination_table'][df_etl_meta_data_staging['id']==etl_id].values[0]
   etl_type = df_etl_meta_data_staging['etl_type'][df_etl_meta_data_staging['id']==etl_id].values[0]

   # initialize empty status for each run
   status = ''

   # Read data from the source; try to read, otherwise catch the exception
   try:
        print("Reading via ", source_info)
        # Read CSV data 
        if source_type == 'CSV':
           jdbcDF = spark.read\
                       .format("csv") \
                       .option("header", "true") \
                       .option("quote", "\"") \
                       .option("escape", "\"") \
                       .load(source_info) 

           status= 'read_successful'
           jdbcDF.show()

        elif source_type == 'sqlserver':
            jdbcDF = spark.read\
                .format("jdbc") \
                .option("url", "jdbc:sqlserver://localhost:1433;databaseName="+database+";") \
                .option("query", source_info) \
                .option("user", user) \
                .option("password", password) \
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                .load() 

        #Try to Write Extracted data relevant to destination table
        try:
          jdbcDF.write \
                  .format("jdbc") \
                  .option("url", "jdbc:sqlserver://localhost:1433;databaseName="+destination_db+";") \
                  .option("dbtable", destination_schema+"."+destination_table) \
                  .option("user", user) \
                  .option("password", password) \
                  .option("truncate", "true") \
                  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                  .mode("overwrite")\
                  .save()

          status = 'completed'
          print("Write Successful")
          #---------------UPDATE Success Status---------------#
          cursor.execute(
              "UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule] "
              "SET [status]=?, [completed_at]=CURRENT_TIMESTAMP WHERE id=?",
              status, int(etl_id))
          conn.commit()
          #---------------UPDATE Success Status---------------#

        #except: error while writing to the destination table
        except Exception as e :
              print('some error in writing')
              status = 'error in writing to destination db, '+str(e)
              #---------------UPDATE Error Status---------------#
              cursor.execute(
                  "UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule] "
                  "SET [status]=?, [completed_at]=CURRENT_TIMESTAMP WHERE id=?",
                  status, int(etl_id))
              conn.commit()
              #---------------UPDATE Error Status---------------#
    #except: error while reading from the source
   except Exception as e :
         print("some error in reading from source")
         status = 'error reading source , '+str(e)
         print(status)
         #---------------UPDATE Error Status---------------#
         cursor.execute(
             "UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule] "
             "SET [status]=?, [completed_at]=CURRENT_TIMESTAMP WHERE id=?",
             status, int(etl_id))
         conn.commit()
         #---------------UPDATE Error Status---------------#
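Finally, the near-identical status UPDATE blocks in the loop could be factored into one helper; this is a refactoring suggestion of mine, not part of the original script. Returning the SQL and its parameters separately keeps the query parameterized (qmark style, as pyodbc expects), so quotes inside an error message cannot break the statement:

```python
def build_status_update(status, etl_id, timestamp_col="completed_at"):
    """Return a parameterized UPDATE for etl_metadata_schedule."""
    sql = ("UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule] "
           f"SET [status]=?, [{timestamp_col}]=CURRENT_TIMESTAMP "
           "WHERE id=?")
    return sql, (status, int(etl_id))

# usage inside the loop:
#   cursor.execute(*build_status_update('completed', etl_id))
#   conn.commit()
sql, params = build_status_update("completed", 3)
```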