Sometimes we come across a requirement to get data from a cloud-based API (Application Programming Interface; I know the abbreviation!) that provides JSON data (JavaScript Object Notation .. sigh!!) and to analyze it directly in a visualization tool like Tableau.
You may or may not want to store the data in an intermediate data store (like a database or flat files), but would prefer to analyze it directly in the Tableau BI tool. Also, you may need the data to be extracted and refreshed in its entirety or incrementally from time to time, and have it published to Tableau Server (if you have one!) so your users can make use of it.
So what do you do for such a requirement?
Well, if you know a programming language like Python, then you know this can be done easily. You would have to write some code, though, to get the data from the Cloud API.
But where to store the data?
Tableau uses a proprietary Hyper file format (.hyper files), which it can connect to directly to analyze the underlying data. A .hyper file can also be published directly to Tableau Server. Hyper files are essentially small self-contained database files; they are not PostgreSQL instances, but Hyper's SQL dialect largely follows PostgreSQL's, so they can hold data and also provide SQL features like Insert, Update, Delete, etc.
Sounds friendly enough? This is how we do it in a simple flow.
Okay, this may look like a cheesy graphic, but it does explain what we are doing here.
Moreover, the aim here is to give you sample code and installation steps, so you can modify them to connect to any Cloud API and get your required data into Tableau Hyper for your analysis.
So how about doing this?
There are two main things involved here.
- Setting up the environment and installing the code (use the sample code provided here)
- Understanding the code, so you could modify this to your requirements.
& here we begin...
--> For setting up the environment and installing the code, four main pieces are required.
- Install Python and dependencies like requests, faker (optional)
- Install the tableauhyperapi dependency for Python, along with the Microsoft Visual C++ Runtime
- Install Visual Studio Code with Pylint extension
- Install Tableau Desktop
I am using Faker to generate random data. Its use is optional.
I did my setup on Windows and followed the standard installation steps for each of the components above. I believe my code could be easily ported to a Linux environment too, but I haven't really tested it there, so it's your game there. Also, I prefer installing the tableauhyperapi dependency for Python from a .whl file.
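Once the pieces above are installed, a quick check that Python can actually find the modules may save some head scratching later. Below is a minimal sketch using only the standard library; the function name find_missing_modules is my own and is not part of the attached sample code.

```python
import importlib.util


def find_missing_modules(module_names):
    """Return the names from module_names that Python cannot locate."""
    return [name for name in module_names
            if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Module names taken from the installation list above; faker is optional.
    missing_required = find_missing_modules(["requests", "tableauhyperapi"])
    missing_optional = find_missing_modules(["faker"])
    print("Missing required modules:", missing_required or "none")
    print("Missing optional modules:", missing_optional or "none")
```

If the first list is not empty, go back to the installation steps before trying to run the script.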
Typical installation steps are given in the Readme document along with the attached source code.
You are responsible for any licenses that the above products may require.
--> Understanding the code, so you can modify it to your requirements, is a major step. Let's dig into the code by first running it and then understanding it file by file.
To run the code, open a command prompt in the extracted code directory (code_extracted_path) and issue a command of the form path_to_python_exe\python.exe Main.py -h.
The first things you see on running the sample code are, obviously, the log generated on-screen and the sampleapi.hyper file that is produced. Inside the sampleapi.hyper file you see two tables, Users and Resource, which hold the real data from the API. You also see an Extract_Log table, which records statistics on the various data pulls for each table on each run. This could prove useful in analyzing what happened when things go south, sigh!
A lot of things are happening in Main.py. The best way to analyze it is by looking at the __main__ routine at the very end of the script (line 652).
What we are doing here is
Now in here you are defining the base data structure for an object in the form of a Python dictionary, which becomes a table in Tableau; defining a Decoder function, which is the entry point Main.py uses to call the actual parsing functions; and then defining the actual parsing functions.
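To make the dictionary, decoder and parser idea a bit more concrete, here is a rough sketch. The keys table_name, columns and columns_tabdatatypes are the ones Main.py reads; everything else (the rows key, the column list, the function names, and the plain strings standing in for tableauhyperapi types) is my assumption for illustration and not the attached schema file.

```python
# Hypothetical table definition; the real script would hold tableauhyperapi
# SqlType objects in "columns_tabdatatypes" instead of these placeholder strings.
USERS_TABLE = {
    "table_name": "Users",
    "columns": ["id", "email", "first_name", "last_name"],
    "columns_tabdatatypes": ["int", "text", "text", "text"],
    "rows": [],
}


def parse_users(json_pages):
    """Flatten the 'data' list of every JSON page into rows in column order."""
    rows = []
    for page in json_pages:
        for item in page.get("data", []):
            rows.append([item.get(column) for column in USERS_TABLE["columns"]])
    return rows


# The decoder maps a table name to its definition and its parsing function.
TABLES = {"Users": (USERS_TABLE, parse_users)}


def decode(table_name, json_pages):
    """Entry point: return a filled copy of the table definition."""
    if table_name not in TABLES:
        raise ValueError("No parser defined for table: " + table_name)
    definition, parser = TABLES[table_name]
    result = dict(definition)
    result["rows"] = parser(json_pages)
    return result
```

Adding a new table then means adding one dictionary, one parsing function and one entry in TABLES.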
What should be really concerning here is:
For easy running of the Main.py script and to have the .hyper or .tdsx file uploaded to Tableau server, you could use the below Windows Batch Script. A similar thing could be achieved in UNIX as well using bash.
--> P2H_Extract_and_Publish.bat
This code should be self-explanatory and it uses tabcmd for doing the uploading to Tableau Server.
This script is also available inside the attached Sample Code.
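For those who would rather stay in Python than use a batch file, the same two steps (extract, then publish) could be sketched roughly as below. This is not the attached batch script; the function names are mine, tabcmd is assumed to be on the PATH, and you should check the tabcmd login and publish options against the tabcmd version you have installed.

```python
import subprocess


def build_extract_command(python_exe, output_file, api_key):
    """Command line for the extract step, using the two required arguments."""
    return [python_exe, "Main.py", "--output", output_file, "--sampleapikey", api_key]


def build_publish_commands(server_url, username, password, output_file, datasource_name):
    """tabcmd command lines: log in, publish (overwriting), then log out."""
    return [
        ["tabcmd", "login", "-s", server_url, "-u", username, "-p", password],
        ["tabcmd", "publish", output_file, "-n", datasource_name, "--overwrite"],
        ["tabcmd", "logout"],
    ]


def run_all(commands):
    """Run each command in order and stop at the first failure."""
    for command in commands:
        subprocess.run(command, check=True)
```

Calling run_all([build_extract_command(...)] + build_publish_commands(...)) would run the extract and then the upload, stopping if any step returns an error.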
So what do you say, folks? Would this be of some help? I would be happy to learn if you were able to use this somewhere in your projects and if I saved you some effort along the way.
As always, happy coding and have a great time getting your data from Cloud APIs into Tableau and analyzing it!
D:\P2H_v1>c:\Python\Python38\python.exe Main.py -h
You would get output like below
Let's try to understand this output, the arguments which could be passed to the P2H Python Script.
- help: -h or --help to see this help message
- output file: --output to specify the output file, .hyper or .tdsx. You know about .hyper already; .tdsx is one step above a .hyper file, as it can also contain a Data Model joining the different tables inside the hyper file. Since we cannot generate a Data Model using this Python script, the .tdsx file must already exist so that the data inside it can be updated and the existing data model reused.
- API URL: --sampleapiurl is the URL for the API endpoint; it can be specified here, or a default can be set inside the Main.py script.
- API key: --sampleapikey is the authentication key which would be base64 encoded and sent to API in the request header for Basic Authorization.
- Load Type: --load is used for specifying type of load FULL or INC
- Incremental Date: --incdate is used for specifying the date from which data would be pulled from API. This needs configuration inside the script and API should support such method.
- Incremental Period: --incperiod is used if you want to specify period like Last Week, Last Month, Last Year, etc. instead of incdate for pulling the data. This helps in automating batch pulls.
- Email To: --emailto specifies the email address of the recipients who would receive email after the script has completed processing. This requires mail server configuration inside the script.
- Package XML: --packagexml option is different. Remember we could have .tdsx file as output? So if you have the data model xml .tds file from inside .tdsx file from somewhere, you could specify path to that xml file with this option and the script would package it along with .hyper file to create .tdsx for you.
- Backup: --backup, yes or no, if you want to back up the existing output file before doing the updates.
- Backup Path: --backuppath specifies the path for backup file, to be used with --backup option.
- Log: --log is used to specify the Python script log file location. If nothing is specified then log output is generated on screen.
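The --incperiod option boils down to turning a period name into a start date. A minimal sketch of that idea is below. The function name resolve_inc_period is mine; the week is taken to start on Saturday because the script walks back to the most recent Saturday, and LastMonth is assumed to mean the first day of the previous calendar month.

```python
import datetime


def resolve_inc_period(period, today=None):
    """Return the start date for WTD, MTD, YTD, LastWeek, LastMonth or LastYear."""
    today = today or datetime.date.today()
    key = period.upper()

    def last_saturday(day):
        # date.weekday(): Monday is 0 and Saturday is 5.
        return day - datetime.timedelta(days=(day.weekday() - 5) % 7)

    if key == "WTD":
        return last_saturday(today)
    if key == "MTD":
        return today.replace(day=1)
    if key == "YTD":
        return today.replace(month=1, day=1)
    if key == "LASTWEEK":
        return last_saturday(today - datetime.timedelta(days=7))
    if key == "LASTMONTH":
        # Last day of the previous month, then back to its first day.
        return (today.replace(day=1) - datetime.timedelta(days=1)).replace(day=1)
    if key == "LASTYEAR":
        return datetime.date(today.year - 1, 1, 1)
    raise ValueError("Unknown period: " + period)
```

Passing today explicitly, for example resolve_inc_period("LastMonth", datetime.date(2024, 3, 13)), makes the result repeatable when you test your batch schedule.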
The minimum input parameters needed for the script to run and process the data from the Sample API used in the script are --output and --sampleapikey. For example, use the command below to generate the sampleapi.hyper file.
D:\P2H_v1>c:\Python\Python38\python.exe Main.py --output sampleapi.hyper --sampleapikey any_text_for_this
So in the code_extracted_path directory you would now see the sampleapi.hyper file which you could open with Tableau Desktop to analyze the data inside.
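A quick word on what happens to the --sampleapikey value: it is base64 encoded and sent in the request header for Basic Authorization. A minimal sketch of just that step is below; the function name build_basic_auth_header is mine, and the content type default is the application/json value the script uses.

```python
import base64


def build_basic_auth_header(api_key, content_type="application/json"):
    """Build request headers carrying the key as a Basic Authorization value."""
    token = base64.b64encode(api_key.encode("utf-8")).decode("utf-8")
    return {"Authorization": "Basic " + token, "Content-Type": content_type}


if __name__ == "__main__":
    print(build_basic_auth_header("any_text_for_this"))
```

Keep in mind that base64 is an encoding and not encryption, so the key is only as safe as the HTTPS connection it travels over.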
Now the code has run and there are a lot more things to observe, let's dig further.
Dig into the code, yummm me likey!
So what are the components making this data pull possible?
Obviously these are the two Python scripts, Main.py and SampleDataSchema.py, working with tableauhyperapi, which we installed before, and a Sample API (I used https://reqres.in/api) from where we are getting the data.
--> Let's look at Main.py and try to understand what's happening here
Warning! Long code ahead!!
#Basic Imports
import sys
import os
import argparse
import datetime
import time
import shutil
import zipfile

#For Email
import smtplib
import email.message

#For working with REST APIs
import requests
import base64

#For writing Tableau Hyper File
import tableauhyperapi

#Custom import for Sample API Data Schema
import SampleDataSchema

#Global Variables
##Below variables are Input as Argument from Command Line. Values defined here would be overridden by Command Line
_gv_OutputHyperFile = None
_gv_OutputPackageFile = None
_gv_PackageXMLFile = None
_gv_SampleAPIURL = None
_gv_SampleAPIKey = None
_gv_SampleAPIIncDate = None
_gv_LogFile = None
_gv_Backup = None
_gv_BackupFilePath = None
_gv_EmailMessageTo = None

##Modify below variables to change the script behaviour or output
_gv_ApplicationBanner = "Python to Tableau Hyper (P2H)" #Application Banner (appears in Log and Email)
_gv_SampleAPIContentType = "application/json" #Content Type passed in header for Request to REST API
_gv_SampleAPITimeOut = None #Time out in seconds for REST API
_gv_SampleAPIMaxRequestsPerSec = 10 #Maximum no. of requests made per second made to API
_gv_TabHyperLoadSchema = "Extract" #Name of main output schema in Hyper File
_gv_TabHyperStageTablesSuffix = "_Stage" #Name of Stage tables in Hyper Files.
_gv_TabHyperStageUsePersistentTables = False #Should Stage Table should be dropped after each load
_gv_TabHyperExtractLogTable = "Extract_Log" #Name of Extract Log Table inside Hyper File
_gv_EmailServer = None #Email Server full name or ip address
_gv_EmailMessageFrom = None #Email Address From
_gv_ArchiveUnarchiveTempPath = "_temp" #Temporary Extract path for .tdsx package file
_gv_TabHyperSendUsageStats = tableauhyperapi.Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU #Or tableauhyperapi.Telemetry.SEND_USAGE_DATA_TO_TABLEAU for sending Usage Statistics to Tableau

#Variables used in calculations, do not modify
_gv_SampleAPIAuthorization = None
_gv_SampleAPIHeader = None
_gv_SampleAPIBaseURIS = None
_gv_SampleAPIResponse = None
_gv_SampleAPICookies = None
_gv_SampleAPIReqCounter = {"LastAccessed":"", "RequestCount":0}
_gv_LogFilePath = None
_gv_EmailMessage = None
_gv_TabHyperProcess = None
_gv_TabHyperCon = None

#Function to Parse Input Arguments
def ParseArguments():
    global _gv_SampleAPIURL, _gv_SampleAPIKey, _gv_LogFilePath, _gv_SampleAPIIncDate, _gv_OutputHyperFile, _gv_OutputPackageFile, _gv_PackageXMLFile, _gv_Backup, _gv_BackupFilePath, _gv_EmailMessage, _gv_EmailMessageTo, _gv_ArchiveUnarchiveTempPath

    #Start of Email Message writing
    _gv_EmailMessage = "----------------------------------------------------\n"
    _gv_EmailMessage += _gv_ApplicationBanner + "\n"
    _gv_EmailMessage += "----------------------------------------------------\n\n"
    #Append (+=) here, a plain assignment would wipe out the banner written above
    _gv_EmailMessage += "Arguments passed: \n"

    ##Parse Arguments - Modify here Input Arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--output", required=True, help="Full Path to Output File .hyper or .tdsx (.tdsx file should exist); e.g. c:\\new_folder\\SampleAPI_Output_file.hyper", type=str)
    parser.add_argument("--sampleapiurl", required=False, default="https://reqres.in/api", help="SampleAPI API URL, optional otherwise uses Default", type=str)
    parser.add_argument("--sampleapikey", required=True, help="Sample API Key", type=str)
    parser.add_argument("--load", required=False, default="Full", help="Load Type Full or Inc (Incremental); Default: Full", type=str)
    parser.add_argument("--incdate", required=False, help="[Required with load = Inc] Date (YYYY-MM-DD) to extract data from", type=str)
    parser.add_argument("--incperiod", required=False, help="[Required with load = Inc] Period to extract data from e.g. WTD, MTD, YTD, LastWeek, LastMonth, LastYear", type=str)
    parser.add_argument("--emailto", required=False, help="[Optional] (Requires E-Mail Config inside script) Comma separated email addresses", type=str)
    parser.add_argument("--packagexml", required=False, help="[Optional] Full path to .tds XML file to package with .hyper output file; e.g. c:\\new_folder\\SampleAPI_package_XML_file.tds", type=str)
    parser.add_argument("--backup", required=False, help="[Optional] Backup Pre-Existing Output File - Yes or No", type=str)
    parser.add_argument("--backuppath", required=False, help="[Optional] Full Directory Path for Backup, if not provided then Output directory is used", type=str)
    parser.add_argument("--log", required=False, help="[Optional] Full Path to Log File", type=str)
    args = parser.parse_args()

    #Check for Email
    if args.emailto != None:
        if _gv_EmailServer == None or _gv_EmailMessageFrom == None:
            parser.error("The --emailto parameter requires E-mail config (Server, From) to be done inside the script.")
        else:
            _gv_EmailMessageTo = args.emailto

    _gv_SampleAPIURL = args.sampleapiurl
    _gv_EmailMessage += "\n Sample API URL: " + str(_gv_SampleAPIURL)
    _gv_SampleAPIKey = args.sampleapikey
    _gv_EmailMessage += "\n Sample API Access Key: " + str(_gv_SampleAPIKey)
    _gv_LogFilePath = args.log
    _gv_EmailMessage += "\n Log File Path: " + str(_gv_LogFilePath)

    #Check for Incremental Date input parameter
    if str(args.load).lower() == "inc":
        if args.incdate == None and args.incperiod == None:
            parser.error("The --load=Inc requires --incdate or --incperiod parameter to have a value passed")
        else:
            if args.incdate != None:
                try:
                    _gv_SampleAPIIncDate = datetime.datetime.strptime(args.incdate, "%Y-%m-%d")
                    _gv_EmailMessage += "\n Load Type: INC"
                    _gv_EmailMessage += "\n Inc Date: " + str(_gv_SampleAPIIncDate)
                except Exception as ex:
                    parser.error("Could not parse the date in --incdate parameter. It should be in YYYY-MM-DD format. Please check and try again. \nDate Passed: " + args.incdate + "\n" + str(ex))
            #if incperiod (WTD, MTD, YTD, LASTWEEK, LASTMONTH, LASTYEAR) is used instead of incdate
            elif args.incperiod != None:
                if str(args.incperiod).upper() == "WTD":
                    _gv_SampleAPIIncDate = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d")
                    while _gv_SampleAPIIncDate.strftime("%a") != "Sat":
                        _gv_SampleAPIIncDate -= datetime.timedelta(days = 1)
                elif str(args.incperiod).upper() == "MTD":
                    _gv_SampleAPIIncDate = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m") + "-01", "%Y-%m-%d")
                elif str(args.incperiod).upper() == "YTD":
                    _gv_SampleAPIIncDate = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y") + "-01-01", "%Y-%m-%d")
                elif str(args.incperiod).upper() == "LASTWEEK":
                    _gv_SampleAPIIncDate = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d")
                    _gv_SampleAPIIncDate -= datetime.timedelta(days = 7)
                    while _gv_SampleAPIIncDate.strftime("%a") != "Sat":
                        _gv_SampleAPIIncDate -= datetime.timedelta(days = 1)
                elif str(args.incperiod).upper() == "LASTMONTH":
                    #Start from the 1st of the current month and step back one day to land in the previous month
                    #(subtracting a flat 31 days from today could skip a month in the first days of March)
                    _gv_SampleAPIIncDate = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m") + "-01", "%Y-%m-%d")
                    _gv_SampleAPIIncDate -= datetime.timedelta(days = 1)
                    while int(_gv_SampleAPIIncDate.strftime("%d")) != 1:
                        _gv_SampleAPIIncDate -= datetime.timedelta(days = 1)
                elif str(args.incperiod).upper() == "LASTYEAR":
                    _gv_SampleAPIIncDate = datetime.datetime.strptime(str(int(datetime.datetime.now().strftime("%Y"))-1) + "-01-01", "%Y-%m-%d")
                else:
                    parser.error("Could not parse the --incperiod parameter. It should be one of WTD, MTD, YTD, LastWeek, LastMonth, LastYear values. \nPlease check and try again. \nPeriod Passed: " + args.incperiod)
                _gv_EmailMessage += "\n Load Type: INC"
                _gv_EmailMessage += "\n Inc Period: " + args.incperiod
                _gv_EmailMessage += "\n Inc Date: " + str(_gv_SampleAPIIncDate)
            else:
                parser.error("Could not parse the --incdate / --incperiod parameter. Please check and try again. \nDate Passed: " + str(args.incdate) + " \nPeriod Passed: " + str(args.incperiod))
        if not(os.path.isfile(args.output)):
            parser.error("Output File does not exists and --load=Inc is specified")
    #if load type is FULL
    elif str(args.load).lower() == "full":
        _gv_SampleAPIIncDate = None
        _gv_EmailMessage += "\n Load Type: FULL"
    else:
        parser.error("Invalid --load parameter passed. Please check and try again. \n--load: " + args.load)

    #Check for Output File
    if not (str(os.path.splitext(args.output)[-1]).lower() == ".hyper" or str(os.path.splitext(args.output)[-1]).lower() == ".tdsx"):
        parser.error("Invalid file specified in --output parameter. File must end in .hyper or .tdsx (existing package file) extension. Please check and try again. \n--output: " + args.output)
    #When output file specified is .tdsx
    elif str(os.path.splitext(args.output)[-1]).lower() == ".tdsx":
        if os.path.isfile(args.output) != True:
            parser.error("Invalid .tdsx Output File specified. Output File does not exists, cannot proceed. --output with .tdsx file requires file to be existing. Please check and try again. \n--output: " + args.output)
        else:
            _gv_OutputPackageFile = args.output
            _gv_ArchiveUnarchiveTempPath += datetime.datetime.now().strftime("_%Y%m%d_%H%M%S")
            _gv_EmailMessage += "\n Output Package File: " + str(_gv_OutputPackageFile)
    #When output file specified is .hyper
    else:
        _gv_OutputHyperFile = args.output
        _gv_EmailMessage += "\n Output Hyper File: " + str(_gv_OutputHyperFile)

    #Check for Package XML file
    if not(args.packagexml == None or (os.path.isfile(args.packagexml) == True and str(os.path.splitext(args.packagexml)[-1]).lower() == ".tds")):
        parser.error("Invalid --packagexml file specified. File is not .tds or does not exists. \n--packagexml: " + args.packagexml)
    else:
        _gv_PackageXMLFile = args.packagexml
        _gv_EmailMessage += "\n Package XML File: " + str(_gv_PackageXMLFile)

    #Check for Backup
    if str(args.backup).lower() == "yes":
        _gv_Backup = True
        if os.path.isfile(args.output) != True:
            parser.error("Invalid Output File specified with --backup = Yes. Output File does not exists, cannot backup. Please check and try again. \n--output: " + args.output)
        if not (os.path.isdir(args.backuppath if args.backuppath != None else ".\\") == True):
            parser.error("Invalid Backup Path specified. Please check and try again. \n--backuppath: " + args.backuppath)
        else:
            _gv_BackupFilePath = args.backuppath
    _gv_EmailMessage += "\n Backup: " + str(_gv_Backup)
    _gv_EmailMessage += "\n Backup File Path: " + str(_gv_BackupFilePath)

    #Finished parsing of Arguments
    _gv_EmailMessage += "\n\nFinished Parsing Arguments\n\n"

#Function for writing Log (onscreen or to a file)
def LogFileWriter(LogString):
    global _gv_LogFile, _gv_EmailMessage
    if _gv_LogFile == None:
        if _gv_LogFilePath == None:
            _gv_LogFile = sys.stdout
        else:
            try:
                _gv_LogFile = open(_gv_LogFilePath, "a")
            except Exception as ex:
                #Report the path here, _gv_LogFile is still None when open() fails
                print("Error opening Log File. Exiting.\nLog File: " + str(_gv_LogFilePath))
                print(str(ex))
                exit(1)
        _gv_LogFile.write("\n\n")
        _gv_LogFile.write("----------------------------------------------------\n")
        _gv_LogFile.write(_gv_ApplicationBanner + "\n")
        _gv_LogFile.write("----------------------------------------------------\n")
        _gv_LogFile.write(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S") + ": " + "Beginning Script\n")
        _gv_EmailMessage += str(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S") + ": " + "Beginning Script\n")
    _gv_LogFile.write(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S") + ": " + LogString + "\n")
    _gv_EmailMessage += str(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S") + ": " + LogString + "\n")
    _gv_LogFile.flush()

#Function for Sending Email Message
def SendEmailMessage():
    if _gv_EmailMessage != None and _gv_EmailMessageFrom != None and _gv_EmailMessageTo != None and _gv_EmailServer != None:
        Message = email.message.EmailMessage()
        Message["Subject"] = _gv_ApplicationBanner + " - run of " + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        Message["From"] = _gv_EmailMessageFrom
        Message["To"] = _gv_EmailMessageTo
        Message.set_content(str(_gv_EmailMessage))
        try:
            with smtplib.SMTP(_gv_EmailServer) as SMTPServer:
                SMTPServer.send_message(Message)
        except Exception as ex:
            LogFileWriter(str(ex))

#Function for Backing up Output File before writing
def FileBackup(OrigPath, NewPath = None):
    if NewPath == None:
        shutil.copyfile(OrigPath, OrigPath + "_" + datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
        LogFileWriter("File Backed up successfully at location " + OrigPath + "_" + datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
    else:
        shutil.copy(OrigPath, os.path.join(NewPath, os.path.basename(OrigPath) + "_" + datetime.datetime.now().strftime("%Y%m%d_%H%M%S")))
        LogFileWriter("File Backed up successfully at location " + os.path.join(NewPath, os.path.basename(OrigPath) + "_" + datetime.datetime.now().strftime("%Y%m%d_%H%M%S")))

#Function for Archiving / Unarchiving .tdsx file
def FileArchiveUnarchive(OrigPath, Option):
    global _gv_OutputHyperFile
    #Archive File Operation
    if Option == "archive":
        try:
            if str(os.path.splitext(OrigPath)[-1]).lower() == ".tdsx":
                if os.path.isdir(os.path.join(os.path.dirname(OrigPath), _gv_ArchiveUnarchiveTempPath)) == True:
                    os.remove(OrigPath)
                else:
                    raise Exception("Error in archiving .tdsx file. Could not find unarchived extract to re-archive.")
                ZipFile = zipfile.ZipFile(OrigPath, "w", zipfile.ZIP_DEFLATED)
                for DirPath, SubDir, Files in os.walk(os.path.join(os.path.dirname(OrigPath), _gv_ArchiveUnarchiveTempPath)):
                    for FileName in Files:
                        FilePath = os.path.join(DirPath, FileName)
                        if str(os.path.splitext(FileName)[-1]).lower() == ".tds":
                            if _gv_PackageXMLFile == None:
                                ZipFile.write(FilePath, FilePath.replace(os.path.dirname(OrigPath) + "\\" + _gv_ArchiveUnarchiveTempPath + "\\", ""))
                            else:
                                ZipFile.write(_gv_PackageXMLFile, FilePath.replace(os.path.dirname(OrigPath) + "\\" + _gv_ArchiveUnarchiveTempPath + "\\", ""))
                                LogFileWriter("Replaced .tds file: " + FilePath.replace(os.path.dirname(OrigPath) + "\\" + _gv_ArchiveUnarchiveTempPath + "\\", "") + " in the Output File with file --packagexml: " + _gv_PackageXMLFile)
                        else:
                            ZipFile.write(FilePath, FilePath.replace(os.path.dirname(OrigPath) + "\\" + _gv_ArchiveUnarchiveTempPath + "\\", ""))
                ZipFile.close()
                shutil.rmtree(os.path.join(os.path.dirname(OrigPath) , _gv_ArchiveUnarchiveTempPath))
            elif str(os.path.splitext(OrigPath)[-1]).lower() == ".hyper":
                if _gv_PackageXMLFile == None:
                    raise Exception("Error in archiving .tdsx file. Could not find Package XML file to create the archive.")
                ZipFile = zipfile.ZipFile(os.path.join(os.path.dirname(OrigPath), os.path.basename(_gv_PackageXMLFile) + "x"), "w", zipfile.ZIP_DEFLATED)
                ZipFile.write(_gv_OutputHyperFile, _gv_OutputHyperFile.replace(os.path.dirname(_gv_OutputHyperFile) + "\\", ""))
                ZipFile.write(_gv_PackageXMLFile, _gv_PackageXMLFile.replace(os.path.dirname(_gv_PackageXMLFile) + "\\", ""))
                ZipFile.close()
                LogFileWriter("Created new .tdsx Output File " + os.path.join(os.path.dirname(OrigPath), os.path.basename(_gv_PackageXMLFile) + "x") + " based on file --packagexml: " + _gv_PackageXMLFile)
            elif os.path.isdir(OrigPath) == True:
                FilePaths = []
                for DirPath, SubDir, Files in os.walk(OrigPath):
                    for FileName in Files:
                        FilePath = os.path.join(DirPath, FileName)
                        FilePaths.append(FilePath)
                ZipFile = zipfile.ZipFile(os.path.join(OrigPath, os.path.basename(OrigPath) + ".zip"), "w", zipfile.ZIP_DEFLATED)
                for FilePath in FilePaths:
                    ZipFile.write(FilePath, FilePath.replace(os.path.dirname(FilePath) + "\\", ""))
                ZipFile.close()
        except Exception as ex:
            LogFileWriter("Error in archiving file " + OrigPath + ". Please check below error message and try again.")
            LogFileWriter(str(ex))
    #Unarchive File Operation
    elif Option == "unarchive":
        try:
            ZipFile = zipfile.ZipFile(OrigPath, "r")
            if str(os.path.splitext(OrigPath)[-1]).lower() == ".tdsx":
                for File_Name in ZipFile.namelist():
                    if str(os.path.splitext(File_Name)[-1]).lower() == ".hyper":
                        _gv_OutputHyperFile = os.path.join(os.path.dirname(OrigPath), _gv_ArchiveUnarchiveTempPath, File_Name)
                        LogFileWriter("Found the Hyper File in the Output Package. Hyper File Name: " + os.path.basename(_gv_OutputHyperFile))
                if _gv_OutputHyperFile == None:
                    raise Exception("Could not find .hyper file in Package File: " + OrigPath)
            ZipFile.extractall(os.path.join(os.path.dirname(OrigPath), _gv_ArchiveUnarchiveTempPath + "\\"))
            ZipFile.close()
        except Exception as ex:
            LogFileWriter("Error in unarchiving file " + OrigPath + ". Please check below error message and try again.")
            LogFileWriter(str(ex))

#Function for controlling max no. of requests per second to API
def SampleAPIConnectionCounter():
    global _gv_SampleAPIReqCounter
    if _gv_SampleAPIMaxRequestsPerSec != 0 and _gv_SampleAPIMaxRequestsPerSec != None:
        if _gv_SampleAPIReqCounter["LastAccessed"] == datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S"):
            if _gv_SampleAPIReqCounter["RequestCount"] >= _gv_SampleAPIMaxRequestsPerSec - 1:
                time.sleep(1)
                SampleAPIConnectionCounter()
            else:
                _gv_SampleAPIReqCounter["RequestCount"] += 1
        else:
            _gv_SampleAPIReqCounter["LastAccessed"] = datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S")
            _gv_SampleAPIReqCounter["RequestCount"] = 1

##Function for Initial Connection to Sample API
def SampleAPIInitialConnection (SampleAPIURL, SampleAPIKey):
    #_gv_SampleAPICookies is declared global here so the cookies are visible to the later requests
    global _gv_SampleAPIURL, _gv_SampleAPIAuthorization, _gv_SampleAPIHeader, _gv_SampleAPIBaseURIS, _gv_SampleAPIResponse, _gv_SampleAPICookies
    _gv_SampleAPIURL = SampleAPIURL
    _gv_SampleAPIAuthorization = "Basic " + str(base64.b64encode(SampleAPIKey.encode("utf-8")), "utf-8")
    _gv_SampleAPIHeader = {"Authorization":_gv_SampleAPIAuthorization, "Content-Type":_gv_SampleAPIContentType}
    _gv_SampleAPIBaseURIS = {"users_base_url":_gv_SampleAPIURL + "/users", "resource_base_url":_gv_SampleAPIURL + "/unknown"}

    ##Initiating Connection
    LogFileWriter("Accessing Base Users API")
    SampleAPIConnectionCounter()
    _gv_SampleAPIResponse = requests.request("GET", _gv_SampleAPIBaseURIS["users_base_url"], headers = _gv_SampleAPIHeader, timeout = _gv_SampleAPITimeOut, allow_redirects = False)
    if _gv_SampleAPIResponse.status_code == 200:
        LogFileWriter("Base Users API Retrieved")
        _gv_SampleAPICookies = _gv_SampleAPIResponse.cookies
        SampleAPIJSONS = []
        SampleAPIJSON = _gv_SampleAPIResponse.json()
        SampleAPIJSONS.append(SampleAPIJSON)
        ##Retrieve all pages if API uses pagination
        while SampleAPIJSON.get("page") != SampleAPIJSON.get("total_pages"):
            _gv_SampleAPIResponse = requests.request("GET", _gv_SampleAPIBaseURIS["users_base_url"] + "?page=" + str(int(SampleAPIJSON.get("page")) + 1), headers = _gv_SampleAPIHeader, timeout = _gv_SampleAPITimeOut, allow_redirects = False, cookies=_gv_SampleAPICookies)
            SampleAPIJSON = _gv_SampleAPIResponse.json()
            SampleAPIJSONS.append(SampleAPIJSON)
        for SampleAPIJSON in SampleAPIJSONS:
            if len(SampleAPIJSON["data"]) <= 0:
                LogFileWriter("No Users found in Sample API for data retreival")
                LogFileWriter("Exiting Script")
                raise Exception("No Users found in Sample API for data retreival")
    else:
        LogFileWriter("Error in receiving Base Users API")
        LogFileWriter("Please check connection parameters again.")
        LogFileWriter("Exiting Script")
        raise Exception("Error Establishing Connection, Unable to get Base Users API \nSample API URL: " + SampleAPIURL + " \nSample API Key: " + SampleAPIKey + " \nSample API Authorization: " + _gv_SampleAPIAuthorization + " \nSample API Base Users URL: " + _gv_SampleAPIBaseURIS["users_base_url"])

##Function for iterating through next pages of API if API uses pagination
def SampleAPIGetDataNextPage(SampleAPIURL, CurrentPageNumber = None):
    global _gv_SampleAPIResponse
    SampleAPIConnectionCounter()
    _gv_SampleAPIResponse = requests.request("GET", SampleAPIURL + ("&" if str(SampleAPIURL).find("?") > 0 else "?") + "page=" + str(int(CurrentPageNumber if CurrentPageNumber != None else 0) + 1), headers = _gv_SampleAPIHeader, timeout = _gv_SampleAPITimeOut, allow_redirects = False, cookies=_gv_SampleAPICookies)
    if _gv_SampleAPIResponse.status_code == 200:
        SampleAPIJSON = _gv_SampleAPIResponse.json()
        return SampleAPIJSON
    else:
        LogFileWriter("Error in receiving data from Sample API")
        LogFileWriter("Please check log and try again.")
        LogFileWriter("Exiting Script")
        #str() around the page number, a str + int concatenation would raise TypeError
        raise Exception("Error in receiving data from Sample API, Unable to get data from URL \nSample API URL: " + SampleAPIURL + ("&" if str(SampleAPIURL).find("?") > 0 else "?") + "page=" + str(int(CurrentPageNumber if CurrentPageNumber != None else 0) + 1))

##Function for getting data from the API
def SampleAPIGetData(SampleAPIURL, SampleAPIURLPARAMS = None):
    global _gv_SampleAPIResponse
    SampleAPIConnectionCounter()
    _gv_SampleAPIResponse = requests.request("GET", SampleAPIURL + (SampleAPIURLPARAMS if SampleAPIURLPARAMS != None else ""), headers = _gv_SampleAPIHeader, timeout = _gv_SampleAPITimeOut, allow_redirects = False, cookies=_gv_SampleAPICookies)
    if _gv_SampleAPIResponse.status_code == 200:
        SampleAPIJSON = _gv_SampleAPIResponse.json()
        SampleAPIJSONS = []
        SampleAPIJSONS.append(SampleAPIJSON)
        ExceptionCount = 0
        while SampleAPIJSON.get("page") != SampleAPIJSON.get("total_pages"):
            try:
                SampleAPIJSON = SampleAPIGetDataNextPage(SampleAPIURL + (SampleAPIURLPARAMS if SampleAPIURLPARAMS != None else ""), SampleAPIJSON.get("page"))
                SampleAPIJSONS.append(SampleAPIJSON)
                ExceptionCount = 0
            except Exception as ex:
                ExceptionCount += 1
                if ExceptionCount <=5:
                    LogFileWriter("Warning: " + str(ex))
                else:
                    LogFileWriter("Too Many Exceptions: " + str(ex))
                    raise Exception("Too Many Exceptions: " + str(ex))
        return SampleAPIJSONS
    else:
        LogFileWriter("Error in receiving data from Sample API")
        LogFileWriter("Please check log and try again.")
        LogFileWriter("Exiting Script")
        raise Exception("Error in receiving data from Sample API, Unable to get data from URL \nSample API URL: " + SampleAPIURL)

#Function for controlling Tableau Hyper Process
def TabHyperProcessControl(ControlOption, HyperFilePath = None):
    global _gv_TabHyperProcess, _gv_TabHyperCon
    #Tableau Hyper Process start operation
    if ControlOption == "start":
        try:
            try:
                _gv_TabHyperProcess = tableauhyperapi.HyperProcess(_gv_TabHyperSendUsageStats)
            except tableauhyperapi.UnclosedObjectWarning as ex:
                pass
            if _gv_TabHyperProcess.is_open == True:
                LogFileWriter("Started Tableau Hyper Process")
            else:
                LogFileWriter("Tableau Hyper Process not started")
                raise Exception("Tableau Hyper Process not started")
            try:
                _gv_TabHyperCon = tableauhyperapi.Connection(_gv_TabHyperProcess.endpoint, HyperFilePath, tableauhyperapi.CreateMode.CREATE_IF_NOT_EXISTS)
            except tableauhyperapi.UnclosedObjectWarning as ex:
                pass
            if _gv_TabHyperCon.is_open == True:
                LogFileWriter("Opened Connection to Tableau Hyper File: \n" + HyperFilePath)
            else:
                LogFileWriter("Could not open Connection to Tableau Hyper File: " + HyperFilePath + " . Please check path and try again.")
                raise Exception("Could not open Connection to Tableau Hyper File: " + HyperFilePath + " . Please check path and try again.")
        except tableauhyperapi.HyperException as ex:
            LogFileWriter("There is exception in starting Tableau Hyper Process. Exiting...")
            LogFileWriter(str(ex))
            raise Exception("There is exception in starting Tableau Hyper Process.")
    #Tableau Hyper Process shutdown operation
    elif ControlOption == "shutdown":
        try:
            _gv_TabHyperCon.close()
            _gv_TabHyperProcess.shutdown()
            if _gv_TabHyperProcess.is_open == False:
                LogFileWriter("Stopped Tableau Hyper Process")
            else:
                _gv_TabHyperProcess.close()
                LogFileWriter("Forcefully stopped Tableau Hyper Process")
            _gv_TabHyperProcess = None
        except tableauhyperapi.HyperException as ex:
            LogFileWriter("There is exception in stopping Tableau Hyper Process. Forcefully Stopping it...")
            LogFileWriter(str(ex))
            _gv_TabHyperProcess.close()
            LogFileWriter("Forcefully stopped Tableau Hyper Process")
    #Invalid option passed to the Function
    else:
        LogFileWriter("Invalid Control Option Passed to Hyper Process Control: " + ControlOption)
        raise Exception("Invalid Control Option Passed to Hyper Process Control: " + ControlOption)

#Function for writing Extract Log Table inside Tableau Hyper File
def TabHyperExtractLogWriter(Table_Affected, Action, Rows_Affected, Comment = None):
    global _gv_TabHyperExtractLogTable
    if _gv_TabHyperExtractLogTable != None:
        Table_Name_Log = tableauhyperapi.TableName(_gv_TabHyperExtractLogTable)
        Column_Definition_Log = [
            tableauhyperapi.TableDefinition.Column("insert_timestamp", tableauhyperapi.SqlType.timestamp()),
            tableauhyperapi.TableDefinition.Column("table_name", tableauhyperapi.SqlType.varchar(100)),
            tableauhyperapi.TableDefinition.Column("action_type", tableauhyperapi.SqlType.varchar(100)),
            tableauhyperapi.TableDefinition.Column("rows_affected", tableauhyperapi.SqlType.int()),
            tableauhyperapi.TableDefinition.Column("comments", tableauhyperapi.SqlType.text())
        ]
        Table_Definition_Log = tableauhyperapi.TableDefinition(Table_Name_Log, Column_Definition_Log, tableauhyperapi.Persistence.PERMANENT)
        _gv_TabHyperCon.catalog.create_table_if_not_exists(Table_Definition_Log)
        with tableauhyperapi.Inserter(_gv_TabHyperCon, Table_Definition_Log) as Log_Inserter:
            try:
                Log_Inserter.add_row([datetime.datetime.now(), Table_Affected, Action, Rows_Affected, Comment])
                Log_Inserter.execute()
            except tableauhyperapi.HyperException as ex:
                LogFileWriter("Error writing Hyper Extract Log Table. Would not try to write again. \n Extract Log Table: " + _gv_TabHyperExtractLogTable)
                LogFileWriter(str(ex))
                _gv_TabHyperExtractLogTable = None

#Function for re-creating Output Hyper File in case of FULL Load
def TabSampleAPIClearData():
    if _gv_SampleAPIIncDate == None:
        try:
            _gv_TabHyperCon.catalog.detach_all_databases()
            _gv_TabHyperCon.catalog.drop_database_if_exists(_gv_OutputHyperFile)
            _gv_TabHyperCon.catalog.create_database_if_not_exists(_gv_OutputHyperFile)
            _gv_TabHyperCon.catalog.attach_database(_gv_OutputHyperFile)
        except tableauhyperapi.HyperException as ex:
            LogFileWriter("Error occurred while truncating Hyper File for Full Load. Please check below error message and try again.")
            LogFileWriter(str(ex))
            raise Exception("Error occurred while truncating Hyper File for Full Load")

#Function for writing the data to Output Hyper File
def TabSampleAPIWriteData(WriteDataSet, Extract_Comment = None):
    #Define Tables for Load and Stage
    Table_Name_Load = tableauhyperapi.TableName(_gv_TabHyperLoadSchema, WriteDataSet["table_name"])
    Table_Name_Stage = tableauhyperapi.TableName(WriteDataSet["table_name"] + _gv_TabHyperStageTablesSuffix)
    Table_Definition_Load = None
    Table_Definition_Stage = None
    Extract_Comment = " for " + Extract_Comment if Extract_Comment != None else ""

    #Define Columns for the Tables based on Write Data Set
    Column_Definition = []
    for index in range(0, len(WriteDataSet["columns"])):
        Column_Definition.append(tableauhyperapi.TableDefinition.Column(WriteDataSet["columns"][index], WriteDataSet["columns_tabdatatypes"][index]))

    #Create Schema if it does not exists
    _gv_TabHyperCon.catalog.create_schema_if_not_exists(_gv_TabHyperLoadSchema)

    #Create Load Table if it does not exists
    if not(_gv_TabHyperCon.catalog.has_table(Table_Name_Load)):
        Table_Definition_Load = tableauhyperapi.TableDefinition(Table_Name_Load, Column_Definition, tableauhyperapi.Persistence.PERMANENT)
        _gv_TabHyperCon.catalog.create_table_if_not_exists(Table_Definition_Load)
TabHyperExtractLogWriter(str(Table_Name_Load.name.unescaped), "CREATED", 0, "Table Created") else: Table_Definition_Load = _gv_TabHyperCon.catalog.get_table_definition(Table_Name_Load) Check_Columns_Load = [str(Column.name.unescaped) for Column in Table_Definition_Load.columns] Check_Columns_Load.sort() Check_Columns_DataSet = WriteDataSet["columns"].copy() Check_Columns_DataSet.sort() if not(Check_Columns_Load == Check_Columns_DataSet): LogFileWriter("Columns for Table: " + WriteDataSet["table_name"] + " is not same as in Hyper file. Please check below:") LogFileWriter("Hyper File Columns: " + str([str(Column.name.unescaped) for Column in Table_Definition_Load.columns])) LogFileWriter("Columns from SampleAPI: " + str(WriteDataSet["columns"])) raise Exception("Mismatched column between Hyper File and Sample API Data Schema for table: " + WriteDataSet["table_name"]) #Create Stage Table if it does not exists if not(_gv_TabHyperCon.catalog.has_table(Table_Name_Stage)): Table_Definition_Stage = tableauhyperapi.TableDefinition(Table_Name_Stage, Column_Definition, tableauhyperapi.Persistence.TEMPORARY if not _gv_TabHyperStageUsePersistentTables else tableauhyperapi.Persistence.PERMANENT) _gv_TabHyperCon.catalog.create_table_if_not_exists(Table_Definition_Stage) TabHyperExtractLogWriter(str(Table_Name_Stage.name.unescaped), "CREATED", 0, "Table Created") else: Table_Definition_Stage = _gv_TabHyperCon.catalog.get_table_definition(Table_Name_Stage) #Execute Pre SQL Query if len(WriteDataSet["pre_sql_load"]) != 0: for Execute_Command in WriteDataSet["pre_sql_load"]: try: Execute_Result = _gv_TabHyperCon.execute_command(str(Execute_Command).replace("__table_name__", str(Table_Name_Load))) TabHyperExtractLogWriter(str(Table_Name_Load.name.unescaped), "PRE_SQL_LOAD", Execute_Result, "Executed Pre SQL " + str(WriteDataSet["pre_sql_load"].index(Execute_Command) + 1) + Extract_Comment) except tableauhyperapi.HyperException as ex: LogFileWriter("There are errors in Pre SQL Load for 
table " + WriteDataSet["table_name"] + ".\n Error in Pre SQL: \n" + str(Execute_Command) + "\nPlease check below for more details.") LogFileWriter(str(ex)) TabHyperProcessControl("shutdown") raise Exception("Errors in writing Hyper File. Please check errors and try again. Exiting...") #Truncate Stage Table Execute_Command = f"TRUNCATE TABLE {Table_Name_Stage}" try: Execute_Result = _gv_TabHyperCon.execute_command(Execute_Command) TabHyperExtractLogWriter(str(Table_Name_Stage.name.unescaped), "TRUNCATE", Execute_Result, "Stage Table Truncated" + Extract_Comment) except tableauhyperapi.HyperException as ex: LogFileWriter("There are errors in truncating Hyper File Stage Table for: " + WriteDataSet["table_name"] + ". Please check below for more details.") LogFileWriter(str(ex)) TabHyperProcessControl("shutdown") raise Exception("Errors in writing Hyper File. Please check errors and try again. Exiting...") #Insert Data into Stage Table with tableauhyperapi.Inserter(_gv_TabHyperCon, Table_Definition_Stage, WriteDataSet["columns"]) as TabInserter: try: TabInserter.add_rows(WriteDataSet["rows"]) TabInserter.execute() TabHyperExtractLogWriter(str(Table_Name_Stage.name.unescaped), "INSERT", len(WriteDataSet["rows"]), "Rows Inserted" + Extract_Comment) except tableauhyperapi.HyperException as ex: LogFileWriter("There are errors in writing data to Hyper File Table: " + WriteDataSet["table_name"] + " in stage. Please check below for more details.") LogFileWriter(str(ex)) TabHyperProcessControl("shutdown") raise Exception("Errors in writing Hyper File. Please check errors and try again. 
Exiting...") #Merge Insert data Load Table Execute_Command = f"INSERT INTO {Table_Name_Load} (" + ", ".join(WriteDataSet["columns"]) Execute_Command += f") SELECT " + ", ".join(WriteDataSet["columns"]) + f" FROM {Table_Name_Stage}" try: Execute_Result = _gv_TabHyperCon.execute_command(Execute_Command) TabHyperExtractLogWriter(str(Table_Name_Load.name.unescaped), "MERGE_INSERT", Execute_Result, "Merge Load with Stage Table Step 1" + Extract_Comment) except tableauhyperapi.HyperException as ex: LogFileWriter("There are errors in insert merging data to Hyper File Table: " + WriteDataSet["table_name"] + ". Please check below for more details.") LogFileWriter(str(ex)) TabHyperProcessControl("shutdown") raise Exception("Errors in writing Hyper File. Please check errors and try again. Exiting...") #Merge Delete data from Load Table Execute_Command = f"DELETE FROM {Table_Name_Load} DELQ USING (SELECT " Execute_Command += f", ".join(WriteDataSet["primary_column"]) + f", MAX(" + WriteDataSet["insert_date_key"] + f") AS insert_date_key FROM {Table_Name_Load} " Execute_Command += f"GROUP BY " + f", ".join(WriteDataSet["primary_column"]) + f") MAPQ " Execute_Command += f"WHERE DELQ." + f" || {tableauhyperapi.escape_string_literal('-')} || DELQ.".join(WriteDataSet["primary_column"]) Execute_Command += f" = MAPQ." + f" || {tableauhyperapi.escape_string_literal('-')} || MAPQ.".join(WriteDataSet["primary_column"]) + f" AND DELQ." + WriteDataSet["insert_date_key"] + f" <> MAPQ.insert_date_key" try: Execute_Result = _gv_TabHyperCon.execute_command(Execute_Command) TabHyperExtractLogWriter(str(Table_Name_Load.name.unescaped), "MERGE_DELETE", Execute_Result, "Merge Load with Stage Table Step 2" + Extract_Comment) except tableauhyperapi.HyperException as ex: LogFileWriter("There are errors in delete merging data from Hyper File Table: " + WriteDataSet["table_name"] + ". 
Please check below for more details.") LogFileWriter(str(ex)) TabHyperProcessControl("shutdown") raise Exception("Errors in writing Hyper File. Please check errors and try again. Exiting...") #Execute Post SQL Query if len(WriteDataSet["post_sql_load"]) != 0: for Execute_Command in WriteDataSet["post_sql_load"]: try: Execute_Result = _gv_TabHyperCon.execute_command(str(Execute_Command).replace("__table_name__", str(Table_Name_Load))) TabHyperExtractLogWriter(str(Table_Name_Load.name.unescaped), "POST_SQL_LOAD", Execute_Result, "Executed Post SQL " + str(WriteDataSet["post_sql_load"].index(Execute_Command) + 1) + Extract_Comment) except tableauhyperapi.HyperException as ex: LogFileWriter("There are errors in Post SQL Load for table " + WriteDataSet["table_name"] + ".\n Error in Post SQL: \n" + str(Execute_Command) + "\nPlease check below for more details.") LogFileWriter(str(ex)) TabHyperProcessControl("shutdown") raise Exception("Errors in writing Hyper File. Please check errors and try again. Exiting...") ##Function to get and parse data from Sample API def SampleAPIParseData(SampleAPIObjectType = None): #Start Tableau Hyper Process try: LogFileWriter("Starting Tableau Hyper Process") TabHyperProcessControl("start", _gv_OutputHyperFile) except Exception as ex: LogFileWriter("Error Occurred while starting Tableau Hyper Process. 
See below error messages for more details.") LogFileWriter(str(ex)) SendEmailMessage() _gv_LogFile.close() exit(1) ##Get Data for the different API Modules from Sample API try: if SampleAPIObjectType == None: LogFileWriter("Starting to Parse Sample API Data for All") TabSampleAPIClearData() if SampleAPIObjectType == "users" or SampleAPIObjectType == None: LogFileWriter("Starting to Parse Sample API Data for Users") SampleAPIJSONS = SampleAPIGetData(_gv_SampleAPIURL + "/users", ("?start_date=" + _gv_SampleAPIIncDate if _gv_SampleAPIIncDate != None else None)) TabSampleAPIWriteData(SampleDataSchema.SampleAPIDecoder(SampleAPIJSONS, "users")) LogFileWriter("Finished Parsing Sample API Data for Users") if SampleAPIObjectType == "resource" or SampleAPIObjectType == None: LogFileWriter("Starting to Parse Sample API Data for Resource") SampleAPIJSONS = SampleAPIGetData(_gv_SampleAPIURL + "/unknown", ("?start_date=" + _gv_SampleAPIIncDate if _gv_SampleAPIIncDate != None else None)) TabSampleAPIWriteData(SampleDataSchema.SampleAPIDecoder(SampleAPIJSONS, "resource")) LogFileWriter("Finished Parsing Sample API Data for Resource") if SampleAPIObjectType == None: LogFileWriter("Finished Parsing Sample API Data for All") except Exception as ex: LogFileWriter("Error parsing Sample API data. Please check below error and try again.") LogFileWriter(str(ex)) raise Exception ("Error parsing Sample API data.") #Stop Tableau Hyper Process finally: try: LogFileWriter("Stopping Tableau Hyper Process") TabHyperProcessControl("shutdown") except Exception as ex: LogFileWriter("Error Occurred while stopping Tableau Hyper Process. See below error messages for more details.") LogFileWriter(str(ex)) SendEmailMessage() _gv_LogFile.close() exit(1) #Main Routine, Execution starts here if __name__ == "__main__": #1 - Parse Arguments try: ParseArguments() except Exception as ex: print("Error Occured while parsing input parameters. 
Please check the parameters and try again.") print(str(ex)) exit(1) #2 - Backup existing Output File try: if _gv_Backup == True: FileBackup((_gv_OutputHyperFile if _gv_OutputHyperFile != None else _gv_OutputPackageFile), _gv_BackupFilePath) LogFileWriter("Pre-Existing Output File Backup Done") except Exception as ex: LogFileWriter("Error Occurred while backing up Pre-Existing Output file. Please check paths and try again.") LogFileWriter(str(ex)) print("Error Occurred while backing up Pre-Existing Output file. Please check paths and try again.") print(ex) SendEmailMessage() _gv_LogFile.close() exit(1) #3 - Unarchive .tdsx File try: if _gv_OutputHyperFile == None: FileArchiveUnarchive(_gv_OutputPackageFile, "unarchive") LogFileWriter("Unarchived Pre-Existing Output File") except Exception as ex: LogFileWriter("Error Occurred while unarchiving Pre-Existing Output file. Please check paths and try again.") LogFileWriter(str(ex)) print("Error Occurred while unarchiving Pre-Existing Output file. Please check paths and try again.") print(ex) SendEmailMessage() _gv_LogFile.close() exit(1) #4 - Perform Initial Connection try: LogFileWriter("Initiating Connection") SampleAPIInitialConnection(_gv_SampleAPIURL, _gv_SampleAPIKey) except Exception as ex: LogFileWriter("Error Occurred while initiating connection. Please check connection parameters and try again.") LogFileWriter(str(ex)) print("Error Occurred while initiating connection. Please check connection parameters and try again.") print(ex) SendEmailMessage() _gv_LogFile.close() exit(1) #5 - Get and Parse Data from Sample API, write to Output Hyper File try: LogFileWriter("Starting to parse Sample API Data") SampleAPIParseData() except Exception as ex: LogFileWriter("Error Occurred while parsing Sample API Data. 
See below error messages for more details.") LogFileWriter(str(ex)) SendEmailMessage() _gv_LogFile.close() exit(1) #6 - Archive .tdsx File try: if not (_gv_OutputPackageFile == None and _gv_PackageXMLFile == None): FileArchiveUnarchive((_gv_OutputPackageFile if _gv_OutputPackageFile != None else _gv_OutputHyperFile), "archive") LogFileWriter("Archived Pre-Existing Output File") except Exception as ex: LogFileWriter("Error Occurred while archiving Pre-Existing Output file. Please check paths and try again.") LogFileWriter(str(ex)) print("Error Occurred while archiving Pre-Existing Output file. Please check paths and try again.") print(ex) SendEmailMessage() _gv_LogFile.close() exit(1) #End of Execution, Send Email and Close Log File LogFileWriter("Finished running the Script.\n") print("\nFinished running the Script.\nPlease check " + ("Log File: " + _gv_LogFilePath if _gv_LogFilePath != None else "above screen messages") + (" and Extract Log Table: " + _gv_TabHyperExtractLogTable + " in the Output file" if _gv_TabHyperExtractLogTable != None else "") + " for more details.\n") SendEmailMessage() _gv_LogFile.close() |
A lot is happening in this file. The best way to understand it is to start with the __main__ routine at the very end of the script (line 652).
Here is what that routine does:
- Parsing the input arguments
- Backing up the existing output file
- Unarchiving the .tdsx file if it was specified as output
- Performing Initial Connection to Cloud API
- Thereafter getting the data from Cloud API and parsing it, to store it into .hyper file format
- Archiving the .hyper file into .tdsx file if it was specified as output
- Sending Email and completing Execution
Each of the steps above makes an individual function call into the code above, and the output log is generated from the processing of those same steps. Explaining all of these functions and steps in detail would be tedious and impractical, so it is out of scope for this blog.
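The pattern that the __main__ routine repeats for every step (try the step, log any error, send the email, exit with a non-zero code) can be sketched as a small runner. This is only an illustration and not how Main.py is actually written: `run_steps` and its `log` and `on_failure` parameters are hypothetical names, standing in for LogFileWriter and the SendEmailMessage / log-close calls.

```python
def run_steps(steps, log=print, on_failure=lambda: None):
    """Run (name, callable) pairs in order and stop at the first failure.

    Returns 0 when every step succeeded and 1 otherwise, mirroring the
    exit(1) calls in the __main__ routine.
    """
    for name, step in steps:
        try:
            log("Starting: " + name)
            step()
            log("Finished: " + name)
        except Exception as ex:
            log("Error Occurred in step '" + name + "': " + str(ex))
            on_failure()  # hypothetical hook: send the email, close the log file
            return 1
    return 0


# Hypothetical stand-ins for the real functions in Main.py
demo_steps = [
    ("Parse arguments", lambda: None),
    ("Backup existing output file", lambda: None),
    ("Get and parse API data", lambda: None),
]
exit_code = run_steps(demo_steps)
print("Exit code:", exit_code)
```

The real script spells each step out separately, which makes it easier to give every step its own error message; a runner like this mainly helps if you keep adding steps.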
The code includes comments in most places to help you walk through it, but below are the most important functions that you need to modify to make the script work with your API.
- Modify the variables that follow Global Variables in the top part of the script (Line 37) to define the basic inputs that stay constant across script runs.
- Modify the input arguments in the ParseArguments function (Line 74) to match your required inputs.
- Modify the SampleAPIInitialConnection function (Line 319) so that it contains the routine for the initial connection to your Cloud API (this could be any simple test).
- Modify the logic for getting the next pages (Line 337) and the SampleAPIGetDataNextPage function (Line 353) if your API uses pagination.
- Modify the URL format and the other parameters sent in the actual request to the API in the SampleAPIGetData function (Line 367).
- This is the big one and the most important: add and update the if conditions in the SampleAPIParseData function (Line 603) to conditionally send requests for the different API modules and call the matching parsing functions from the SampleDataSchema.py file imported at Line 22.
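If your API paginates differently from the Sample API, the part you will most likely rewrite is the page loop. Here is a minimal sketch of that loop with the same idea of tolerating up to five consecutive errors. It assumes each page is a dictionary with "page", "total_pages" and "data" keys, as in the Sample API; `fetch_all_pages`, `fetch_page` and `make_fake_api` are hypothetical names used only for this illustration, and the fake API replaces the real HTTP request so the sketch runs offline.

```python
def fetch_all_pages(fetch_page, max_consecutive_errors=5):
    """Collect every page of a paginated API response.

    fetch_page(page_number) is a hypothetical callable that returns one
    decoded JSON page as a dict with "page", "total_pages" and "data" keys.
    """
    pages = [fetch_page(1)]
    errors = 0
    while True:
        current = pages[-1].get("page", 1)
        total = pages[-1].get("total_pages", 1)
        if current >= total:
            break  # last page reached
        try:
            pages.append(fetch_page(current + 1))
            errors = 0  # reset after every successful request
        except Exception as ex:
            errors += 1
            if errors > max_consecutive_errors:
                raise RuntimeError("Too Many Exceptions: " + str(ex)) from ex
    return pages


def make_fake_api(total_pages, per_page=2):
    """Build a stand-in for the real request function (no network needed)."""
    def fetch_page(page):
        start = (page - 1) * per_page
        return {
            "page": page,
            "total_pages": total_pages,
            "data": [{"id": start + i + 1} for i in range(per_page)],
        }
    return fetch_page


all_pages = fetch_all_pages(make_fake_api(3))
print([row["id"] for page in all_pages for row in page["data"]])
```

Comparing `current >= total` instead of testing for inequality is a deliberate choice in this sketch: it avoids an endless loop if the API ever reports a page number beyond the total.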
--> Let's look at SampleDataSchema.py
Warning! This looks simple with the Sample API, but for a real-world requirement, defining each field of the JSON received from the API is what takes the most time!!
```python
#Basic Imports
import datetime

#Import for defining data types for Output Hyper File
from tableauhyperapi import SqlType

#Function for Base Data Set Definition
def SampleAPIDataSetBaseDef():
    return {"table_name":"", "columns":[], "columns_tabdatatypes": [], "primary_column": [], "insert_date_key": "insert_date" , "pre_sql_load": [], "post_sql_load": [], "rows": []}

##Function for identifying decoding function to be used based on Object Type passed
def SampleAPIDecoder(SampleAPIJSONS, SampleAPIObjectType):
    if SampleAPIObjectType == "users":
        return SampleAPIDecoderUsers(SampleAPIJSONS)
    if SampleAPIObjectType == "resource":
        return SampleAPIDecoderResource(SampleAPIJSONS)

##Decoding Function - Users
def SampleAPIDecoderUsers(SampleAPIJSONS):
    DataSet = SampleAPIDataSetBaseDef()
    DataSet["table_name"] = "Users"
    DataSet["primary_column"].append("user_id")
    Insert_Date = datetime.datetime.now()

    #Define Columns
    DataSet["columns"].append("user_id")
    DataSet["columns"].append("first_name")
    DataSet["columns"].append("last_name")
    DataSet["columns"].append("email")
    DataSet["columns"].append("avatar_url")
    DataSet["columns"].append("insert_date")

    #Define Column Tableau Data Types in same order as Columns above
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(20)) #"user_id"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(50)) #"first_name"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(50)) #"last_name"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(100)) #"email"
    DataSet["columns_tabdatatypes"].append(SqlType.text()) #"avatar_url"
    DataSet["columns_tabdatatypes"].append(SqlType.timestamp()) #"insert_date"

    #Define Data rows w.r.t Data parsed from SampleAPIJSONS in same order as Columns above
    for SampleAPIJSON in SampleAPIJSONS:
        for SampleAPIData in SampleAPIJSON["data"]:
            Row = []
            Row.append(str(SampleAPIData["id"])) #"user_id"
            Row.append(SampleAPIData.get("first_name")) #"first_name"
            Row.append(SampleAPIData.get("last_name")) #"last_name"
            Row.append(SampleAPIData.get("email")) #"email"
            Row.append(SampleAPIData.get("avatar")) #"avatar_url"
            Row.append(Insert_Date) #"insert_date"
            DataSet["rows"].append(Row)

    #Post SQL
    Execute_Command = "DELETE FROM __table_name__ DELQ USING "
    Execute_Command += "(SELECT MAX(ctid) as ctid, user_id, insert_date FROM __table_name__ GROUP BY user_id, insert_date) MAPQ "
    Execute_Command += "WHERE DELQ.user_id = MAPQ.user_id AND DELQ.insert_date = MAPQ.insert_date AND DELQ.ctid <> MAPQ.ctid"
    DataSet["post_sql_load"].append(Execute_Command)

    #Return the DataSet
    return DataSet

##Decoding Function - Resource
def SampleAPIDecoderResource(SampleAPIJSONS):
    DataSet = SampleAPIDataSetBaseDef()
    DataSet["table_name"] = "Resource"
    DataSet["primary_column"].append("resource_id")
    Insert_Date = datetime.datetime.now()

    #Define Columns
    DataSet["columns"].append("resource_id")
    DataSet["columns"].append("resource_name")
    DataSet["columns"].append("year")
    DataSet["columns"].append("color")
    DataSet["columns"].append("pantone_value")
    DataSet["columns"].append("insert_date")

    #Define Column Tableau Data Types in same order as Columns above
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(20)) #"resource_id"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(50)) #"resource_name"
    DataSet["columns_tabdatatypes"].append(SqlType.int()) #"year"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(20)) #"color"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(20)) #"pantone_value"
    DataSet["columns_tabdatatypes"].append(SqlType.timestamp()) #"insert_date"

    #Define Data rows w.r.t Data parsed from SampleAPIJSONS in same order as Columns above
    for SampleAPIJSON in SampleAPIJSONS:
        for SampleAPIData in SampleAPIJSON["data"]:
            Row = []
            Row.append(str(SampleAPIData["id"])) #"resource_id"
            Row.append(SampleAPIData.get("name")) #"resource_name"
            Row.append(SampleAPIData.get("year")) #"year"
            Row.append(SampleAPIData.get("color")) #"color"
            Row.append(SampleAPIData.get("pantone_value")) #"pantone_value"
            Row.append(Insert_Date) #"insert_date"
            DataSet["rows"].append(Row)

    #Return the DataSet
    return DataSet
```
Here you define three things: the base data structure for an object, as a Python dictionary that will become a table in Tableau; a Decoder function, which is the entry point Main.py uses to call the actual parsing functions; and the parsing functions themselves.
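Two entries of that dictionary, `primary_column` and `insert_date_key`, matter more than they look: Main.py uses them in its MERGE_INSERT and MERGE_DELETE steps, which first append the stage rows to the load table and then delete every row that is not the latest one for its primary key. The sketch below imitates that behavior on plain Python lists so you can see the effect without a Hyper file. It is an illustration under that reading of the SQL, not the code Main.py runs, and `merge_rows` is a hypothetical helper name.

```python
import datetime


def merge_rows(load_rows, stage_rows, columns, primary_column,
               insert_date_key="insert_date"):
    """Imitate the two merge steps on lists of rows.

    Step 1 appends the stage rows to the load rows; step 2 keeps, for each
    primary key, only the rows carrying the latest insert date.
    """
    key_positions = [columns.index(name) for name in primary_column]
    date_position = columns.index(insert_date_key)

    merged = list(load_rows) + list(stage_rows)  # step 1: merge insert

    latest = {}
    for row in merged:
        key = tuple(row[i] for i in key_positions)
        if key not in latest or row[date_position] > latest[key]:
            latest[key] = row[date_position]

    # step 2: merge delete (drop rows older than the latest for their key)
    return [
        row for row in merged
        if row[date_position] == latest[tuple(row[i] for i in key_positions)]
    ]


columns = ["user_id", "first_name", "insert_date"]
old_run = datetime.datetime(2020, 1, 1)
new_run = datetime.datetime(2020, 1, 2)
load = [["1", "Ann", old_run], ["2", "Bob", old_run]]
stage = [["1", "Anne", new_run], ["3", "Cid", new_run]]
for row in merge_rows(load, stage, columns, ["user_id"]):
    print(row[0], row[1], row[2].date())
```

In this example user 1 is replaced by the newer row, user 2 is kept from the earlier run, and user 3 is added, which is what makes incremental loads work.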
What you really need to take care of here is:
- Defining the entry points for the parsing functions in the Decoder function, so they can be called from the Main.py script.
- Defining the actual parsing functions. These have the following areas to consider:
  - Provide the Table Name and define the Primary Column(s) for the table(s) to be created.
  - Define the various Column Names first.
  - Define the Data Types for these Columns in the same order as above.
  - Define how to parse the actual JSON object and assign values to the Columns in the same order as above.
  - Define any Pre/Post SQL(s) if required for processing before/after the data load happens for the table.
- You could also use the Faker library here, as shown in the SampleDataSchema_Fake.py script, to randomize the data input for various Columns if needed.
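Because columns, data types and row values are matched purely by position, a mismatch is easy to introduce when a table has dozens of fields. A small check like the one below could be run on a decoder's output before it is handed to Main.py. This is a sketch only: `validate_dataset` is a hypothetical helper that is not part of the sample code, and the data types in the example are placeholder strings so that it runs without tableauhyperapi installed.

```python
def validate_dataset(dataset):
    """Return a list of problems found in a DataSet dictionary.

    An empty list means columns, data types and rows line up.
    """
    problems = []
    columns = dataset["columns"]
    types = dataset["columns_tabdatatypes"]

    if len(columns) != len(types):
        problems.append(
            "%d columns but %d data types" % (len(columns), len(types)))

    for name in dataset["primary_column"]:
        if name not in columns:
            problems.append("primary column '%s' is not in columns" % name)

    if dataset["insert_date_key"] not in columns:
        problems.append(
            "insert date key '%s' is not in columns" % dataset["insert_date_key"])

    for number, row in enumerate(dataset["rows"], start=1):
        if len(row) != len(columns):
            problems.append(
                "row %d has %d values, expected %d"
                % (number, len(row), len(columns)))
    return problems


# Placeholder strings stand in for SqlType objects in this example
example = {
    "table_name": "Users",
    "columns": ["user_id", "email", "insert_date"],
    "columns_tabdatatypes": ["varchar(20)", "varchar(100)"],  # one type missing on purpose
    "primary_column": ["user_id"],
    "insert_date_key": "insert_date",
    "pre_sql_load": [],
    "post_sql_load": [],
    "rows": [["1", "a@example.com", "2020-01-01"], ["2", "b@example.com"]],
}
for problem in validate_dataset(example):
    print(problem)
```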
--> SampleDataSchema_Fake.py code
Warning! No Warnings here!!
```python
#Basic Imports
import datetime

#Import for defining data types for Output Hyper File
from tableauhyperapi import SqlType

#Import Faker for generating random data
from faker import Faker
from faker.providers import address
from faker.providers import company
from faker.providers import internet
from faker.providers import person
from faker.providers import phone_number
from faker.providers import profile
from faker.providers import lorem

#Function for Base Data Set Definition
def SampleAPIDataSetBaseDef():
    return {"table_name":"", "columns":[], "columns_tabdatatypes": [], "primary_column": [], "insert_date_key": "insert_date" , "pre_sql_load": [], "post_sql_load": [], "rows": []}

##Function for identifying decoding function to be used based on Object Type passed
def SampleAPIDecoder(SampleAPIJSONS, SampleAPIObjectType):
    if SampleAPIObjectType == "users":
        return SampleAPIDecoderUsers(SampleAPIJSONS)
    if SampleAPIObjectType == "resource":
        return SampleAPIDecoderResource(SampleAPIJSONS)

##Decoding Function - Users
def SampleAPIDecoderUsers(SampleAPIJSONS):
    DataSet = SampleAPIDataSetBaseDef()
    DataSet["table_name"] = "Users"
    DataSet["primary_column"].append("user_id")
    Insert_Date = datetime.datetime.now()

    #Define Columns
    DataSet["columns"].append("user_id")
    DataSet["columns"].append("first_name")
    DataSet["columns"].append("last_name")
    DataSet["columns"].append("email")
    DataSet["columns"].append("avatar_url")
    DataSet["columns"].append("insert_date")

    #Define Column Tableau Data Types in same order as Columns above
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(20)) #"user_id"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(50)) #"first_name"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(50)) #"last_name"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(100)) #"email"
    DataSet["columns_tabdatatypes"].append(SqlType.text()) #"avatar_url"
    DataSet["columns_tabdatatypes"].append(SqlType.timestamp()) #"insert_date"

    fake = Faker()
    fake.add_provider(address)
    fake.add_provider(company)
    fake.add_provider(internet)
    fake.add_provider(person)
    fake.add_provider(phone_number)
    fake.add_provider(profile)

    #Define Data rows w.r.t Data parsed from SampleAPIJSONS in same order as Columns above
    for SampleAPIJSON in SampleAPIJSONS:
        for SampleAPIData in SampleAPIJSON["data"]:
            Row = []
            Row.append(str(SampleAPIData["id"])) #"user_id"
            Row.append(fake.first_name()) #"first_name"
            Row.append(fake.last_name()) #"last_name"
            Row.append(fake.email()) #"email"
            Row.append(fake.uri()) #"avatar_url"
            Row.append(Insert_Date) #"insert_date"
            DataSet["rows"].append(Row)

    #Post SQL
    Execute_Command = "DELETE FROM __table_name__ DELQ USING "
    Execute_Command += "(SELECT MAX(ctid) as ctid, user_id, insert_date FROM __table_name__ GROUP BY user_id, insert_date) MAPQ "
    Execute_Command += "WHERE DELQ.user_id = MAPQ.user_id AND DELQ.insert_date = MAPQ.insert_date AND DELQ.ctid <> MAPQ.ctid"
    DataSet["post_sql_load"].append(Execute_Command)

    #Return the DataSet
    return DataSet

##Decoding Function - Resource
def SampleAPIDecoderResource(SampleAPIJSONS):
    DataSet = SampleAPIDataSetBaseDef()
    DataSet["table_name"] = "Resource"
    DataSet["primary_column"].append("resource_id")
    Insert_Date = datetime.datetime.now()

    #Define Columns
    DataSet["columns"].append("resource_id")
    DataSet["columns"].append("resource_name")
    DataSet["columns"].append("year")
    DataSet["columns"].append("color")
    DataSet["columns"].append("pantone_value")
    DataSet["columns"].append("insert_date")

    #Define Column Tableau Data Types in same order as Columns above
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(20)) #"resource_id"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(50)) #"resource_name"
    DataSet["columns_tabdatatypes"].append(SqlType.int()) #"year"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(20)) #"color"
    DataSet["columns_tabdatatypes"].append(SqlType.varchar(20)) #"pantone_value"
    DataSet["columns_tabdatatypes"].append(SqlType.timestamp()) #"insert_date"

    fake = Faker()
    fake.add_provider(address)
    fake.add_provider(company)
    fake.add_provider(internet)
    fake.add_provider(person)
    fake.add_provider(phone_number)
    fake.add_provider(profile)

    #Define Data rows w.r.t Data parsed from SampleAPIJSONS in same order as Columns above
    for SampleAPIJSON in SampleAPIJSONS:
        for SampleAPIData in SampleAPIJSON["data"]:
            Row = []
            Row.append(str(SampleAPIData["id"])) #"resource_id"
            Row.append(fake.company()) #"resource_name"
            Row.append(SampleAPIData.get("year")) #"year"
            Row.append(str(fake.random_number())) #"color"
            Row.append(str(fake.random_number())) #"pantone_value"
            Row.append(Insert_Date) #"insert_date"
            DataSet["rows"].append(Row)

    #Return the DataSet
    return DataSet
```
and lastly....
--> P2H_Extract_and_Publish.bat
This code should be self-explanatory; it uses tabcmd to publish the output to Tableau Server.
```bat
@ECHO OFF

REM All Paths without quotes and trailing Back Slash
SET PYTHON_PATH=
SET SCRIPTS_PATH=
SET EXTRACT_PATH=
SET LOG_PATH=
SET BACKUP_PATH=
SET INCPERIOD=LastWeek

REM Comma (, ) Seperated Values for Emails Below
SET EMAIL_TO=

REM Tableau Server Details without quotes
SET TAB_SERVER=
SET TAB_PROJECT=
SET TAB_USER=
SET TAB_PASSWORD=

REM Input Parameters
SET EXTRACT_FILE_NAME=%1
SET CTMAK=%2
SET CTMSK=%3
SET LOAD_TYPE=%4

IF [%1]==[] GOTO EXIT
IF [%2]==[] GOTO EXIT
IF [%3]==[] GOTO EXIT
IF [%4]==[] GOTO EXIT

FOR /F "usebackq tokens=1,2 delims=." %%A IN (`ECHO %EXTRACT_FILE_NAME%`) DO SET FILE_NAME=%%A
FOR /F "usebackq tokens=1,2 delims=:" %%a IN (`ECHO %SCRIPTS_PATH%`) DO SET SCRIPTS_DRIVE=%%a:

IF EXIST "%EXTRACT_PATH%\%1" (
    SET BACKUP=--backup yes --backuppath "%BACKUP_PATH%"
) ELSE (
    SET BACKUP=--backup no
)

IF %LOAD_TYPE%==INC (
    SET LOAD=--load INC --incperiod %INCPERIOD%
) ELSE (
    SET LOAD=--load FULL
)

%SCRIPTS_DRIVE%
cd %SCRIPTS_PATH%

ECHO.>> %LOG_PATH%\log_schedule_%FILE_NAME%.txt
ECHO.>> %LOG_PATH%\log_schedule_%FILE_NAME%.txt
ECHO ---------------------------------------------------- >> %LOG_PATH%\log_schedule_%FILE_NAME%.txt
ECHO Schedule Log %DATE% %TIME% >> %LOG_PATH%\log_schedule_%FILE_NAME%.txt
ECHO ---------------------------------------------------- >> %LOG_PATH%\log_schedule_%FILE_NAME%.txt

ECHO %PYTHON_PATH%\python.exe Main.py --ctmak %CTMAK% --ctmsk %CTMSK% --output "%EXTRACT_PATH%\%EXTRACT_FILE_NAME%" --log "%LOG_PATH%\log_%FILE_NAME%.txt" --emailto "%EMAIL_TO%" %LOAD% %BACKUP% >> %LOG_PATH%\log_schedule_%FILE_NAME%.txt >> %LOG_PATH%\log_schedule_%FILE_NAME%.txt
%PYTHON_PATH%\python.exe Main.py --ctmak %CTMAK% --ctmsk %CTMSK% --output "%EXTRACT_PATH%\%EXTRACT_FILE_NAME%" --log "%LOG_PATH%\log_%FILE_NAME%.txt" --emailto "%EMAIL_TO%" %LOAD% %BACKUP% >> %LOG_PATH%\log_schedule_%FILE_NAME%.txt

ECHO. >> %LOG_PATH%\log_schedule_%FILE_NAME%.txt

ECHO tabcmd publish "%EXTRACT_PATH%\%EXTRACT_FILE_NAME%" --name "CTM_%FILE_NAME%_DS" --project "%TAB_PROJECT%" --overwrite --server %TAB_SERVER% --username %TAB_USER% --password %TAB_PASSWORD% >> %LOG_PATH%\log_schedule_%FILE_NAME%.txt >> %LOG_PATH%\log_schedule_%FILE_NAME%.txt
tabcmd publish "%EXTRACT_PATH%\%EXTRACT_FILE_NAME%" --name "CTM_%FILE_NAME%_DS" --project "%TAB_PROJECT%" --overwrite --server %TAB_SERVER% --username %TAB_USER% --password %TAB_PASSWORD% >> %LOG_PATH%\log_schedule_%FILE_NAME%.txt

GOTO EOF

:EXIT
ECHO Invalid Input Parameters

:EOF
ECHO.>> %LOG_PATH%\log_schedule_%FILE_NAME%.txt
```
This script is also available inside the attached Sample Code.
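If you are not on Windows, or would rather keep everything in Python, the two commands that the batch file assembles could be built like this. It is a rough sketch rather than a replacement for the attached script: the flags are copied from the batch file above, while the function names `build_extract_command` and `build_publish_command` and all the example values are made up for illustration.

```python
def build_extract_command(python_exe, extract_file, ctmak, ctmsk, log_file,
                          email_to, load_type, inc_period="LastWeek",
                          backup_path=None):
    """Build the Main.py command line the same way the batch file does."""
    command = [
        python_exe, "Main.py",
        "--ctmak", ctmak,
        "--ctmsk", ctmsk,
        "--output", extract_file,
        "--log", log_file,
        "--emailto", email_to,
    ]
    if load_type == "INC":
        command += ["--load", "INC", "--incperiod", inc_period]
    else:
        command += ["--load", "FULL"]
    # The batch file only asks for a backup when the extract already exists;
    # here the caller signals that by passing a backup path.
    if backup_path:
        command += ["--backup", "yes", "--backuppath", backup_path]
    else:
        command += ["--backup", "no"]
    return command


def build_publish_command(extract_file, name, project, server, user, password):
    """Build the tabcmd publish command used to upload the output."""
    return [
        "tabcmd", "publish", extract_file,
        "--name", name,
        "--project", project,
        "--overwrite",
        "--server", server,
        "--username", user,
        "--password", password,
    ]


# Example values only; replace them with your own paths and keys
extract_cmd = build_extract_command(
    "python", "extracts/Sample.hyper", "MY_ACCESS_KEY", "MY_SECRET_KEY",
    "logs/log_Sample.txt", "someone@example.com", "INC",
    backup_path="backup")
print(" ".join(extract_cmd))
```

Each list can then be passed to `subprocess.run(command, check=True)`. Passing arguments as a list avoids most of the quoting problems that batch files run into with spaces in paths.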
So what do you say, folks? Would this be of some help? I would be happy to hear if you were able to use this somewhere in your projects and if it saved you some effort along the way.
As always, happy coding and have a great time getting your data from Cloud APIs into Tableau and analyzing it!