Global configuration¶
Global configuration can be stored in the config file or set in each work-flow. It is usually used for SMTP messaging, logging level and folder locations.
Global variables are read when dingDong is initialized.
from dingDong import DingDong
from dingDong import Config
Config.SMTP_RECEIVERS = [<email1>, <email2>...] # SEND EMAIL: TO
Config.SMTP_SERVER = "<SMTP server>"
Config.SMTP_SERVER_USER = "email@address.com"
Config.SMTP_SERVER_PASS = "<email password>"
Config.SMTP_SENDER = "<email from>" # SEND EMAIL: FROM
PROJECT_FOLDER = <folder path>\JSON # main folder to store all JSON work-flow files
LOGS_FOLDER = <folder path>\LOGS # logs folder
SQL_FOLDER = <folder path>\SQL # SQL files to execute
FILES_NOT_INCLUDE = ['<jsonFile.json>', '<jsonFile1>'] # JSON files to ignore when using the JSON folder
FILES_INCLUDE = ['<jsonFile5.json>','<jsonFile8>'] # Load only these JSON files
CONN_DICT = {
'dwh' : {"conn":"sql" , "url":<URL string>,"file":"sample.sql"},
'sap' : {"conn":"oracle", 'dsn':<dsn> , 'user':<user>,'pass':<pass>,'nls':<local nls language>},
'crm' : {"conn":"sql" , "url":<URL string>},
'file': {'delimiter':'~','header':True, 'folder':<folder path>,'replace':r'\"|\t'}
}
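The `'file'` connection entry above carries plain CSV options. Purely as an illustration (not dingDong's internal code), the `delimiter`, `header`, and `replace` keys could be interpreted like this when parsing file content:

```python
import csv
import io
import re

# Hypothetical interpretation of the 'file' connection settings above;
# dingDong's actual file handling may differ.
file_conn = {'delimiter': '~', 'header': True, 'replace': r'\"|\t'}

raw = 'id~"name"\n1~"Alice\t"\n2~"Bob"\n'
# Strip characters matched by the 'replace' pattern before parsing
cleaned = re.sub(file_conn['replace'], '', raw)
rows = list(csv.reader(io.StringIO(cleaned), delimiter=file_conn['delimiter']))
header, data = (rows[0], rows[1:]) if file_conn['header'] else (None, rows)
# header -> ['id', 'name']; data -> [['1', 'Alice'], ['2', 'Bob']]
```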
Ding-Dong (map-load)¶
Sample of extracting 3 CSV files into temporary SQLite tables, creating a query that stores aggregated data in a results table, and extracting all results into a CSV file.
""" import modules -> logging used fr setting log level"""
import logging
from dingDong import DingDong
from dingDong import Config
""" set log level: logging.INFO, logging.DEBUG, logging.ERROR """
Config.LOGS_DEBUG = logging.DEBUG
""" Config all connection URl
Can be used by update Config.CONN_URL property or by send dictionary into connDict property at DingDong class init`
key : can be general connection name , or connection type (sql, oracle, file .. )
value:
String--> connection string URL (key will be used to defined connection type: sql, oracle, mySql....
Dictionary -->
'conn' -> connenction type. full type list can be found at dingDong.misc.enumsJson.eConn static class
'url' -> connection URL
"""
Config.CONN_URL = {
'sampleSql': {'conn': 'sql',"url": "<Sql server connection string>;UID=USER;PWD=PWD;"},
'file': "C:\\dingDong\\",
'sqlite': "C:\\dingDong\\sqlLiteDB.db"}
""" This is sample JSON configuration format for:
1. mapping and loading CSV file named DATAELEMENTDESCRIPTION into SQLLite table named dateElements_Desc
2. mapping and loading CSV file named DEMOGRAPHICS into SQLLite table named demographics
3. mapping and loading CSV file named MEASURESOFBIRTHANDDEATH into SQLLite table named birthDate
4. create a new query based on demographics and birthDate into new table named `Final`
5. Update sample field at `Final` table by using direct PL/SQL query
6. Extract Final table data into a CSV file
file default datatype can be found at dingDong.conn.baseBatch under DEFAULTS values (currently set to VARCHAR(200) for all relation Dbs
"""
nodesToLoad = [
{"source": ["file", "DATAELEMENTDESCRIPTION.csv"],
"target": ["sqlite", "dateElements_Desc"]},
{"source": ["file", "DEMOGRAPHICS.csv"],
"target": ["sqlite", "demographics"]},
{"source": ["file", "MEASURESOFBIRTHANDDEATH.csv"],
"target": ["sqlite", "birthDate"]},
{"query": ["sqlite", """ Select d.[State_FIPS_Code] AS A, d.[County_FIPS_Code] AS B, d.[County_FIPS_Code] AS G,d.[County_FIPS_Code], d.[CHSI_County_Name], d.[CHSI_State_Name],[Population_Size],[Total_Births],[Total_Deaths]
From demographics d INNER JOIN birthDate b ON d.[County_FIPS_Code] = b.[County_FIPS_Code] AND d.[State_FIPS_Code] = b.[State_FIPS_Code]"""],
"target": ["sqlite", "Final", 2]},
{"myexec": ["sqlite", "Update dateElements_Desc Set [Data_Type] = 'dingDong';"]},
{"source": ["sqlite", "Final"],
"target": ["file", "finall.csv"]}
]
"""
Init class DingDong"
dicObj -> loading node mapping dictionay (as the listed sample)
dirData-> will load all JSON configuration file located at this folder
includeFiles -> FILTER to load list of files in dirData folder
notIncldeFiles -> FILTER to remove list of files in dirData folder
connDixt -> update all connection url. same property as Config.CONN_URL
processes -> number of parrallel processing for loading data (DONG module)
"""
dd = DingDong(dicObj=nodesToLoad, filePath=None, dirData=None,
includeFiles=None,notIncludeFiles=None,connDict=None, processes=1)
dd.msg.addState("Start Ding")
""" Mapping files structure into a table structure
Target not exists -> create new target table based on source table definitions
Target exists -> if there is change, there are 3 option to update the target table structure
1. copy old data into the table with date prefix and create a new table with updated metadata (default, CODE:-1)
2. create new table schema, store old schema in a copied table with date prefix and merge data from the old structure into a new structure (CODE: 1, updated at target or merge key values)
3. no change can be made into this table. CODE number 2. can be added only to target or merge objects
"""
dd.ding()
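The three update codes described above attach as an optional third element of a target node, as in the `["sqlite", "Final", 2]` sample. A side-by-side illustration (table names here are hypothetical):

```python
# Illustration of the structure-update codes described above;
# node shapes follow the nodesToLoad sample, table names are hypothetical.
target_default = ["sqlite", "someTable"]       # CODE -1 (default): back up old data, create a new table
target_merge   = ["sqlite", "someTable", 1]    # CODE 1: recreate schema and merge old data into it
target_frozen  = ["sqlite", "someTable", 2]    # CODE 2: no structure changes allowed on this table
```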
""" Extracting and loading data from source to target or to merge
if STT node exists in JSON mapping -> will update fields accordingly
if the column node exists -> will map column types by column node definition
if mapping node exists-> will map source to target accordingly
more detild can be found at decumentation
"""
dd.msg.addState("Start Dong")
dd.dong()
dd.msg.end(msg="FINISHED",pr=True)
PL/SQL Executor¶
dingDong uses execution methods to allow managing all business-logic workflows. The sample below uses a private function to set query parameters. Execution is done in parallel by defining priorities: in our sample all queries with priority 1 execute in parallel, the same for priority 2, and so on. Each execution can receive parameters as a dictionary. Each step is monitored by the logging mechanism: dd.msg.addState("step desc") is used for adding messages, and dd.msg.sendSMTPmsg sends an HTML message using the SMTP configuration.
# sample of a private function to manage start-date and end-date parameters for SQL queries
# current sample - receives day offsets and returns startDay and endDay in %Y%m%d format
def setStartEndTime(e=1, s=400, f="%Y%m%d"):
    dataRange, curDate = (e, s, f), datetime.datetime.today()
    startDay = (curDate - datetime.timedelta(days=dataRange[1])).strftime(dataRange[2])
    endDay = (curDate - datetime.timedelta(days=dataRange[0])).strftime(dataRange[2])
    return startDay, endDay
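To make the behavior concrete, the same date arithmetic with the sample arguments (e=1, s=1000) yields a window from 1000 days ago up to yesterday:

```python
import datetime

# The same arithmetic as setStartEndTime(e=1, s=1000, f="%Y%m%d"), inlined
curDate = datetime.datetime.today()
startDay = (curDate - datetime.timedelta(days=1000)).strftime("%Y%m%d")
endDay = (curDate - datetime.timedelta(days=1)).strftime("%Y%m%d")

# Both are 8-character %Y%m%d strings; the larger offset yields the earlier date,
# so %Y%m%d strings compare correctly as plain strings
assert len(startDay) == 8 and len(endDay) == 8
assert startDay < endDay
```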
# update SQL query parameters
startDay, endDay = setStartEndTime(e=1, s=1000, f="%Y%m%d")
config.QUERY_PARAMS = {
"$start" : startDay,
"$end" : endDay
}
ddSQLExecution = [
(1, SQL_FOLDER+"\\updateDWH.sql", {}),
(2, "exec Procedure_1_SQL", {}),
(3, "exec Procedure_2_SQL", {}),
(3, "exec Procedure_3_SQL" , {}),
(4, "exec Procedure_4_SQL", {}),
(5, "exec Procedure_5_SQL @last_etl_date='$start'" ,{'$start':config.QUERY_PARAMS['$start']}),
(5, "exec Procedure_6_SQL", {})
]
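dingDong's execDbSql handles the parallelism internally. Purely as an illustration of the priority semantics described above (not the library's implementation), the batching could be sketched as grouping the tuples by their first element:

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical sketch of priority batching: queries sharing a priority run in
# parallel; each batch waits for the previous one to finish. Query names are
# taken from the ddSQLExecution sample above (truncated for brevity).
ddSQLExecution = [
    (1, "updateDWH.sql", {}),
    (2, "exec Procedure_1_SQL", {}),
    (3, "exec Procedure_2_SQL", {}),
    (3, "exec Procedure_3_SQL", {}),
]

batches = [
    [q for _, q, _ in group]
    for _, group in groupby(sorted(ddSQLExecution, key=itemgetter(0)),
                            key=itemgetter(0))
]
# batches -> [['updateDWH.sql'], ['exec Procedure_1_SQL'],
#             ['exec Procedure_2_SQL', 'exec Procedure_3_SQL']]
```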
dd = dingDong( dicObj=None, filePath=None, dirData=PROJECT_FOLDER,
includeFiles=FILES_INCLUDE, notIncludeFiles=FILES_NOT_INCLUDE,
dirLogs=LOGS_FOLDER, connDict=CONN_DICT, processes=4)
dd.setLoggingLevel(val=logging.DEBUG)
dd.execDbSql(queries=ddSQLExecution, connName='sql')
dd.msg.addState("FINISH ALL SQL QUERIES !")
dd.msg.sendSMTPmsg(msgName="FINISHED EXECUTING WORK-FLOW", onlyOnErr=False, withErr=True)
Source to target mapping (STT)¶
#################################################
######### SAMPLE JSON FILE #########
#################################################
[
{
"target": ["sql", "STG_Services"],
"query": ["oracle", [
"SELECT COL1 as col1_Desc , COL2 as col2_Desc, COL3 as ValidEndDate, COL4 as ValidBgDate , COL5 as col5_Desc,",
"COL6 as col6_Desc, COL7 as col7_Desc, COL8 as col8_Desc, COL9 as col8_Desc ",
"FROM sar.services where COL7 ='B'"]
],
"exec":["sql", "update_Target_STG_Services.sql"],
"merge":["DWH_Services",["COL1","COL2"]],
"sttappend":{
"ValidEndDate":{"s":"COL3", "t":"smalldatetime", "f":"fDCast()"},
"ValidBgDate": {"s":"COL4", "t":"smalldatetime", "f":"fDCast()"},
"LongDesc" : {"t":"nvarchar(500)","e":"{COL6}{COL7}{COL8}"},
"ETL_Date": {"t":"smalldatetime","f":"fDCurr()"}
},
"index":[{"c":["COL1", "COL2"],"ic":true,"iu":False}]
}
]
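The `"e"` key in the sttappend node above builds the target value from an expression over source columns. As an illustration only (dingDong's own expression engine may differ), the `{COL6}{COL7}{COL8}` template for LongDesc behaves like Python's str.format applied to a source row:

```python
# Hypothetical sketch of the "e" (expression) key from the STT sample above:
# substitute source-column values into the template, as LongDesc does.
row = {"COL6": "Cardio", "COL7": "B", "COL8": "2019"}  # illustrative values
template = "{COL6}{COL7}{COL8}"
long_desc = template.format(**row)
# long_desc -> 'CardioB2019'
```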
#################################################
######### SAMPLE PYTHON FILE #########
#################################################
# Global configuration
from dingDong.config import config
from dingDong.bl.ddExecuter import dingDong
config.SMTP_RECEIVERS = [<email1>, <email2>...] # SEND EMAIL: TO
config.SMTP_SERVER = "<SMTP server>"
config.SMTP_SERVER_USER = "email@address.com"
config.SMTP_SERVER_PASS = "<email password>"
config.SMTP_SENDER = "<email from>" # SEND EMAIL: FROM
# Init folder paths
PROJECT_FOLDER = <folder path>\JSON # main folder to store all JSON work-flow files
LOGS_FOLDER = <folder path>\LOGS # logs folder
SQL_FOLDER = <folder path>\SQL # SQL files to execute
FILES_NOT_INCLUDE = [] # JSON files to ignore when using the JSON folder
FILES_INCLUDE = [] # Load only these JSON files
# Init connection properties
CONN_DICT = {
'dwh' : {"conn":"sql" , "url":<URL string>,"file":"sample.sql"},
'sap' : {"conn":"oracle", 'dsn':<dsn> , 'user':<user>,'pass':<pass>,'nls':<local nls language>},
'crm' : {"conn":"sql" , "url":<URL string>},
'file': {'delimiter':'~','header':True, 'folder':<folder path>,'replace':r'\"|\t'}
}
# list for PL/SQL execution script
ddSQLExecution = [
(1, SQL_FOLDER+"\\updateDWH.sql", {}),
(2, "exec Procedure_1_SQL", {}),
(3, "exec Procedure_2_SQL", {}),
(3, "exec Procedure_3_SQL" , {}),
(4, "exec Procedure_4_SQL", {}),
(5, "exec Procedure_5_SQL @last_etl_date='$start'" ,{'$start':config.QUERY_PARAMS['$start']}),
(5, "exec Procedure_6_SQL", {})
]
# private function for managing parameters
def _setStartEndTime(e=1, s=100, f="%Y%m%d"):
    dataRange, curDate = (e, s, f), datetime.datetime.today()
    startDay = (curDate - datetime.timedelta(days=dataRange[1])).strftime(dataRange[2])
    endDay = (curDate - datetime.timedelta(days=dataRange[0])).strftime(dataRange[2])
    return startDay, endDay
# compute query parameters using the internal function in the config file
startDay, endDay = _setStartEndTime(e=1, s=1000, f="%Y%m%d")
config.QUERY_PARAMS = {
"$start" : startDay,
"$end" : endDay
}
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Loading data from JSON files; can accept a source file list, a destination file list, or append mode')
dd = dingDong( dicObj=None, filePath=None, dirData=PROJECT_FOLDER,
includeFiles=FILES_INCLUDE, notIncludeFiles=FILES_NOT_INCLUDE,
dirLogs=LOGS_FOLDER, connDict=CONN_DICT, processes=4)
dd.setLoggingLevel(val=logging.DEBUG)
dd.ding()
dd.msg.addState("DING FINSHED")
dd.dong()
dd.msg.addState("DONG FINISHED")
dd.execDbSql(queries=ddSQLExecution, connName='sql')
dd.msg.addState("DONE SQL QUERIES")
dd.execMicrosoftOLAP(serverName=<SSAS server name>, dbName=<SSAS db name>, cubes=[], dims=[], fullProcess=True)
dd.msg.addState("DONOE MICROSOFT SSAS")
dd.msg.sendSMTPmsg(msgName="JOB SAMPLE LOADING FINISHED", onlyOnErr=False, withErr=True)
Ding Work-flow¶
Step | Description
---|---
EXTRACT | Load from the Oracle query into SQL Server table STG_Services using the truncate-insert method
EXECUTE | Execute the SQL file named **update_Target_STG_Services.sql**
EXTRACT | Merge data from table **STG_Services** (target) into **DWH_Services**
TRANSFORM | Function fDCast(): columns ValidEndDate, ValidBgDate convert string values to smalldatetime. More on functions can be found at Extract functions
TRANSFORM | Execution function: column LongDesc concatenates 3 columns into one long string: COL6+COL7+COL8
TRANSFORM | Function fDCurr(): update column ETL_Date with the system datetime value
Dong Work-Flow¶
Type | Description
---|---
DATA-TYPES | All Oracle query columns COL1, COL2, … will be created in STG_Services and DWH_Services using SQL data types aligned to the Oracle data types
DATA-TYPES | ValidEndDate, ValidBgDate will have smalldatetime
DATA-TYPES | LongDesc will have nvarchar(500)
DATA-TYPES | ETL_Date will have smalldatetime
INDEX | Tables STG_Services and DWH_Services will have a non-unique ("iu":false), clustered ("ic":true) index on COL1 and COL2