The Etlworks Integrator includes several Flows optimized for extracting data from various sources, transforming it, and loading it into Snowflake. For example, if you already have CSV files in cloud storage (Amazon S3, Google Cloud Storage, Azure Blob) and don't need to transform the data, the most efficient way to load these files into Snowflake is to use the Flow type Load files in cloud storage into Snowflake described in this article.
When to use this Flow type
- When you simply want to load CSV files that already exist in cloud storage into Snowflake, without applying any transformations or transferring files over the network.
- When you want to load CSV files created by the CDC Flow that saves files to cloud storage.
How it works
- The Flow reads the names of all files matching the wildcard in the specific bucket or blob. It can traverse subfolders as well.
- The Flow calculates the destination table names based on the source file names.
- The Flow creates multiple threads, one per destination table, but not more than the configurable threshold.
- The Flow generates and executes the `COPY INTO` command to load files into a temporary table, one temporary table per actual destination table.
- The Flow merges data in the temporary tables with data in the actual tables.
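The orchestration steps above can be sketched in Python. This is an illustrative sketch, not Etlworks' actual implementation; the helper names and the table-naming convention are assumptions.

```python
# Sketch of the Flow's orchestration: group matched files by destination
# table, then load each table's files in its own thread, capped by a
# configurable threshold (Maximum Number of Parallel Loads).
from concurrent.futures import ThreadPoolExecutor


def table_name_for(filename):
    # Assumption for this sketch: destination table = file name sans extension.
    return filename.rsplit(".", 1)[0].upper()


def load_table(table, files):
    # Placeholder for: COPY INTO the temp table, then MERGE into the actual table.
    return f"loaded {len(files)} file(s) into {table}"


def run_flow(filenames, max_parallel_loads=4):
    # One group of files per destination table.
    groups = {}
    for f in filenames:
        groups.setdefault(table_name_for(f), []).append(f)
    # One thread per destination table, but not more than the threshold.
    with ThreadPoolExecutor(max_workers=max_parallel_loads) as pool:
        futures = {t: pool.submit(load_table, t, fs) for t, fs in groups.items()}
        return {t: fut.result() for t, fut in futures.items()}
```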
The Flow supports the following features:

|Feature|Description|
|---|---|
|Insert, upsert, and CDC merge|The Flow supports `COPY INTO` (INSERT), `MERGE` (UPSERT), and `CDC MERGE`. With `CDC MERGE`, the Flow updates the actual destination table by applying `INSERT`, `UPDATE`, and `DELETE` events in the same order as they originated in the source database and were recorded in CSV files. It performs this in the most efficient way possible.|
|Monitor source schema changes|The Flow is able to detect changes in the source schema and alter the destination table accordingly.|
|Delete loaded source files|The Flow can be configured to delete successfully loaded source files.|
|Load data in parallel|The Flow is able to load data into multiple destination tables in parallel.|
|Process files in subfolders|The Flow is able to process files in multiple nested subfolders.|
|Create all or selected columns as TEXT|The Flow is able to create all or selected columns as TEXT, which mitigates issues related to source schema drift.|
Prerequisites
- The Snowflake data warehouse is active.
- An Amazon S3 bucket, Google Cloud Storage bucket, or Azure Blob container is created.
- A named Snowflake stage is created in Snowflake. The stage must point to the bucket or blob created in step 2.
- Files to load exist in the bucket or blob created in step 2.
Step 1. Create a new cloud storage Connection
This Connection will be used as a source to access files stored in the cloud storage.
The Connection can be one of the following: Amazon S3, Google Cloud Storage, or Azure Storage.
Step 2. Create a new Snowflake Connection
This Connection will be used as a destination.
When creating a Connection, set the `Stage name`. The named stage must be configured to read data from the bucket or blob configured for the cloud storage Connection created in step 1.
Step 3. Create a new CSV Format
This Format will be used to generate a valid `COPY INTO` command.
It is recommended to set the encoding to `UTF-8`. Keep the default values for the other parameters.
Step 4. Create a new Flow
This Flow will be used to load files in cloud storage into Snowflake.
Click `[+]`, type in `load files in cloud storage into snowflake`, and select the Flow.
Step 5. Configure the load transformation
Select or enter the following attributes of the transformation (left to right):
- Cloud storage Connection created in step 1.
- CSV Format created in step 3.
- A wildcard filename that matches the filenames in the cloud storage. Use the `.csv` extension for uncompressed files and the `.gz` extension for compressed files.
- Snowflake Connection created in step 2.
- The wildcard destination table name in the format `SCHEMA.*`, where `SCHEMA` is the Snowflake schema to load data into. You can also use a fully qualified table name: `DB.SCHEMA.*`.
Step 6. Set optional and required parameters
Select or enter the following optional and required parameters:
Source Files and Destination Tables
Include files in subfolders: if this option is enabled, the Flow will load all files that match the wildcard filename (set in `FROM`) in all subfolders under the main bucket or blob.
Exclude files and Include files: optional comma-separated lists of files to exclude and/or include. You can use wildcard file names. These options are used together with the wildcard filename set in `FROM`:
- The Flow populates the list of files matching the wildcard filename set in `FROM`.
- The Flow excludes files based on the list of exclusions set in `Exclude files`.
- The Flow includes files based on the list of inclusions set in `Include files`.
Calculate destination table name: an optional JavaScript snippet used to calculate the destination table name from the source file name. The source file name is assigned to the variable `name`, and the actual table name must be assigned to the variable `value`. For example, assuming that the source file name is `schema-table_cdc_stream_uuid.csv`, the table name can be extracted with `value = name.substring(name.indexOf('-') + 1, name.indexOf('_cdc_stream'));`.
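The file selection and table-name extraction described above can be sketched in Python. This is an illustrative sketch: the function names are assumptions, the patterns use `fnmatch`-style wildcards, and `table_name` mirrors the JavaScript one-liner.

```python
# Sketch of wildcard matching with exclude/include lists, plus the
# equivalent of the JavaScript table-name script.
from fnmatch import fnmatch


def select_files(filenames, wildcard, exclude=(), include=()):
    # 1. Populate the list of files matching the wildcard set in FROM.
    matched = [f for f in filenames if fnmatch(f, wildcard)]
    # 2. Exclude files based on the list of exclusions.
    matched = [f for f in matched if not any(fnmatch(f, p) for p in exclude)]
    # 3. Include files based on the list of inclusions (if configured).
    if include:
        matched = [f for f in matched if any(fnmatch(f, p) for p in include)]
    return matched


def table_name(name):
    # Equivalent of:
    # value = name.substring(name.indexOf('-') + 1, name.indexOf('_cdc_stream'));
    return name[name.index("-") + 1 : name.index("_cdc_stream")]
```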
Maximum Number of Files to Process: an optional parameter that controls how many files will be loaded in one batch (one run of the Flow). If you expect that at any point in time there will be hundreds of thousands or millions (10^6 to 10^7) of files in the source folder(s), it is recommended to limit the number of files to process to a more manageable number: thousands to hundreds of thousands (10^3 to 10^5).
Maximum Number of Parallel Loads: if the value of this optional parameter is greater than 1, the Flow will create multiple threads to load data in parallel: one thread per table, but not more than the threshold set in this parameter.
Log each executed SQL statement: enable this flag if you want to log each automatically generated and executed SQL statement, including before/after Load SQL.
Continue loading data into other tables if Error: if this parameter is enabled (it is disabled by default) and there is an error when loading data into some table, the Flow will continue loading data into other tables. If configured, a notification will be sent to the webhook.
COPY INTO parameters and Action
Stage name: the name of the Snowflake stage. Stage refers to the location where your data files are stored for loading into Snowflake. This field is optional and overrides the Stage name set at the Snowflake Connection level.
Action: can be `COPY INTO`, which inserts records from the file(s) into the Snowflake table; `MERGE`, which merges records from the file with the records in the Snowflake table; or `CDC MERGE`, which loads CDC events (`INSERT`, `UPDATE`, `DELETE`) in the same order as they originated in the source database. `MERGE` and `CDC MERGE` require configuring the `Lookup Fields` and/or enabling the `Predict Lookup Fields` option.
Lookup Fields: the comma-separated list of fields that uniquely identify the record in the target Snowflake table.
Predict Lookup Fields: if this option is enabled and `Lookup Fields` is empty, the system will try to predict the columns that uniquely identify the record in the target Snowflake table.
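To illustrate how lookup fields drive an upsert, here is a sketch (not Etlworks' actual code) of generating a Snowflake `MERGE` statement from a list of lookup fields; the function and table names are assumptions.

```python
# Sketch: build a MERGE (UPSERT) statement from the lookup fields that
# uniquely identify a record in the target table.
def build_merge(table, temp_table, columns, lookup_fields):
    # Join condition on the lookup fields.
    on = " AND ".join(f"t.{c} = s.{c}" for c in lookup_fields)
    # Update every non-key column when a match is found.
    updates = ", ".join(f"t.{c} = s.{c}" for c in columns if c not in lookup_fields)
    cols = ", ".join(columns)
    vals = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {table} t USING {temp_table} s ON {on} "
        f"WHEN MATCHED THEN UPDATE SET {updates} "
        f"WHEN NOT MATCHED THEN INSERT ({cols}) VALUES ({vals})"
    )
```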
Error on Column Mismatch: this option specifies whether to generate a parsing error if the number of delimited columns (i.e., fields) in an input data file does not match the number of columns in the corresponding table. If disabled, an error is not generated and the load continues:
- If the input file contains records with more fields than columns in the table, the matching fields are loaded in order of occurrence in the file and the remaining fields are not loaded.
- If the input file contains records with fewer fields than columns in the table, the non-matching columns in the table are loaded with `NULL` values.
Trim Space: this option specifies whether to remove white space from fields.
Replace Invalid characters: if enabled, Snowflake replaces invalid `UTF-8` characters with the Unicode replacement character.
String used to convert to and from SQL NULL: when loading data, Snowflake replaces these strings in the data load source with `SQL NULL`. To specify more than one string, enclose the list of strings in parentheses and use commas to separate each value. For example, `NULL_IF = ('\\N', 'NULL', 'NUL', '')`.
Binary format: this parameter defines the encoding format for binary input or output. This option only applies when loading data into binary columns in a table. The default is `HEX`.
Truncate Columns: if enabled (disabled by default), the `COPY INTO` command will be generated with the `TRUNCATECOLUMNS = TRUE` option. Strings are automatically truncated to the target column length.
Purge File if Success: if this option is enabled (it is enabled by default), the staging files will be automatically deleted after the `COPY INTO` command is successfully executed.
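To show how the parameters above translate into the generated statement, here is a sketch of a `COPY INTO` builder. It is an assumption-laden illustration: the option defaults and the exact SQL text Etlworks emits may differ.

```python
# Sketch: assemble a COPY INTO statement from the configurable options
# described above (ON_ERROR, TRUNCATECOLUMNS, NULL_IF, PURGE).
def build_copy_into(table, stage, pattern, on_error="ABORT_STATEMENT",
                    truncate_columns=False,
                    null_if=("\\N", "NULL", "NUL", ""), purge=True):
    options = [f"ON_ERROR = {on_error}"]
    if truncate_columns:
        # Strings are automatically truncated to the target column length.
        options.append("TRUNCATECOLUMNS = TRUE")
    null_list = ", ".join(f"'{s}'" for s in null_if)
    options.append(f"NULL_IF = ({null_list})")
    if purge:
        # Delete staging files after a successful load.
        options.append("PURGE = TRUE")
    return f"COPY INTO {table} FROM @{stage} PATTERN = '{pattern}' " + " ".join(options)
```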
On Error: what to do in case of any error. Available options are: `CONTINUE`, `SKIP_FILE`, `SKIP_FILE_num`, `'SKIP_FILE_num%'`, and `ABORT_STATEMENT`.
Before COPY INTO SQL: this SQL will be executed on the Snowflake Connection before running `COPY INTO SQL`.
Ignore errors when executing Before COPY INTO SQL: if this option is enabled and there is an error when `Before COPY INTO SQL` is executed, the error will be ignored.
Before Load SQL is a script: if this option is enabled, `Before Load SQL` will be executed as a Snowflake SQL script.
After COPY INTO SQL: this SQL will be executed on the Snowflake Connection after running `COPY INTO SQL`.
Ignore errors when executing After COPY INTO SQL: if this option is enabled and there is an error when `After COPY INTO SQL` is executed, the error will be ignored.
After Load SQL is a script: if this option is enabled, `After Load SQL` will be executed as a Snowflake SQL script.
Handling source schema changes
Alter target table if the source has columns that the target table doesn't have: if this option is enabled (it is disabled by default) and the source has fields that the target (Snowflake) table does not, the system will add the extra fields to the target table.
Create all actual tables with only TEXT columns: if this option is enabled (it is disabled by default), the Flow will create all actual tables with only TEXT columns. Enable this option to make the load more resilient to source schema drift. By default, the Flow creates the actual table using the first file as a pattern. It then alters the actual table by adding new columns, but it never modifies the data types of the existing columns, which could cause issues if there are multiple source files to load into the same destination table and some of these files have columns with different data types (compared to each other).
Actual tables with all TEXT columns: a comma-separated list of regular expressions matching the table names for which the Flow should create all columns as `TEXT`.
Here are the rules for the regexes:
- Regexes are case insensitive.
- If a regex includes a comma (`,`), it must be escaped (`\,`).
- Java regex syntax must be used.
- If the destination table name is configured using fully qualified (`DB.SCHEMA.TABLE`) or partially qualified (`SCHEMA.TABLE`) notation, the regex should reflect it.
TEXT columns: a comma-separated list of regular expressions matching the column names which should be created as `TEXT`.
Here are the rules for the three parameters above:
- If `Create all actual tables with only TEXT columns` is enabled, all columns in all new tables and new columns in the existing tables will be created as `TEXT`.
- If the table name matches a regex in `Actual tables with all TEXT columns` and `TEXT columns` is empty, all columns in that table will be created as `TEXT`.
- If `TEXT columns` is not empty and the column name in any table matches any regex configured in this field, the column will be created as `TEXT`.
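The precedence rules above can be expressed as a small decision function. This is a hypothetical sketch (parameter names are shorthand for the three options), using Python's `re` in place of Java regex syntax:

```python
# Sketch: decide whether a column should be created as TEXT, following
# the three rules above in order.
import re


def create_as_text(table, column, all_text=False,
                   text_tables=(), text_columns=()):
    if all_text:
        # Rule 1: Create all actual tables with only TEXT columns.
        return True
    # Regexes are case insensitive.
    table_matches = any(re.fullmatch(p, table, re.IGNORECASE) for p in text_tables)
    if table_matches and not text_columns:
        # Rule 2: table matches and TEXT columns is empty.
        return True
    # Rule 3: column name in any table matches a TEXT columns regex.
    return any(re.fullmatch(p, column, re.IGNORECASE) for p in text_columns)
```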
Create temp tables with only TEXT columns: if this option is enabled (it is disabled by default), the Flow will create temporary tables with only TEXT columns. Enable this option to make the load more resilient to source schema drift. By default, the Flow creates the temp table using the first file as a pattern. It then alters the temp table by adding new columns, but it never modifies the data types of the existing columns, which could cause issues if there are multiple source files to load into the same destination table and some of these files have columns with different data types (compared to each other). This option is automatically enabled if `Create all actual tables with only TEXT columns` is enabled or if the table name matches a regex configured in `Actual tables with all TEXT columns`.
Step 7. Optionally, add a mapping.
The mapping can be used to:
- Rename the column.
- Exclude the column.
The use case for mapping
Let's assume that the source database is MySQL and it has a table with a column named `CURRENT_DATE`. Let's also assume that the CSV file was created by the CDC Flow and is stored in an S3 bucket. The load Flow loads CSV files from S3 into Snowflake. When trying to create a table in Snowflake with a column named `CURRENT_DATE`, the Flow fails with the following error:

SnowflakeSQLException: SQL compilation error: error line 3 at position 0. Invalid column definition name 'CURRENT_DATE' (ANSI reserved)

The simplest solution to this problem is to rename the column `CURRENT_DATE` to (for example) `CURRENT_DATE2`. It can be accomplished using mapping.
Select the transformation and open the mapping editor. Add a mapping for the column `CURRENT_DATE` that renames it to `CURRENT_DATE2`.
Renaming or excluding a column in mapping will work for all tables that have this column. If a table does not have the column, the mapping will be ignored.
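The effect of such a mapping can be sketched as a function applied to every table's header; the function name and parameters are illustrative assumptions, not Etlworks' API.

```python
# Sketch: apply a column mapping (rename and/or exclude) to a table's
# column list; tables without the mapped column are unaffected.
def apply_mapping(columns, rename=None, exclude=()):
    rename = rename or {}
    return [rename.get(c, c) for c in columns if c not in exclude]
```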
Step 8. Optionally, add more source-to-destination transformations
Click `[+]` and repeat steps 5 to 6 for more locations and/or file name templates.