A company that provides business intelligence and data analytics services needs to build a comprehensive data model in Snowflake for use with AI-driven software that is developed in-house and offered to customers as a service. The data model aggregates data elements from various web services and relational databases. The expected traffic is hundreds of gigabytes per day (uncompressed).
- The solution must be able to connect to the various web services, which use SOAP-style authentication with an electronic signature.
- The solution must be able to copy the uncompressed datasets returned by the web services as files into local storage, compress them using the gzip algorithm, and move them to a bucket in Amazon S3.
- The solution must be able to execute a complex, multi-step SQL script that loads the files into Snowflake, cleanses the data, and updates the data model.
- The solution must be able to support a very high load (hundreds of gigabytes per day) and must finish in under three hours.
The solution developed by the team uses a parallel database loop to iterate through the available API endpoints (up to a few million) and copy the responses to the Amazon S3 bucket while simultaneously compressing them using the gzip algorithm. The available API endpoints, as well as other parameters, are stored in a Snowflake table and maintained outside of the data integration pipeline.
A Flow that creates the electronically signed header parameters for the HTTP Connection
Step 2. Add the following code:
var props = SystemConfig.instance().getContextProperties();

var vendorUserId = "the user id";
var vendorSecretKey = "the secret key";

var endpoint_path = scenario.getVariable("endpoint_path").getValue();
var endpoint = scenario.getVariable("endpoint").getValue();
var endpoint_url = endpoint_path + endpoint;
var request = scenario.getVariable("request").getValue();

var df = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z");
var requestTimestamp = df.format(new java.util.Date());

// Build the newline-separated string to sign (the exact fields depend on
// the vendor's signing specification)
var encryptString = java.lang.String.join("\n", endpoint_url, request, requestTimestamp);

// Sign the string with the vendor secret key using HMAC-SHA1
var secretKey = new SecretKeySpec(new java.lang.String(vendorSecretKey).getBytes("UTF-8"), "HmacSHA1");
var mac = Mac.getInstance("HmacSHA1");
mac.init(secretKey);
var rawHmac = mac.doFinal(new java.lang.String(encryptString).getBytes("UTF-8"));

// Base64-encode the signature and build the Authorization header
var signedEncoded = Base64.encodeBytes(rawHmac);
signedEncoded = new java.lang.String(signedEncoded);

var authHeader = "RTK " + vendorUserId + ":" + signedEncoded;
A Flow that copies the response from the web service into an Amazon S3 bucket
Step 1. Create an HTTP Connection for the Rentak API with the following parameters:
- Content Type Header:
Step 2. Create a Connection to Amazon S3 and set the parameter Archive file before copying to S3 to GZip.
Step 3. Create a Copy Files Flow and add a new transformation with the following parameters:
- Connection (FROM): the HTTP Connection created in Step 1.
- Connection (TO): the Amazon S3 Connection created in Step 2.
Step 4. Create a nested Flow and add the Flow that creates the signed header parameters and the Copy Files Flow created in Step 3 (in this order).
Step 5. Name and save the nested Flow.
A Flow that updates the data model using SQL
Step 2. Create an SQL Flow and add the SQL that loads the files and updates the data model. Example:
-- Load the gzipped files into the staging table
-- (@S3_STAGE is a placeholder for the stage that points to the S3 bucket)
COPY INTO STAGING.TABLE (FILENAME, DATA)
FROM @S3_STAGE;
-- Cleanse the data and update the data model
INSERT INTO PUBLIC.TABLE
SELECT fields FROM STAGING.TABLE;
All together now
This Flow loops through the records in the configuration table, calls the nested Flow, which copies a file to S3, and calls the SQL Flow to update the data model.
Step 1. Create a nested Flow.
Step 2. Add the nested Copy Files Flow and the SQL Flow as steps 1 and 2, respectively.
Step 3. For step 1 (the nested Copy Files Flow), click the pen button and specify the loop conditions as follows:
Loop Type: SQL
Connection: a Snowflake Connection
SELECT ENDPOINT AS endpoint,
       FILENAME AS filename,
       ENDPOINT_PATH AS endpoint_path,
       REQUEST AS request
FROM CONFIG_TABLE -- placeholder for the configuration table maintained outside of the pipeline
Notice the fields selected from the configuration table: the aliases (endpoint, filename, endpoint_path, request) match the Flow variable names used across all Flows.
Loop Threads: 20. Read about multithreaded loops.
Step 4. Save the Flow and schedule it to run once a day.