Overview
Snowflake is a columnar relational database, so you can use the same techniques you would normally use to work with relational databases in Etlworks Integrator. It is, however, important to understand that inserting data into Snowflake row by row can be painfully slow.
It is recommended that you use a Snowflake-optimized flow to load data into Snowflake.
Using Snowflake-optimized flows, you can extract data from any of the supported sources, transform it, and load it directly into Snowflake.
A typical Snowflake flow performs the following operations:
- Extracts data from the source.
- Creates CSV, JSON, Avro, or Parquet files.
- Compresses files using the gzip algorithm.
- Copies files into the Snowflake stage (local file system, Azure Blob, or Amazon S3).
- Checks to see if the destination Snowflake table exists, and if it does not, creates the table using metadata from the source.
- Dynamically generates and executes the Snowflake COPY INTO command (see the example after this list).
- Cleans up the remaining files, if needed.
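For reference, the auto-generated COPY INTO command typically looks like the example below. The table, stage, and file names here are hypothetical:
COPY INTO MY_DB.PUBLIC.ORDERS FROM @ETL_STAGE PATTERN = 'orders.*csv.gz' FILE_FORMAT = (FORMAT_NAME = CSVFORMAT) PURGE = true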
Prerequisites
- The Snowflake instance is up and running.
- The named Snowflake stage is created in Snowflake.
- An Amazon S3 bucket or Azure Blob container is created. Alternatively, you can use a folder in server storage as a local stage.
Step-by-step instructions
Step 1. Create all required connections.
You will need a source connection, a connection for the stage (Amazon S3, Azure Blob, or Server Storage), and a Snowflake connection. When creating a Snowflake connection, set the Stage name. Alternatively, you can configure the Stage name at the transformation level.
When configuring a connection for Amazon S3 or Azure Storage that will be used as a stage for the Snowflake flows, it is recommended that you select GZip as the value for the Archive file before copying field.
Step 2. Create a data exchange format
Snowflake can load data from the CSV, JSON, and Avro formats, so you will need to create one of these formats and set it as the destination format.
If you are using the CSV format to load large datasets into Snowflake, consider configuring the format to split the document into smaller files: Snowflake can load files in parallel, and transferring smaller files over the network can be faster.
Step 3. Create a flow to load data in Snowflake
Start creating Snowflake flows by opening the Flows window, clicking the + button, and typing snowflake into the search field.
Continue by selecting the flow type, adding source-to-destination transformations, and entering the transformation parameters.
Step 4. Set Snowflake connection
For all Snowflake flows, the final destination is Snowflake.
To configure the final destination, click the Connections tab and select the connection created in Step 1.
Step 5. Set the source and the destination
Depending upon the flow type, you can select one of the following sources (FROM) for the Snowflake flow:
- Database - use the database table or view name as the source (FROM) name
- CDC - use the fully qualified table name as the source (FROM) name
- API - use any appropriate string as the source (FROM) name
- Web Service - use any appropriate string as the source (FROM) name
- File - use the source file name or a wildcard file name as the source (FROM) name
- Queue - use the queue topic name as the source (FROM) name
For all Snowflake flows, the destination connection is going to be either an Amazon S3 connection, an Azure Storage connection, or a Server Storage connection. Integrator uses the destination connection as a Snowflake stage.
Select or enter the fully-qualified Snowflake table name as a destination (TO).
Step 6. Set the data exchange format configured in Step 2
Step 7. Set optional parameters
Click the MAPPING button in the transformation row and select the Parameters tab.
COPY INTO parameters and Action
- Snowflake Table Name - the name of the Snowflake table to load data into. This field is optional and overrides the Snowflake table name set at the transformation level.
- Stage Name - the name of the Snowflake stage. The stage refers to the location where your data files are stored for loading into Snowflake. This field is optional and overrides the Stage name set at the connection level. The stage name is a part of the COPY INTO command. For example: COPY INTO table FROM @{STAGING_AREA_NAME} PATTERN = 'file' FILE_FORMAT = (FORMAT_NAME = CSVFORMAT) PURGE = true.
- Action - can be either COPY INTO, which inserts records from the file(s) into the Snowflake table, or MERGE, which merges records from the file with the records in the Snowflake table. MERGE requires configuring Lookup Fields.
- Lookup Fields - the comma-separated list of fields that uniquely identify the record in the target Snowflake table. This field is required if the action is set to MERGE.
- Predict Lookup Fields - if this option is enabled and Lookup Fields is empty, the system will try to predict the columns that uniquely identify the record in the target Snowflake table.
- Use INSERT/DELETE instead of MERGE for CDC MERGE action - if this parameter is enabled (it is disabled by default) and the CDC MERGE action is enabled, the system will generate and execute three SQL statements (see the sketch after this list):
- 1) DELETE all rows from the main table which are in the temp CDC stream table;
- 2) INSERT all latest INSERTs and UPDATEs from the temp CDC stream table into the main table;
- 3) DELETE all rows in the main table which are marked for deletion in the temp CDC stream table.
- Load SQL - this is a user-defined COPY INTO SQL. By default, Integrator creates the COPY INTO SQL automatically, based on the input and output parameters. The auto-generated SQL will look like the following: COPY INTO {TABLE} FROM @STAGING PATTERN = '{FILE}' FILE_FORMAT = (FORMAT_NAME = {FORMAT}) PURGE = {PURGE}. You can override it by using this field. Read more about the Snowflake COPY INTO command.
- MERGE SQL - this is a user-defined SQL that will be used instead of the default when the action is set to MERGE. If nothing is entered in this field, the default MERGE SQL will be used. The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
- {TABLE} - the table to MERGE data into
- {TEMP_TABLE} - the table to MERGE data from
- {KEY_FIELDS} - the fields uniquely identifying the record in both tables
- {FIELDS} - the fields to INSERT/UPDATE in the table to MERGE data into
- CDC MERGE SQL - this is a user-defined SQL that will be used instead of the default when the action is set to CDC MERGE. If nothing is entered in this field, the default CDC MERGE SQL will be used. The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
- {TABLE} - the table to MERGE data into
- {TEMP_TABLE} - the table to MERGE data from
- {MERGE_CONDITION} - the conditions to MATCH records in the table to MERGE data into and the table to MERGE data from, in the format table.key=temp_table.key
- {KEY_FIELDS} - the fields uniquely identifying the record in both tables
- {FIELDS} - the fields to INSERT
- {INSERT_FIELDS} - the values of the fields to INSERT
- {UPDATE_FIELDS} - the fields and values to UPDATE, in the format field=value,field=value
- Force Loading File(s) - if this option is enabled (it is disabled by default), the COPY INTO command will be generated with the FORCE=true option, which causes the files to be reloaded regardless of whether they have been changed. This option is only used when the COPY INTO command is generated automatically.
- Strip Outer Array or Element - for JSON: a boolean that instructs the JSON parser to remove the outer brackets. For XML: a boolean that specifies whether the XML parser strips out the outer XML element, exposing 2nd-level elements as separate documents.
- Match By Column Name - String that specifies whether to load semi-structured data (JSON, XML, AVRO) into columns in the target table that match corresponding columns represented in the data. For a column to match, the following criteria must be true:
- 1) The column represented in the data must have the exact same name as the column in the table. The copy option supports case sensitivity for column names. Column order does not matter.
- 2) The column in the table must have a data type that is compatible with the values in the column represented in the data. For example, string, number, and Boolean values can all be loaded into a variant column.
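Below is a hedged sketch of the three statements described for the Use INSERT/DELETE instead of MERGE for CDC MERGE action option. The table and column names (ORDERS, ORDERS_CDC_TEMP, ORDER_ID, STATUS, AMOUNT) and the CDC_OP operation flag are assumptions for illustration only; the actual SQL is generated by Integrator:
-- 1) Delete rows from the main table that are present in the temp CDC stream table
DELETE FROM ORDERS USING ORDERS_CDC_TEMP s WHERE ORDERS.ORDER_ID = s.ORDER_ID;
-- 2) Insert the latest INSERTs and UPDATEs from the temp CDC stream table
INSERT INTO ORDERS (ORDER_ID, STATUS, AMOUNT)
SELECT ORDER_ID, STATUS, AMOUNT FROM ORDERS_CDC_TEMP WHERE CDC_OP <> 'delete';
-- 3) Delete rows in the main table that are marked for deletion in the temp CDC stream table
DELETE FROM ORDERS USING ORDERS_CDC_TEMP s WHERE ORDERS.ORDER_ID = s.ORDER_ID AND s.CDC_OP = 'delete';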
Format
- File Format - the optional format name or definition. Example of an ad-hoc format: (type = 'CSV' field_delimiter = ',' skip_header = 1). Read about Snowflake file formats. If the format name or definition is not provided, the system will automatically create an ad-hoc definition based on the destination format configured for the transformation. A named format can also be created directly in Snowflake (see the sketch after this list).
- Replace Invalid Characters - if enabled, Snowflake replaces invalid UTF-8 characters with the Unicode replacement character.
- Error on Column Mismatch - this option specifies whether to generate a parsing error if the number of delimited columns (i.e. fields) in an input data file does not match the number of columns in the corresponding table. If disabled, an error is not generated and the load continues. In that case, if the input file contains records with more fields than columns in the table, the matching fields are loaded in order of occurrence in the file and the remaining fields are not loaded; if the input file contains records with fewer fields than columns in the table, the non-matching columns in the table are loaded with NULL values.
- Trim Space - this option specifies whether to remove white space from fields.
- String used to convert to and from SQL NULL - When loading data, Snowflake replaces these strings in the data load source with SQL NULL. To specify more than one string, enclose the list of strings in parentheses and use commas to separate each value. For example:
NULL_IF = ('N', 'NULL', 'NUL', '')
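If you prefer to reference a named format in the File Format field, the format can be created in Snowflake beforehand. A minimal sketch, assuming a hypothetical format name CSVFORMAT and typical options for gzipped CSV files:
CREATE OR REPLACE FILE FORMAT CSVFORMAT
TYPE = 'CSV'
FIELD_DELIMITER = ','
SKIP_HEADER = 1
COMPRESSION = 'GZIP'
NULL_IF = ('NULL', '');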
Parameters for copying files into the local stage
- Number of threads to use for uploading files to local stage - specifies the number of threads to use for uploading files to the local stage. The upload process separates data files into batches by size: small files (< 16 MB compressed or uncompressed) are staged in parallel as individual files, while larger files are automatically split into chunks, staged concurrently, and reassembled in the target stage. A single thread can upload multiple chunks. Increasing the number of threads can improve performance when uploading large files. Supported values: any integer from 1 (no parallelism) to 99 (use 99 threads for uploading files).
Other parameters
- Truncate Columns - if enabled (disabled by default), the COPY INTO command will be generated with the TRUNCATECOLUMNS = true option. Strings are automatically truncated to the target column length.
- Purge File if Success - if this option is enabled (it is by default) and user-defined SQL is not used, Integrator will run the COPY INTO command with the PURGE option enabled. With PURGE enabled, the staging file will be automatically deleted after the COPY INTO command is successfully executed.
- Purge File(s) if Error - if this option is enabled (it is by default), the staging files will be automatically deleted if the COPY INTO command was executed with an error.
- Name(s) of the File(s) to Purge - if "Purge File(s) if Error" is enabled, it is possible to configure which files you want the system to delete if an error occurs while executing the COPY INTO command. Wildcard file names are allowed. If not specified, the system will use the same wildcard file name as it used for the COPY INTO command.
- On Error - what to do in case of any error. Available options are: ABORT_STATEMENT (default), CONTINUE, SKIP_FILE.
- Before COPY INTO SQL - this SQL will be executed on the Snowflake connection before running the COPY INTO SQL (see the example after this list).
- Ignore errors when executing Before COPY INTO SQL - if this option is enabled and there is an error when the Before COPY INTO SQL is executed, the error will be ignored.
- After COPY INTO SQL - this SQL will be executed on the Snowflake connection after running the COPY INTO SQL.
- Ignore errors when executing After COPY INTO SQL - if this option is enabled and there is an error when the After COPY INTO SQL is executed, the error will be ignored.
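As an illustration, the Before COPY INTO SQL and After COPY INTO SQL fields take regular Snowflake SQL. The statements below are hypothetical examples; the ORDERS and LOAD_AUDIT tables are assumptions:
-- Before COPY INTO SQL: empty the target table before a full reload
TRUNCATE TABLE ORDERS;
-- After COPY INTO SQL: record the load in an audit table (assumed to exist)
INSERT INTO LOAD_AUDIT (TABLE_NAME, LOADED_AT) VALUES ('ORDERS', CURRENT_TIMESTAMP());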
Parameters specific to different source types
Depending on the flow type, other flow parameters can be added, as explained below:
- The source is a database
- The source is a file or web service
- - If this parameter is enabled, the system will copy files directly into the Snowflake stage before executing the COPY INTO command. This greatly improves the performance of the load but automatically disables any transformations.
- - If this parameter is enabled, the system will automatically delete loaded files from the source.
- - If this parameter is enabled and there is an error during the load, the system will automatically delete loaded files from the source.
- The source is a well-known API
Step 8. Optionally Configure MERGE (UPSERT)
To merge (upsert) existing data in the Snowflake table with new data:
- Set the Action to MERGE.
- Define the Lookup Fields - the comma-separated list of fields that uniquely identify the record in the Snowflake table.
Alternatively, you can enable the Predict Lookup Fields option, which forces the flow to use various algorithms to automatically predict the fields that uniquely identify the record. Note that it is not always possible to correctly detect the unique fields.
If enabling the Predict Lookup Fields option (which is not always accurate) is not acceptable, you can specify a list of table=fields pairs in the Lookup Fields. Use fully-qualified table names and ';' as a separator between table=fields pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
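For reference, the MERGE action roughly corresponds to a Snowflake MERGE statement along the lines shown below. This is only a hedged sketch: the table names INVENTORY and INVENTORY_TEMP, the lookup field INVENTORY_ID, and the QUANTITY and LAST_UPDATE columns are assumptions for illustration:
MERGE INTO INVENTORY t
USING INVENTORY_TEMP s
ON t.INVENTORY_ID = s.INVENTORY_ID
WHEN MATCHED THEN UPDATE SET t.QUANTITY = s.QUANTITY, t.LAST_UPDATE = s.LAST_UPDATE
WHEN NOT MATCHED THEN INSERT (INVENTORY_ID, QUANTITY, LAST_UPDATE) VALUES (s.INVENTORY_ID, s.QUANTITY, s.LAST_UPDATE);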
Step 9. Configure how to handle source and destination schema changes
It is quite typical for the source (for example, a table in an OLTP database) and the destination (the Snowflake table) to have a different number of columns. It is also entirely possible that the order of columns in the source is different from the order of columns in the destination. In either of these cases, by default, the flow will fail, since the Snowflake COPY INTO command cannot load files that have more or fewer columns than the target table or whose columns are in a different order.
When setting the source-to-destination transformation, it is possible to configure it to automatically handle schema changes. The following configuration options are available:
1. Reorder CSV Columns During Extract - if this option is enabled (it is disabled by default), the flow reorders columns in the data file to match the order of columns in the target Snowflake table. This option is ignored if the target table does not exist.
2. Reorder CSV Columns During Load (recommended) - if this option is enabled (it is disabled by default) and the COPY INTO action is selected, the flow reorders the column data from a staged CSV file before loading it into a table. Enabling this option allows loading files that have a different set of columns, or columns in a different order, than the destination table in Snowflake. In most cases, it is recommended to use this option instead of Reorder CSV Columns During Extract.
3. Ignore fields in the source that do not exist in the target table - if this option is enabled (it is disabled by default) and the source has more fields than the target (Snowflake) table, the system will automatically exclude the extra fields. Options 3, 4, and 5 are mutually exclusive.
4. Alter target table if the source has columns that the target table doesn't have - if this option is enabled (it is disabled by default) and the source has different fields than the target (Snowflake) table, the system will add the extra fields to the target table (see the example after this list). Options 3, 4, and 5 are mutually exclusive.
5. Recreate target table if the source has columns that the target table doesn't have - if this option is enabled (it is disabled by default) and the source has different fields than the target (Snowflake) table, the system will automatically drop and recreate the target table. Options 3, 4, and 5 are mutually exclusive.
6. Insert null into the fields in the target table that do not exist in the source - if this option is enabled (it is disabled by default) and the target (Snowflake) table has different fields than the source, the system will automatically insert NULL values into these fields in the target.
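For example, when Alter target table if the source has columns that the target table doesn't have is enabled and the source gains a column that the target lacks, the flow would issue DDL along these lines (the table and column names are hypothetical):
ALTER TABLE ORDERS ADD COLUMN NOTES VARCHAR;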
Configuring notifications if source and destination have different columns
It is possible to send an email notification if the source and destination have different columns. To configure flow to send a notification when either source has more columns than the destination or the destination has more columns than the source, use the technique explained in this article.
Step 10. Optionally configure Mapping
If necessary, you can create a mapping between the source and destination (Snowflake) fields.
You can create mapping between the source and destination just like you usually do for any other flow type.
Mapping is not required, but please remember that if a source field name is not supported by Snowflake, it will return an error and the data will not be loaded into the database. For example, if you are loading data from Google Analytics, the output (source) is going to include fields with the prefix ga: (ga:user, ga:browser, etc.). Unfortunately, Snowflake does not support fields with a :, so the data will be rejected. If that happens, you can use mapping to rename the destination fields.
Set up incremental change replication using high watermark
As with any other flow type, it is possible to configure change replication using a high watermark.
When change replication is enabled, only the changed records will be loaded into Snowflake.
Basically, all you need to do is set the high watermark field and enable change replication for the transformation. Additionally, for better readability, you can set the calculated high watermark field value.
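Conceptually (this is an illustration only, not Etlworks syntax), with UPDATED_AT as the high watermark field, each run extracts only the rows whose watermark value is greater than the value recorded by the previous run, for example:
SELECT * FROM ORDERS WHERE UPDATED_AT > '2024-05-01 00:00:00';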
Set up incremental change replication using change data capture (CDC)
Change data capture (CDC) is an approach to data integration that is based on the identification, capture, and delivery of the changes made to the source database and stored in the database redo log (also called transaction log).
Etlworks supports replicating data using CDC from MySQL, SQL Server, Oracle, Postgres, and MongoDB.
Read about configuring CDC for the source databases:
Once CDC is configured for the source database, you can create a CDC pipeline where the source is one of these databases and the destination is Snowflake.
When creating a point-to-point CDC pipeline for extracting data from a CDC-enabled database and loading it into Snowflake, you have two options:
- Stream CDC events, create CSV files + load data from CSV files into Snowflake.
- Stream CDC events into a message queue + load data from a message queue into Snowflake.
Loading multiple tables by a wildcard name
To load multiple database objects (tables and views) by a wildcard name (without creating individual source-to-destination transformations), follow the same steps as above, except:
FROM
Create a single source-to-destination transformation and set the FROM to a fully qualified source database object name with wildcard characters (* and ?), for example public.*
By default, all tables and all views that match the wildcard name in FROM will be included. If you want to exclude specific database objects enter a comma-separated list of the database objects to exclude in the Exclude objects field. If you want to include only specific database objects enter a comma-separated list of the database objects to include in the Include objects field.
TO
Set the TO to SNOWFLAKE_DW.SCHEMA.*, for example UTIL_DB.PUBLIC.*.
Read how to programmatically change the destination (TO) name.
EXCLUDE
Optionally configure the list of the database objects to exclude.
To exclude all views, enter all views in the Exclude objects field.
To exclude all tables, enter all tables in the Exclude objects field.
INCLUDE
Optionally configure the list of the database objects to include.
SOURCE QUERY
Optionally configure the Source query. You can use the {TABLE} token in the source query.
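A hypothetical source query using the {TABLE} token (the WHERE clause is only an example); the flow substitutes the actual object name for each table matched by the wildcard:
SELECT * FROM {TABLE} WHERE UPDATED_AT >= '2024-01-01'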
BEFORE/AFTER SQL
If you want to execute any code before and/or after the COPY INTO command, use the {TABLE} token in the Before COPY INTO SQL and After COPY INTO SQL.
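For example, a hypothetical Before COPY INTO SQL that empties each matched table before loading:
TRUNCATE TABLE {TABLE};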
MERGE
If you are configuring the MERGE action, do not enter anything in the Lookup Fields. Enable Predict Lookup Fields instead.
As an alternative to enabling the Predict Lookup Fields option (which is not always accurate), you can specify a list of table=fields pairs in the Lookup Fields. Use fully-qualified table names and ';' as a separator between table=fields pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
HWM CHANGE REPLICATION
If you are setting up high watermark change replication with a calculated HWM field, use the {TABLE} token in the High Watermark Field Value.
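As a purely hypothetical illustration (the field name and the exact form expected by Integrator are assumptions), a calculated High Watermark Field Value could reference the {TABLE} token like this:
SELECT MAX(UPDATED_AT) FROM {TABLE}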
Common issues when loading data in Snowflake
Read how to troubleshoot and fix common issues when loading data in Snowflake.