Overview
An automotive company that sells cars through multiple channels, from auctions to dealerships, needs to accumulate its sales transactions in a cloud-based data warehouse (Snowflake).
Requirements
- The solution must be able to extract data from various sources, such as MS SQL Server and files in different Formats uploaded to various storage locations (FTP, SFTP, FTPS, etc.), and load it into the Snowflake data warehouse.
- The Flows should be able to parse different text Formats and convert them to Formats understood by Snowflake.
- The Flows should be able to replace existing data with new data.
- The number of extracted and loaded records can be as high as 10 million per day.
- There will be multiple Flows, one per data source, running as often as every few minutes.
Prerequisites
- The S3 stage must be created in Snowflake.
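If the stage does not exist yet, it can be created in Snowflake with a statement along the following lines (a minimal sketch; the stage name, bucket URL, and credentials are placeholders, not values from this tutorial):
  -- hypothetical stage name and bucket; replace with your own values
  create stage SALES.PUBLIC.ETLWORKS_S3_STAGE
    url = 's3://your-bucket/etlworks-staging/'
    credentials = (aws_key_id = '<key>' aws_secret_key = '<secret>')
    file_format = (type = 'CSV');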
Solution
The solution is based on the set of Snowflake-optimized Flows available in Etlworks. In this tutorial, we will cover two use cases:
- Load data from SQL Server database into Snowflake
- Load data from files in SFTP into Snowflake
Load data from SQL Server database into Snowflake
Step 1. Create a database Connection to the SQL Server database.
Step 2. Create a Connection to the Amazon S3 bucket, which is used as the Snowflake stage for the files to upload.
Step 3. Create a CSV Format with all default settings.
Step 4. Create a database Connection to Snowflake.
Step 5. Create a new Flow by selecting Database to Snowflake in the gallery.
Step 6. Add a new source-to-destination transformation with the following parameters:
- Connection (FROM): the MS SQL Server Connection created in Step 1.
- FROM: the name of the source table in SQL Server.
- TO: the Snowflake table name.
- Connection (TO): the Amazon S3 Connection created in Step 2.
- Format (TO): the CSV Format created in Step 3.
Step 7. Go to the Connections tab and select the Snowflake Connection created in Step 4.
Step 8. Click MAPPING and specify the Source query if needed.
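For example, a Source query can be used to select only the required columns and tag each record with its data source (a hypothetical query; the table and column names are illustrative and not part of the original setup):
  -- hypothetical source table and columns; adjust to your schema
  select transaction_id, vin, sale_price, sale_date, 'P&A' as data_source
  from dbo.sales_transactions;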
Step 9. Select the Parameters tab and specify the following parameters for the transformation:
- Staging area name: the Snowflake S3 stage (see Prerequisites)
- Force Loading File(s): selected
- Purge File if Success: selected
- Purge File(s) if Error: selected
- Before COPY INTO SQL: the SQL to execute on the Snowflake Connection to delete previously loaded data. In this particular case:
  delete from SALES.PUBLIC.SALES_TRANSACTIONS
  where DATA_SOURCE = 'P&A'
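With these settings, the work performed on the Snowflake Connection is roughly equivalent to the following (a simplified sketch; the actual COPY INTO statement is generated by Etlworks at run time, and the stage and file names below are placeholders):
  -- sketch only: Etlworks generates the real statements when the Flow runs
  delete from SALES.PUBLIC.SALES_TRANSACTIONS where DATA_SOURCE = 'P&A';
  copy into SALES.PUBLIC.SALES_TRANSACTIONS
    from @ETLWORKS_S3_STAGE/sales_transactions.csv
    file_format = (type = 'CSV' skip_header = 1)
    force = true
    purge = true;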
Step 10. Save and Schedule the Flow to run once a day.
Load data from files in SFTP into Snowflake
Files can (and in most cases should) be ingested into Snowflake directly, without any transformation. Etlworks fully supports this scenario with the Load files into Snowflake Flow type. In this Flow, however, we will load the data into a staging Snowflake table before loading it into the final destination, the sales_transactions table, for two reasons:
- The source files have a slightly different Format (fewer fields) than the sales_transactions table and a different encoding of the date fields, so a transformation is required.
- To avoid duplicates, we must delete the existing data in the sales_transactions table if the same data exists in a file to upload, hence the use of the staging table.
We will let the Flow automatically create the sales_stage table when loading the data from the source files.
Step 1. Create an SFTP Connection for the source files.
Step 2. Create a secondary SFTP Connection for the processed files. The Connection must point to a different folder, for example, archive.
Step 3. We will be reusing the Amazon S3 Connection created for the previous Flow.
Step 4. We will be reusing the Snowflake Connection created for the previous Flow.
Step 5. Our source CSV file has the following properties:
- There are empty rows, encoded as strings of delimiters: ,,,,,,...,,,,
- The file itself is encoded using UTF-8
- The date fields in the file are encoded as dd/MM/yyyy HH:mm:ss
- Some column names in the file are not compatible with SQL, for example, :DATE
Hence, create a CSV Format with the following settings:
- Column names compatible with SQL: selected
- Skip empty rows: selected
- Date and Time Format: dd/MM/yyyy HH:mm:ss
- Date Format: dd/MM/yyyy
- Encoding: UTF8
Step 6. Create a CSV Format in Snowflake which uses dd/MM/yyyy HH:mm:ss for timestamp fields.
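A named file Format like this can be created with a statement similar to the one below (a sketch; the Format name is a placeholder, and note that Snowflake expresses the dd/MM/yyyy HH:mm:ss pattern as DD/MM/YYYY HH24:MI:SS):
  -- hypothetical Format name; referenced later in the File format name parameter
  create or replace file format SALES.PUBLIC.SALES_CSV_FORMAT
    type = 'CSV'
    skip_header = 1
    timestamp_format = 'DD/MM/YYYY HH24:MI:SS'
    date_format = 'DD/MM/YYYY';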
Step 7. Create a new Flow by selecting File to Snowflake in the gallery.
Step 8. Add a new source-to-destination transformation with the following parameters:
- Connection (FROM): the SFTP Connection created in Step 1.
- Format (FROM): the CSV Format created in Step 5.
- FROM: the name of the source file in SFTP (you can use a wildcard file name).
- TO: sales_stage.csv
- Connection (TO): the Amazon S3 Connection created in Step 3.
- Format (TO): the CSV Format created in Step 5.
Step 9. Go to the Connections tab and select the Snowflake Connection created in Step 4.
Step 10. Click MAPPING, select the Parameters tab, and specify the following parameters for the transformation:
- Process all files: selected
- Snowflake Table Name: the destination staging Snowflake table name, which is sales_stage
- File format name: the Snowflake Format created in Step 6
- Staging area name: the Snowflake S3 stage (see Prerequisites)
- Force Loading File(s): selected
- Purge File if Success: selected
- Purge File(s) if Error: selected
- Name(s) of the File(s) to Purge: sales_stage*.csv
- Before COPY INTO SQL: the SQL to execute on the Snowflake Connection to delete previously loaded data. In this particular case:
  truncate table SALES.PUBLIC.SALES_STAGE
- Ignore errors when executing Before COPY INTO SQL: selected (in case the staging table does not exist yet)
- After COPY INTO SQL: the SQL to copy data from the staging table to the sales table. In this particular case:
  delete from SALES.PUBLIC.SALES_TRANSACTIONS
  where transaction_id in (select transaction_id from SALES_STAGE);
  insert into SALES.PUBLIC.SALES_TRANSACTIONS select fields from SALES.PUBLIC.SALES_STAGE;
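Here select fields stands for the explicit column list of the staging table. Expanded with hypothetical column names (illustrative only, not taken from the actual tables), the statement pair would look like this:
  -- hypothetical column list; use the actual columns of sales_stage / sales_transactions
  delete from SALES.PUBLIC.SALES_TRANSACTIONS
    where transaction_id in (select transaction_id from SALES.PUBLIC.SALES_STAGE);
  insert into SALES.PUBLIC.SALES_TRANSACTIONS (transaction_id, vin, sale_price, sale_date)
    select transaction_id, vin, sale_price, sale_date
    from SALES.PUBLIC.SALES_STAGE;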
Step 11. Save the Flow.
Step 12. Create a new Move Files Flow to move processed files to the archive folder. Add a new transformation and set the following parameters:
- Connection (FROM): the SFTP Connection created in Step 1.
- FROM: a filename or a wildcard filename of the files to move.
- TO: *
- Connection (TO): the SFTP Connection created in Step 2.
Step 13. Create a nested Flow and add the Flow that loads files into Snowflake as step 1 and the Flow that moves files to the archive folder as step 2.
Step 14. Save and Schedule the nested Flow to run five times a day.