Instead of writing ETL for each table separately, you can have a technique of doing it dynamically by using the database (MySQL, PostgreSQL, SQL-Server) and Pyspark. Follow some steps to write code, for better understanding I am breaking it into steps.
Step 1
create two tables on database(I am using SQL-SERVER) having name of TEST_DWH :
table etl_metadata for keeping master data of ETL (source and destination information)
CREATE TABLE [dbo].[etl_metadata](
[id] [int] IDENTITY(1,1) NOT NULL,
[source_type] [varchar](max) NULL,
[source_info] [text] NULL,
[destination_db] [varchar](max) NULL,
[destination_schema] [varchar](max) NULL,
[destination_table] [varchar](max) NULL,
[etl_type] [varchar](max) NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
table etl_metadaata_schedule for having progress of daily ETL/Scehdule
CREATE TABLE [dbo].[etl_metadata_schedule](
[id] [int] NULL,
[source_type] [varchar](max) NULL,
[source_info] [text] NULL,
[destination_db] [varchar](max) NULL,
[destination_schema] [varchar](max) NULL,
[destination_table] [varchar](max) NULL,
[etl_type] [varchar](max) NULL,
[status] [varchar](max) NULL,
[started_at] [datetime] NULL,
[completed_at] [datetime] NULL,
[schedule_date] [datetime] NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
Step 2 : Write ETL in python using Pyspark
Initiating python script with some variable to store information of source and destination
"""
Created on Thu Mar 17 11:06:28 2022
@author: mustafa
"""
from pyspark.sql import SparkSession
import pyodbc
import pandas as pd
import findspark
findspark.init()
findspark.find()
spark = SparkSession \
.builder \
.appName("Python ETL script for TEST") \
.master("local[*]")\
.config("spark.driver.memory", '8g')\
.config("spark.sql.ansi.enabled ",True)\
.config("spark.jars", "C:\Drivers\sqljdbc42.jar") \
.getOrCreate()
source_type = ''
source_info = ''
destination_db=''
destination_schema=''
destination_table = ''
etl_type = ''
query_string = ''
Read Metadata from database, in my case I am using SQL Server
database = "TEST_DWH"
user = "user"
password = "password"
query_string="SELECT a.*,CONCAT(ISNULL(b.status,'Pending'),b.status) status,null status_description ,null started_at,null completed_at FROM (SELECT *,getdate() schedule_date FROM dbo.etl_metadata ) a LEFT JOIN [dbo].[etl_metadata_schedule] b ON a.id = b.id and CAST(b.schedule_date AS date)= CAST(getdate() AS date) where ISNULL(b.status,'A') != 'completed'"
#Read ETL Meta Data
etl_meta_data_staging = spark.read\
.format("jdbc") \
.option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+database+"};") \
.option("query", query_string) \
.option("user", user) \
.option("password", password) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.load()
In case if you have a scenario to re run ETL with in a day than following code is useful, you may skip this chunk of code.
etl_meta_data_staging.filter("status == 'Pending'").show()
#THEN READ BASE META DATA AND CREATE ONE ELSE DONT
etl_meta_data_staging.filter("status == 'Pending'").write \
.format("jdbc") \
.option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+database+"};") \
.option("dbtable", "dbo.etl_metadata_schedule") \
.option("user", user) \
.option("password", password) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.mode("append")\
.save()
we have schedule metadata in our database and have to maintain its status (Pending, Completed or or Error) , Started at and Completed at.
#SQL SERVER CONNECTION TO MAINTAIN ERROR STATE-------------#
conn = pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};"
"Server=localhost,1433;"
"Database="+database+";"
"UID="+user+";"
"PWD="+password+";")
cursor = conn.cursor()
Converting Schedule metadata into Pandas and Sorting it by IDs
df_etl_meta_data_staging = etl_meta_data_staging.toPandas()
df_etl_meta_data_staging = df_etl_meta_data_staging.sort_values('id')
Now, Loop all data captured in Pandas by Reading CSV (source), and inserting all data in SQL-Server (Destination). All by Pyspark
for etl_id in df_etl_meta_data_staging['id']:
status = 'In Progress'
print("Starting for "+ str(etl_id))
#---------------UPDATE In Progress Status---------------#
cursor.\
execute('''UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule]
SET [status]=\''''
+status+ "',[started_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
conn.commit()
# load meta data into variables
source_type = df_etl_meta_data_staging['source_type'][df_etl_meta_data_staging['id']==etl_id].values[0]
source_info = df_etl_meta_data_staging['source_info'][df_etl_meta_data_staging['id']==etl_id].values[0]
destination_db = df_etl_meta_data_staging['destination_db'][df_etl_meta_data_staging['id']==etl_id].values[0]
destination_schema = df_etl_meta_data_staging['destination_schema'][df_etl_meta_data_staging['id']==etl_id].values[0]
destination_table = df_etl_meta_data_staging['destination_table'][df_etl_meta_data_staging['id']==etl_id].values[0]
etl_type = df_etl_meta_data_staging['etl_type'][df_etl_meta_data_staging['id']==etl_id].values[0]
# initialize empty status for each run
status = ''
# Read data from spurce try to read otherwise through exception
try:
print("Reading via ", source_info)
# Read CSV data
if source_type == 'CSV':
jdbcDF = spark.read\
.format("csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load(source_info)
status= 'read_successful'
jdbcDF.show()
elif source_type == 'sqlserver':
jdbcDF = spark.read\
.format("jdbc") \
.option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+database+"};") \
.option("query", source_info) \
.option("user", user) \
.option("password", password) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.load()
#Try to Write Extracted data relevant to destination table
try:
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+destination_db+"};") \
.option("dbtable", destination_schema+"."+destination_table) \
.option("user", user) \
.option("password", password) \
.option("truncate", "true") \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.mode("overwrite")\
.save()
status = 'completed'
print("Write Successful")
#---------------UPDATE Success Status---------------#
cursor.\
execute('''UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule]
SET [status]=\''''
+status+ "',[completed_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
conn.commit()
#---------------UPDATE Success Status---------------#
#except of Write Extracted data relevant to destination table
#---------------UPDATE Success Status---------------#
except Exception as e :
print('some error in writing')
status = 'error in writing to destination db, '+str(e)
#---------------UPDATE Error Status---------------#
cursor.\
execute('''UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule]
SET [status]=\''''
+status+ "',[completed_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
conn.commit()
#---------------UPDATE Error Status---------------#
#except of Read module data
except Exception as e :
print("some error in reading from source")
status = 'error reading source , '+str(e)
print(status)
#---------------UPDATE Error Status---------------#
cursor.\
execute('''UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule]
SET [status]=\''''
+status+ "',[completed_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
conn.commit()
#---------------UPDATE Error Status---------------#
Related Articles
- Setup and run PySpark on Spyder IDE
- What is PySpark and who uses it?
- PySpark withColumnRenamed to Rename Column on DataFrame
- How to Install PySpark on Mac (in 2022)
- PySpark Add a New Column to DataFrame
- PySpark printSchema() Example
- Install PySpark in Jupyter on Mac using Homebrew
- PySpark “ImportError: No module named py4j.java_gateway” Error