Problem
An automotive company that sells cars through different platforms - from auctions to dealerships - needs to accumulate the sales transactions in a cloud-based data warehouse (Snowflake).
Requirements
- The solution must be able to extract data from various sources, such as MS SQL Server and files in different formats uploaded to various storage locations (FTP, SFTP, FTPS, etc.), and load it into the Snowflake data warehouse.
- The flows should be able to parse different text formats and convert them to formats understood by Snowflake.
- The flows should be able to replace existing data with the new data.
- The number of extracted and loaded records can be as high as 10 million per day.
- There will be multiple flows, one per data source, running as often as every few minutes.
Prerequisites
- The S3 stage must be created in Snowflake.
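A minimal sketch of creating such a stage is shown below. The stage name, bucket URL, and credentials are placeholders, not values from this tutorial; a storage integration can be used instead of inline credentials.
-- hypothetical stage pointing to the S3 bucket used by the flows
create or replace stage SALES.PUBLIC.ETL_S3_STAGE
  url = 's3://your-bucket/etl/'
  credentials = (aws_key_id = '...' aws_secret_key = '...')
  file_format = (type = csv);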
Solution
The solution is based on a set of Snowflake-optimized flows available in Etlworks. In this tutorial, we will cover two use cases:
- Loading data from SQL Server database into Snowflake
- Loading data from files in SFTP into Snowflake
Loading data from SQL Server database into Snowflake
Step 1. Create a database connection to the SQL Server database.
Step 2. Create a connection to the Amazon S3 bucket, which is used as the Snowflake stage for the files to upload.
Step 3. Create a CSV format with all default settings.
Step 4. Create a database connection to Snowflake.
Step 5. Create a new flow by selecting the "database to Snowflake" flow type in the gallery.
Step 6. Add a new source-to-destination transformation with the following parameters:
- Connection (from) - the MS SQL Server connection created in Step 1.
- From - the name of the source table in SQL Server
- To - the Snowflake table name
- Connection (to) - the Amazon S3 connection created in Step 2.
- Format (to) - the CSV format created in Step 3.
Step 7. Select the Connections tab and select the Snowflake connection created in Step 4.
Step 8. Click the MAPPING button and specify the Source query if needed.
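The source query is optional. If one is needed, a minimal sketch could look like the example below, where the dbo.sales_transactions table and the updated_at column are hypothetical placeholders:
-- hypothetical source query; table and column names are placeholders
select *
from dbo.sales_transactions
where updated_at >= dateadd(day, -1, getdate());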
Step 9. Select the Parameters tab and specify the following parameters for the transformation:
- Staging area name - the Snowflake S3 stage (see prerequisites)
- Force Loading File(s) - selected
- Purge File if Success - selected
- Purge File(s) if Error - selected
- Before COPY INTO SQL - the SQL to execute on the Snowflake connection to delete previously loaded data. In this particular case:
delete from SALES.PUBLIC.SALES_TRANSACTIONS
where DATA_SOURCE = 'P&A'
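With these parameters, the flow stages the extracted CSV file in Amazon S3 and runs a COPY INTO command over the Snowflake connection. A rough sketch of such a command is shown below; the stage name and file pattern are assumptions, and the exact statement is generated by the flow:
-- runs after the Before COPY INTO SQL above
copy into SALES.PUBLIC.SALES_TRANSACTIONS
  from @ETL_S3_STAGE
  pattern = '.*SALES_TRANSACTIONS.*csv'
  file_format = (type = csv skip_header = 1)
  force = true
  purge = true;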
Step 10. Save and schedule the flow to run once a day.
Loading data from files in SFTP into Snowflake
Files can (and in most cases should) be ingested into Snowflake directly, without any transformation. Etlworks fully supports this scenario with the Load files into Snowflake flow type. In this flow, however, we will load the data into a staging Snowflake table before loading it into the final destination, the sales_transactions table, for the following reasons:
- The source files have a slightly different format (fewer fields) than the sales_transactions table and a different encoding of the date fields, so a transformation is required.
- To avoid duplicates, we must delete the existing data in the sales_transactions table if the same data exists in a file to upload, hence the use of a staging table.
We will let the flow automatically create the sales_stage table when loading the data from the source files.
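For illustration only, the auto-created staging table will roughly mirror the columns of the source file. The sketch below uses purely hypothetical column names, since the real ones are derived from the file at run time:
-- hypothetical shape of the auto-created staging table
create table if not exists SALES.PUBLIC.SALES_STAGE (
  TRANSACTION_ID varchar,
  SALE_DATE timestamp_ntz,
  AMOUNT number(18,2)
);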
Step 1. Create an SFTP connection for the source files.
Step 2. Create a secondary SFTP connection for the processed files. The connection must point to a different folder, for example, archive.
Step 3. We will be reusing the Amazon S3 connection created for the previous flow.
Step 4. We will be reusing the Snowflake connection created for the previous flow.
Step 5. Our source CSV file has the following properties:
- There are empty rows, encoded as strings of delimiters:
,,,,,,...,,,,
- The file itself is encoded using UTF-8
- The date fields in the file are encoded as dd/MM/yyyy HH:mm:ss
- Some column names in the file are not compatible with SQL, for example :DATE
Hence, create a CSV format with the following settings:
- Column names compatible with SQL - selected
- Skip empty rows - selected
- Date and Time Format - dd/MM/yyyy HH:mm:ss
- Date Format - dd/MM/yyyy
- Encoding - UTF8
Step 6. Create a CSV file format in Snowflake that uses dd/MM/yyyy HH:mm:ss for timestamp fields.
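In Snowflake, such a file format can be created with a statement similar to the sketch below. The format name is an assumption, and note that Snowflake spells the same pattern as DD/MM/YYYY HH24:MI:SS:
-- hypothetical named file format matching the source CSV files
create or replace file format SALES.PUBLIC.CSV_DDMMYYYY
  type = csv
  skip_header = 1
  field_optionally_enclosed_by = '"'
  timestamp_format = 'DD/MM/YYYY HH24:MI:SS'
  date_format = 'DD/MM/YYYY';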
Step 7. Create a new flow by selecting the "file to Snowflake" flow type in the gallery.
Step 8. Add a new source-to-destination transformation with the following parameters:
- Connection (from) - the SFTP connection created in Step 1.
- Format (from) - the CSV format created in Step 5.
- From - the name of the source file in SFTP (you can use a wildcard file name).
- To - sales_stage.csv
- Connection (to) - the Amazon S3 connection created in Step 3.
- Format (to) - the CSV format created in Step 5.
Step 9. Select the Connections tab and select the Snowflake connection created in Step 4.
Step 10. Click the MAPPING button, select the Parameters tab and specify the following parameters for the transformation:
- Process all files - selected
- Snowflake Table Name - the destination staging Snowflake table name, which is sales_stage
- File format name - the Snowflake format created in Step 6
- Staging area name - the Snowflake S3 stage (see prerequisites)
- Force Loading File(s) - selected
- Purge File if Success - selected
- Purge File(s) if Error - selected
- Name(s) of the File(s) to Purge - sales_stage*.csv
- Before COPY INTO SQL - the SQL to execute on the Snowflake connection to delete previously loaded data. In this particular case:
truncate table SALES.PUBLIC.SALES_STAGE
- Ignore errors when executing Before COPY INTO SQL - selected (in case the staging table does not exist yet)
- After COPY INTO SQL - the SQL to copy data from the staging table to the sales table. In this particular case:
delete from SALES.PUBLIC.SALES_TRANSACTIONS
where transaction_id in (select transaction_id from SALES_STAGE);
insert into SALES.PUBLIC.SALES_TRANSACTIONS
select fields from SALES.PUBLIC.SALES_STAGE;
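The delete-then-insert pair above replaces any rows whose transaction_id already exists in sales_transactions. As a design note, the same effect could be achieved with a single MERGE statement; the sketch below is only an alternative illustration, and every column except transaction_id is a hypothetical placeholder:
-- alternative sketch, not what the flow above uses
merge into SALES.PUBLIC.SALES_TRANSACTIONS t
using SALES.PUBLIC.SALES_STAGE s
  on t.transaction_id = s.transaction_id
when matched then update set t.sale_date = s.sale_date, t.amount = s.amount
when not matched then insert (transaction_id, sale_date, amount)
  values (s.transaction_id, s.sale_date, s.amount);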
Step 11. Save the flow.
Step 12. Create a new Move Files flow to move processed files to the archive folder. Add a new transformation and set the following parameters:
- Connection (from) - the SFTP connection created in Step 1.
- From - a filename or a wildcard filename of the files to move.
- To - *.
- Connection (to) - the SFTP connection created in Step 2.
Step 13. Create a nested flow and add the flow which loads files into Snowflake as Step 1 and the flow which moves files to the archive folder as Step 2.
Step 14. Save and schedule the nested flow to run five times a day.