Overview
Snowflake is a column-based relational database. Therefore, you can use the same techniques you would normally use to work with relational databases in the Etlworks Integrator. It is, however, important to understand that inserting data into Snowflake row by row can be painfully slow.
It is recommended that you use a Snowflake-optimized Flow to ETL data in Snowflake.
Flows optimized for Snowflake
Flow type | When to use
Any to Snowflake (you are here) | When you need to extract data from any source, transform it, and load it into Snowflake.
Bulk load files into Snowflake | When you need to bulk-load files that already exist in the external Snowflake stage (S3, Azure Blob, Google Cloud Storage) or in server storage without applying any transformations. The Flow automatically generates the COPY INTO command and MERGEs data into the destination.
Stream CDC events into Snowflake | When you need to stream updates from a database that supports Change Data Capture (CDC) into Snowflake in real time.
Stream messages from a queue into Snowflake | When you need to stream messages from a message queue that supports streaming into Snowflake in real time.
COPY files into Snowflake | When you need to bulk-load data from file-based or cloud storage, an API, or a NoSQL database into Snowflake without applying any transformations. This Flow requires providing a user-defined COPY INTO command. Unlike Bulk load files into Snowflake, this Flow does not support automatic MERGE.
How it works
Using Snowflake-optimized Flows, you can extract data from any of the supported sources, transform, and load it into Snowflake.
A typical Snowflake Flow performs the following operations:
- Automatically creates a named Snowflake stage if it does not exist.
- Extracts data from the source.
- Creates CSV, JSON, Avro, or Parquet files.
- Compresses files using the gzip algorithm.
- Copies files into the Snowflake stage (local file system, Azure Blob, Amazon S3, or Google Cloud Storage).
- Checks to see if the destination Snowflake table exists and, if it does not, creates the table using metadata from the source.
- Dynamically generates and executes the Snowflake COPY INTO command.
- If configured, MERGEs data in the source with the data in Snowflake.
- Cleans up the remaining files, if needed.
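To make this sequence concrete, below is a hedged sketch of the kind of SQL a Snowflake Flow ends up running for a single CSV load. The table, stage, and file format names (PUBLIC.ORDERS, ETL_STAGE, CSVFORMAT) are hypothetical, and the statements the Flow actually generates may differ:

-- create the target table from the source metadata if it does not exist
CREATE TABLE IF NOT EXISTS PUBLIC.ORDERS (ORDER_ID NUMBER, STATUS VARCHAR, UPDATED_AT TIMESTAMP_NTZ);

-- upload the gzip-compressed data file to the named stage (internal stage shown here)
PUT file:///data/orders.csv.gz @ETL_STAGE;

-- load the staged file and purge it on success
COPY INTO PUBLIC.ORDERS FROM @ETL_STAGE PATTERN = '.*orders.*' FILE_FORMAT = (FORMAT_NAME = CSVFORMAT) PURGE = true;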
Prerequisites
1. The Snowflake data warehouse is active.
2. The Stage name is set for the Snowflake Connection or Transformation (the latter overrides the stage set for the Connection). Etlworks uses the Snowflake COPY INTO command to load data into Snowflake tables. COPY INTO requires a named internal or external stage. Stage refers to the location where your data files are stored for loading into Snowflake. Read how Etlworks flow automatically creates the named Snowflake stage. A minimal example of creating a stage manually is shown after this list.
3. For loading data from the external stage in AWS S3, Azure Blob, or Google Cloud Storage, the Amazon S3 bucket, Google Storage bucket, or Azure blob needs to be created. Note that Etlworks flow does not create the bucket or blob.
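The Flow can create the named stage automatically (see above), but if you prefer to create it yourself in Snowflake, a minimal sketch looks like this. The stage names, bucket URL, and credentials are placeholders, and for an external stage the bucket must already exist:

-- named internal stage
CREATE STAGE IF NOT EXISTS ETLWORKS_STAGE;

-- named external stage on Amazon S3 (Etlworks does not create the bucket)
CREATE STAGE IF NOT EXISTS ETLWORKS_S3_STAGE
  URL = 's3://my-etl-bucket/snowflake/'
  CREDENTIALS = (AWS_KEY_ID = '<aws key id>' AWS_SECRET_KEY = '<aws secret key>');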
Step-by-step instructions
Here's how you can extract, transform, and load data in Snowflake:
Step 1. Create all required Connections
You will need a source Connection, a Connection for the stage (Amazon S3, Azure Blob, Google Cloud Storage, or Server Storage), and a Snowflake Connection.
When creating a Snowflake Connection, set the Stage name. Alternatively, you can configure the Stage name at the transformation level. Read how Etlworks flow automatically creates the named Snowflake stage.
Depending on how you prefer to stage files for loading data into Snowflake, create one of the following Connections:
- The Server Storage connection for loading data from the internal Snowflake stage.
- S3 connection for loading data from the S3 external stage.
- Azure storage connection for loading data from the Azure external stage.
- Google Cloud Storage connection for loading data from the Google Cloud external stage.
When configuring a Connection for Amazon S3, Azure Storage, or Google Storage, which will be used as a stage, it is recommended that you select GZip as the value for the Archive file before copying field.
Step 2. Create a data exchange Format
Snowflake can load data from CSV, JSON, and Avro Formats, so you will need to create one of these and set it as the destination Format.
If you are using a CSV Format for loading large datasets into Snowflake, consider configuring the Format to split the document into smaller files: Snowflake can load files in parallel, and transferring smaller files over the network can be faster.
Step 3. Create a Flow to load data in Snowflake
Start creating Snowflake Flows by opening the Flows window, clicking +, and typing snowflake into the search field:
Continue by selecting the Flow type (any to Snowflake), adding source-to-destination transformations, and entering the transformation parameters:
Step 4. Set Snowflake Connection
For all Snowflake Flows, the final destination is Snowflake.
To configure the final destination, click the Connections tab and select the Snowflake Connection created in Step 1.
Step 5. Set the source and the destination
Depending upon the Flow type, you can select one of the following sources (FROM) for the Snowflake Flow:
- Database: use the database table or view name as the source (FROM) name. Wildcard names, for example, public.*, are supported.
- Well-known API: use the endpoint name or the file name as the source (FROM) name.
- Web Service: use the endpoint name as the source (FROM) name.
- File: use the source file name or a wildcard file name as the source (FROM) name.
- Queue: use the queue name as the source (FROM) name.
For all Snowflake Flows, the destination Connection is going to be an Amazon S3, Azure Storage, Google Cloud Storage, or Server Storage Connection. The Etlworks Integrator uses the destination Connection as a Snowflake stage.
Select or enter the fully-qualified Snowflake table name as the destination (TO). If the source (FROM) is entered using a wildcard template, set the destination (TO) to a wildcard as well.
Step 6. Set the data exchange Format configured in Step 2
Step 7. Set optional parameters
Click MAPPING in the transformation row and select the Parameters tab.
COPY INTO parameters and Action
- Snowflake Table Name: the name of the Snowflake table to load data into. This field is optional and overrides the Snowflake table name set at the transformation level.
- Stage Name: the name of the Snowflake stage. Stage refers to the location where your data files are stored for loading into Snowflake. This field is optional and overrides the Stage name set at the Connection level. The stage area name is a part of the COPY INTO command. For example: COPY INTO table FROM @{STAGING_AREA_NAME} PATTERN = 'file' FILE_FORMAT = (FORMAT_NAME = CSVFORMAT) PURGE = true.
- Action: can either be COPY INTO, which inserts records from the file(s) into the Snowflake table, or MERGE, which merges records from the file with the records in the Snowflake table. MERGE requires configuring the Lookup Fields.
- How to MERGE: the type of SQL to execute when merging data. The following options are available: Snowflake MERGE (default) - execute the native Snowflake MERGE SQL; DELETE/INSERT - DELETE all records in the actual table that also exist in the temp table, then INSERT all records from the temp table into the actual table.
- Lookup Fields: the comma-separated list of fields that uniquely identify the record in the target Snowflake table. This field is required if the action is set to MERGE.
- Predict Lookup Fields: if this option is enabled and Lookup Fields is empty, the system will try to predict the columns that uniquely identify the record in the target Snowflake table.
- Use INSERT/DELETE instead of MERGE for CDC MERGE action: if this parameter is enabled (it is disabled by default) and the CDC MERGE action is enabled, the system will generate and execute three SQL statements: 1) DELETE all rows from the main table which are in the temp CDC stream table; 2) INSERT all latest INSERTs and UPDATEs from the temp CDC stream table into the main table; 3) DELETE all rows in the main table which are marked for deletion in the temp CDC stream table.
- Load SQL: this is a user-defined COPY INTO SQL. By default, the Etlworks Integrator creates the COPY INTO SQL automatically, based on the input and output parameters. The auto-generated SQL will look like the following: COPY INTO {TABLE} FROM @STAGING PATTERN = '{FILE}' FILE_FORMAT = (FORMAT_NAME = {FORMAT}) PURGE = {PURGE}. You can override it by using this field. Read more about the Snowflake COPY INTO command.
- MERGE SQL: this is a user-defined SQL that will be used instead of the default when the action is set to MERGE. If nothing is entered in this field, the default MERGE SQL will be used. The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
  - {TABLE}: the table to MERGE data into,
  - {TEMP_TABLE}: the table to merge data from,
  - {KEY_FIELDS}: the fields uniquely identifying the record in both tables,
  - {FIELDS}: the fields to INSERT/UPDATE in the table to MERGE data into.
- CDC MERGE SQL: this is a user-defined SQL that will be used instead of the default when the action is set to CDC MERGE. If nothing is entered in this field, the default CDC MERGE SQL will be used (see the sketch after this parameter list for how the tokens can be combined). The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
  - {TABLE}: the table to MERGE data into.
  - {TEMP_TABLE}: the table to merge data from.
  - {MERGE_CONDITION}: the condition used to MATCH records in the table to MERGE data into and the table to MERGE data from, in the format table.key=temp_table.key.
  - {KEY_FIELDS}: the fields uniquely identifying the record in both tables.
  - {FIELDS}: the fields to INSERT.
  - {INSERT_FIELDS}: the values of the fields to INSERT.
  - {UPDATE_FIELDS}: the fields and values to UPDATE, in the format field=value,field=value.
- Force Loading File(s): if this option is enabled (it is disabled by default), the COPY INTO command will be generated with the FORCE=true option, which will cause the files to be reloaded regardless of whether they have been changed. This option is only used when the COPY INTO command is generated automatically.
- Strip Outer Array or Element: for JSON, a boolean that instructs the JSON parser to remove outer brackets; for XML, a boolean that specifies whether the XML parser strips out the outer XML element, exposing 2nd level elements as separate documents.
- Match By Column Name: a string that specifies whether to load semi-structured data (JSON, XML, Avro) into columns in the target table that match corresponding columns represented in the data. For a column to match, the following criteria must be true:
  - The column represented in the data must have the exact same name as the column in the table. The copy option supports case sensitivity for column names. Column order does not matter.
  - The column in the table must have a data type that is compatible with the values in the column represented in the data. For example, string, number, and Boolean values can all be loaded into a variant column.
- Binary Format: this parameter defines the encoding format for binary input or output. This option only applies when loading data into binary columns in a table. Default is HEX.
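To illustrate how the MERGE-related tokens above fit together, a hypothetical user-defined CDC MERGE SQL assembled from them might look like the following. This is only a sketch; the default statement the Flow generates may be different:

MERGE INTO {TABLE}
USING {TEMP_TABLE}
-- {MERGE_CONDITION} expands to table.key=temp_table.key
ON {MERGE_CONDITION}
-- {UPDATE_FIELDS} expands to field=value,field=value
WHEN MATCHED THEN UPDATE SET {UPDATE_FIELDS}
WHEN NOT MATCHED THEN INSERT ({FIELDS}) VALUES ({INSERT_FIELDS});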
Format
- File Format: the optional Format name or definition. Example of an ad-hoc Format: (type = 'CSV' field_delimiter = ',' skip_header = 1). Read about Snowflake file Formats. The Format name or definition is optional. If not provided, the system will automatically create an ad-hoc definition based on the destination Format configured for the transformation.
- Replace Invalid Characters: if enabled, Snowflake replaces invalid UTF-8 characters with the Unicode replacement character.
- Error on Column Mismatch: this option specifies whether to generate a parsing error if the number of delimited columns (i.e. fields) in an input data file does not match the number of columns in the corresponding table. If disabled, an error is not generated and the load continues. If the file is successfully loaded: if the input file contains records with more fields than columns in the table, the matching fields are loaded in order of occurrence in the file and the remaining fields are not loaded; if the input file contains records with fewer fields than columns in the table, the non-matching columns in the table are loaded with NULL values.
- Trim Space: this option specifies whether to remove white space from fields.
- String used to convert to and from SQL NULL: when loading data, Snowflake replaces these strings in the data load source with SQL NULL. To specify more than one string, enclose the list of strings in parentheses and use commas to separate each value. For example: NULL_IF = ('N', 'NULL', 'NUL', '').
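Several of the options above correspond to Snowflake file format options. A hypothetical ad-hoc File Format definition that combines them might look like this (the values are illustrative only):

(type = 'CSV' field_delimiter = ',' skip_header = 1 trim_space = true replace_invalid_characters = true error_on_column_count_mismatch = false null_if = ('NULL', 'NUL', ''))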
Parameters for copying files into the local stage
- Number of threads to use for uploading files to local stage: specifies the number of threads to use for uploading files to the local stage. The upload process separates batches of data files by size: small files (< 16 MB compressed or uncompressed) are staged in parallel as individual files, while larger files are automatically split into chunks, staged concurrently, and reassembled in the target stage. A single thread can upload multiple chunks. Increasing the number of threads can improve performance when uploading large files. Supported values: any integer from 1 (no parallelism) to 99 (use 99 threads for uploading files).
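This parameter mirrors the PARALLEL option of the Snowflake PUT command, which uploads files to an internal stage. For reference, a manually issued PUT with the equivalent setting might look like this (the file path and stage name are hypothetical):

-- upload a local file to the internal stage using 8 parallel threads
PUT file:///tmp/orders.csv.gz @ETL_STAGE PARALLEL = 8;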
ELT
- Before COPY INTO SQL: this SQL will be executed on the Snowflake Connection before running the COPY INTO SQL.
- Ignore errors when executing Before COPY INTO SQL: if this option is enabled, and there is an error when the Before COPY INTO SQL is executed, the error will be ignored.
- Before COPY INTO SQL is a script: if this option is enabled, the Before COPY INTO SQL will be executed as a Snowflake SQL script.
- After COPY INTO SQL: this SQL will be executed on the Snowflake Connection after running the COPY INTO SQL.
- Ignore errors when executing After COPY INTO SQL: if this option is enabled, and there is an error when the After COPY INTO SQL is executed, the error will be ignored.
- After COPY INTO SQL is a script: if this option is enabled, the After COPY INTO SQL will be executed as a Snowflake SQL script.
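For example, a hypothetical Before/After COPY INTO SQL pair could maintain a simple load-audit table. The table and column names below are made up and are not required by the Flow:

-- Before COPY INTO SQL: make sure the audit table exists
CREATE TABLE IF NOT EXISTS ETL_AUDIT (TABLE_NAME VARCHAR, LOADED_AT TIMESTAMP_NTZ);

-- After COPY INTO SQL: record that the load finished
INSERT INTO ETL_AUDIT (TABLE_NAME, LOADED_AT) VALUES ('PUBLIC.ORDERS', CURRENT_TIMESTAMP());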
Debugging
- Log each executed SQL statement: enable this flag if you want to log each automatically generated and executed SQL statement, including the Before/After COPY INTO SQL.
Other parameters
- Truncate Columns: if enabled (disabled by default), the COPY INTO command will be generated with the TRUNCATECOLUMNS = true option. Strings are automatically truncated to the target column length.
- Purge File if Success: if this option is enabled (it is by default), and the user-defined SQL is not used, the Etlworks Integrator will run the COPY INTO command with the purge option enabled. With purge enabled, the staging file will be automatically deleted after the COPY INTO command is successfully executed.
- Purge File(s) if Error: if this option is enabled (it is by default), the staging files will be automatically deleted if a COPY INTO command was executed with an error.
- Name(s) of the File(s) to Purge: if Purge File(s) if Error is enabled, it is possible to configure which files you want the system to delete if an error occurs while executing the COPY INTO command. Wildcard file names are allowed. If not specified, the system will use the same wildcard file name that it used for the COPY INTO command.
- On Error: what to do in case of any error. Available options are: ABORT_STATEMENT (default), CONTINUE, SKIP_FILE.
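The options above are folded into the auto-generated COPY INTO command. With Truncate Columns, Force Loading File(s), and Purge File if Success enabled and On Error set to CONTINUE, the generated statement would look roughly like this (the table, stage, and format names are hypothetical):

COPY INTO PUBLIC.ORDERS
FROM @ETL_STAGE
PATTERN = '.*orders.*'
FILE_FORMAT = (FORMAT_NAME = CSVFORMAT)
TRUNCATECOLUMNS = true
FORCE = true
PURGE = true
ON_ERROR = CONTINUE;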
Parameters specific for different source types
Depending on the Flow type, other Flow parameters can be added, as explained below:
- The source is a database
- The source is a file or web service:
  - if this parameter is enabled, the system will copy files directly into the Snowflake stage before executing the COPY INTO command. This greatly improves the performance of the load but automatically disables any transformations.
  - if this parameter is enabled, the system will automatically delete loaded files from the source.
  - if this parameter is enabled, and there is an error during the load, the system will automatically delete loaded files from the source.
- The source is a well-known API
Step 8. Optionally Configure MERGE (UPSERT)
To MERGE (upsert) existing data in the Snowflake table with new data:
- Set the Action to MERGE.
- Define the Lookup Fields: the comma-separated list of fields that uniquely identify the record in the Snowflake table.
Alternatively, you can enable Predict Lookup Fields, which forces the Flow to use various algorithms to automatically predict the fields that uniquely identify the record. Note that it is not always possible to correctly detect the unique fields.
When enabling Predict Lookup Fields (which is not always accurate) is not an option, you can specify the list of table=fields pairs in the Lookup Fields. Use fully-qualified table names and ; as a separator between table=field pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
Step 9. Configure how to handle source and destination schema changes
It is quite typical for the source (for example, the table in the OLTP database) and the destination (Snowflake table) to have a different number of columns. It is also entirely possible that the order of columns in the source is different from the order of columns in the destination. In either of these cases, by default, the Flow will fail, since the Snowflake COPY INTO command cannot load files that have more or fewer columns than the target table, or whose columns are in a different order.
When setting the source-to-destination transformation, it is possible to configure it to automatically handle schema changes. The following configuration options are available:
1. Reorder CSV Columns During Extract: if this option is enabled (it is disabled by default), the Flow reorders columns in the data file to match the order of columns in the target Snowflake table. This option is ignored if the target table does not exist.
2. Reorder CSV Columns During Load (recommended): if this option is enabled (it is disabled by default), and the COPY INTO action is selected, the Flow reorders the column data from a staged CSV file before loading it into a table. Enabling this option allows loading files that have a different set of columns, or columns in a different order, than the destination table in Snowflake. In most cases, it is recommended to use this option instead of Reorder CSV Columns During Extract.
3. Ignore fields in the source that do not exist in the target table: if this option is enabled (it is disabled by default), and the source has more fields than the target (Snowflake) table, the system will automatically exclude the extra fields. Options 3, 4, and 5 are mutually exclusive.
4. Alter target table if the source has columns that the target table doesn't have: if this option is enabled (it is disabled by default), and the source has fields that the target (Snowflake) table doesn't have, the system will add the extra fields to the target table. Options 3, 4, and 5 are mutually exclusive.
5. Recreate the target table if the source has columns that the target table doesn't have: if this option is enabled (it is disabled by default), and the source has fields that the target (Snowflake) table doesn't have, the system will automatically drop and recreate the target table. Options 3, 4, and 5 are mutually exclusive.
6. Insert null into the fields in the target table that do not exist in the source: if this option is enabled (it is disabled by default), and the target (Snowflake) table has fields that the source doesn't have, the system will automatically insert NULL values into these fields in the target.
7. Create table SQL: by default, the system automatically generates the SQL for creating a new table if it does not exist. You can override the default SQL by providing a template or JavaScript code. Read more about how to override the create table SQL.
Configure notifications if source and destination have different columns
It is possible to send an email notification if the source and destination have different columns. To configure Flow to send a notification when either source has more columns than the destination or the destination has more columns than the source, use the technique explained in this article.
Step 10. Optionally configure Mapping
If necessary, you can create a MAPPING between the source and destination (Snowflake) fields.
You can create a MAPPING between the source and destination just like you usually do for any other Flow type.
MAPPING is not required, but please remember that if a source field name is not supported by Snowflake, it will return an error and the data will not be loaded into the database. For example, if you are loading data from Google Analytics, the output (source) is going to include fields with the prefix ga: (ga:user, ga:browser, etc.). Unfortunately, Snowflake does not support fields with a :, so the data will be rejected. If that happens, you can use MAPPING to rename the destination fields.
Set up incremental Change Replication using high watermark
As in any other Flow type, it is possible to configure a Change Replication using high watermark.
When Change Replication is enabled, only the changed records will be loaded into Snowflake.
Basically, all you need to do is set the High Watermark Field and Enable Change Replication for the transformation. Additionally, for better readability, you can set the calculated High Watermark Field Value.
Load multiple tables by a wildcard name
To load multiple database objects (tables and views) by a wildcard name (without creating individual source-to-destination transformations), follow the same steps as above, except:
FROM
Create a single source-to-destination transformation and set the FROM to a fully qualified source database object name with wildcard characters (* and ?), for example, public.*.
By default, all tables and all views that match the wildcard name in the FROM will be included. If you want to exclude specific database objects, enter a comma-separated list of the database objects to exclude in the Exclude objects field. If you want to include only specific database objects, enter a comma-separated list of the database objects to include in the Include objects field.
TO
Set the TO to SNOWFLAKE_DW.SCHEMA.*, for example, UTIL_DB.PUBLIC.*.
Read how to programmatically change the destination (TO) name.
EXCLUDE
Optionally, configure the list of database objects to exclude.
To exclude all views, enter all views in the Exclude Objects field.
To exclude all tables, enter all tables in the Exclude Objects field.
INCLUDE
Optionally, configure the list of database objects to include.
SOURCE QUERY
Optionally, configure the Source query. You can use the token {TABLE} in the Source query.
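For example, a hypothetical Source query that applies the same filter to every matched table could use the {TABLE} token (the column name is made up):

SELECT * FROM {TABLE} WHERE status <> 'archived'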
BEFORE/AFTER SQL
If you want to execute any code before and/or after COPY INTO, use the token {TABLE} in the Before COPY INTO SQL and After COPY INTO SQL.
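For instance, assuming {TABLE} resolves to the destination table, a hypothetical Before COPY INTO SQL that empties each table before reloading it could be:

TRUNCATE TABLE IF EXISTS {TABLE}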
MERGE
If you are configuring the MERGE action, do not enter anything in the Lookup Fields. Enable Predict Lookup Fields instead.
An alternative to enabling Predict Lookup Fields (which is not always accurate) is specifying the list of table=fields pairs in the Lookup Fields. Use fully-qualified table names and ; as a separator between table=field pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
HWM CHANGE REPLICATION
If you are setting up High Watermark Change Replication with a calculated HWM field, use the token {TABLE} in the High Watermark Field Value.
Common issues when loading data in Snowflake
Read how to troubleshoot and fix common issues when loading data in Snowflake.