Overview
A company that provides business intelligence and data analytics services to clients needs to build a comprehensive data model in Snowflake for use with AI-driven software that is developed in-house and available to customers as a service. The data model aggregates data elements from various web services and relational databases. The expected traffic is hundreds of gigabytes per day (uncompressed).
Requirements
- The solution must be able to connect to various web services that use SOAP-style authentication with an electronic signature.
- The solution must be able to copy the uncompressed datasets returned by the web services as files into local storage, compress them using the gzip algorithm, and move them to a bucket in Amazon S3.
- The solution must be able to execute a complex multi-step SQL script that loads the files into Snowflake, cleanses the data, and updates the data model.
- The solution must support a very high load (hundreds of gigabytes per day) and must finish in under three hours.
Solution
The solution developed by the team uses a parallel database loop to iterate through the available API endpoints (up to a few million) and copy the responses to an Amazon S3 bucket while simultaneously compressing them with the gzip algorithm. The available API endpoints, as well as other parameters, are stored in a Snowflake configuration table and maintained outside of the data integration pipeline.
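The configuration table itself is not created by the pipeline. A minimal sketch of what it might look like is shown below; the table and column names match the loop script used later in this article, while the data types are assumptions.

-- A minimal sketch of the configuration table; the column names match the loop
-- script used later in this article, the data types are assumptions
CREATE TABLE IF NOT EXISTS CONFIGURATION_TABLE (
    ENDPOINT      VARCHAR,  -- API endpoint appended to the endpoint path
    FILENAME      VARCHAR,  -- name of the file created in the S3 bucket
    ENDPOINT_PATH VARCHAR,  -- path prefix of the endpoint URL
    REQUEST       VARCHAR   -- request payload sent to the endpoint
);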
A Flow that creates the electronically signed header parameters for the HTTP Connection
Step 1. Create a JavaScript Flow.
Step 2. Add the following code:
// Import the Java packages used to build the signed request headers
var javaImports = new JavaImporter(java.text, java.util, javax.crypto,
    javax.crypto.spec, org.apache.commons.codec.binary);

with (javaImports) {
    // Global properties shared with the other Flows as {tokens}
    var props = SystemConfig.instance().getContextProperties();

    // Credentials issued by the API vendor
    var vendorUserId = "the user id";
    var vendorSecretKey = "the secret key";

    // Parameters populated by the database loop from the configuration table
    var endpoint_path = scenario.getVariable("endpoint_path").getValue();
    var endpoint = scenario.getVariable("endpoint").getValue();
    var endpoint_url = endpoint_path + endpoint;
    var request = scenario.getVariable("request").getValue();

    // Timestamp in GMT, sent in the Date header
    var df = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z");
    df.setTimeZone(TimeZone.getTimeZone("GMT"));
    var requestTimestamp = df.format(new java.util.Date());

    // String to sign: endpoint URL, timestamp, and request payload separated by newlines
    var encryptString = java.lang.String.join("\n", endpoint_url,
        requestTimestamp, request);

    // Sign the string with HMAC-SHA1 using the vendor secret key
    var secretKey = new SecretKeySpec(
        new java.lang.String(vendorSecretKey).getBytes("UTF-8"), "HmacSHA1");
    var mac = Mac.getInstance("HmacSHA1");
    mac.init(secretKey);
    var rawHmac = mac.doFinal(new java.lang.String(encryptString).getBytes("UTF-8"));

    // Base64-encode the signature (Apache Commons Codec) and build the Authorization header
    var signedEncoded = Base64.encodeBase64String(rawHmac);
    var authHeader = "RTK " + vendorUserId + ":" + signedEncoded;

    // Expose the calculated values as global variables ({tokens}) used by the other Flows
    props.put('requestSignature', signedEncoded);
    props.put('requestTimestamp', requestTimestamp);
    props.put('vendorUserId', vendorUserId);
    props.put('authHeader', authHeader);
    props.put('request_final', request);
    props.put('endpoint_path_final', endpoint_path);
    props.put('endpoint_final', endpoint);
}
Notice the global variables created at the end of the JavaScript code.
A Flow that copies the response from the web service into the Amazon S3 bucket
Step 1. Create an HTTP Connection for the Rentrak API with the following parameters:
- URL: https://api.rentrak.com{endpoint_path_final}{endpoint_final}
- Method: POST
- Content Type Header: application/json
- Headers:
  - Authorization={authHeader}
  - Date={requestTimestamp}
- Payload: {request_final}
Step 2. Create a Connection to Amazon S3 and set the parameter Archive file before copying to S3 to GZip.
Step 3. Create a Copy Files Flow and add a new transformation with the following parameters:
- Connection (FROM): the HTTP Connection created in Step 1
- From: {filename}.json
- To: *
- Connection (TO): the Amazon S3 Connection created in Step 2
Step 4. Create a nested Flow and add the JavaScript Flow, which creates the electronically signed header parameters, and the Copy Files Flow created in Step 3 (in this order).
Step 5. Name and save the nested Flow.
A Flow that updates the data model using SQL
Step 1. Create a database Connection for Snowflake.
Step 2. Create an SQL Flow and add the SQL that updates the data model. Example:
COPY INTO STAGING.TABLE (FILENAME, DATA)
parameters;

UPDATE STAGING.TABLE
SET fields
WHERE conditions;

INSERT INTO PUBLIC.TABLE
SELECT fields FROM STAGING.TABLE;
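The COPY INTO statement above is a placeholder. A minimal sketch of a working load step is shown below; it assumes an external stage named MY_S3_STAGE that points at the S3 bucket, gzip-compressed JSON files produced by the Copy Files Flow, and a staging table with a VARCHAR FILENAME column and a VARIANT DATA column. The stage name and column types are assumptions and will differ in a real deployment.

-- A minimal sketch of the load step; @MY_S3_STAGE is an assumed external stage
-- pointing at the S3 bucket with the gzip-compressed JSON files
COPY INTO STAGING.TABLE (FILENAME, DATA)
FROM (
    SELECT METADATA$FILENAME,  -- name of the source file in the stage
           $1                  -- entire JSON document as a VARIANT
    FROM @MY_S3_STAGE
)
FILE_FORMAT = (TYPE = JSON COMPRESSION = GZIP);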
All together now
This Flow loops through the records in the configuration table, calls the nested Flow, which copies a file to S3, and then calls the SQL Flow to update the data model.
Step 1. Create a nested Flow.
Step 2. Add the nested Copy Files Flow and the SQL Flow as step 1 and step 2, respectively.
Step 3. For step 1 (the nested Copy Files Flow), click the pen button and specify the loop conditions as the following:
- Loop Type: SQL
- Connection: a Snowflake Connection
- Loop Script:
SELECT ENDPOINT AS endpoint,
       FILENAME AS filename,
       ENDPOINT_PATH AS endpoint_path,
       REQUEST AS request
FROM CONFIGURATION_TABLE;
Notice the fields selected from the configuration table. The names are the same as the {tokens} used across all Flows; a sample row is shown after this list.
- Loop Threads: 20. Read about multithreaded loops.
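For illustration only, a hypothetical row in the configuration table might look like the following; the endpoint, file name, path, and payload are made-up values and depend entirely on the vendor API.

-- A hypothetical configuration row; the actual values depend on the vendor API
INSERT INTO CONFIGURATION_TABLE (ENDPOINT, FILENAME, ENDPOINT_PATH, REQUEST)
VALUES ('ratings', 'ratings_daily', '/api/v2/', '{"date": "yesterday"}');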
Step 4. Save the Flow and schedule it to run once a day.