When to use this Flow type
If you already have files in local or cloud storage (Amazon S3, Google Cloud Storage, Azure Blob) and don't need to transform the data, the most efficient way to load these files into Snowflake is to use the Flow type Bulk load files into Snowflake described in this article.
Use this Flow type:
- When you simply want to load flat (CSV) or semi-structured (JSON, Avro, Parquet) files that already exist in local or cloud storage into Snowflake, without applying any transformations or transferring files over the network.
- When you want to load CSV or JSON files created by the CDC Flow.
Flows optimized for Snowflake
| Flow type | When to use |
| --- | --- |
|  | When you need to extract data from any source, transform it, and load it into Snowflake. |
| Bulk load files into Snowflake (you are here) | When you need to bulk-load files that already exist in the external Snowflake stage (S3, Azure Blob, Google Cloud Storage) or in server storage without applying any transformations. The flow automatically generates the COPY INTO command and MERGEs data into the destination. |
| Stream CDC events into Snowflake | When you need to stream updates from a database that supports Change Data Capture (CDC) into Snowflake in real time. |
| Stream messages from a queue into Snowflake | When you need to stream messages from a message queue that supports streaming into Snowflake in real time. |
| COPY files into Snowflake | When you need to bulk-load data from file-based or cloud storage, an API, or a NoSQL database into Snowflake without applying any transformations. This flow requires providing a user-defined COPY INTO command. Unlike Bulk load files into Snowflake, this flow does not support automatic MERGE. |
How it works
- The Flow automatically creates a named Snowflake stage if it does not exist.
- The Flow reads the names of all files matching the wildcard in the specified location. It can traverse subfolders as well.
- The Flow calculates the destination table names based on the source file names.
- The Flow creates multiple threads, one per destination table, but not more than the configurable threshold.
- The Flow generates and executes the COPY INTO command to load files into a temporary table – one temporary table per actual destination table.
- The Flow merges data in the temporary tables with data in the actual tables.
- Alternatively, the Flow can load files directly into the actual destination table by generating and executing the COPY INTO command for files matching the wildcard.
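To make the default two-step mode more concrete, here is a minimal sketch of the kind of SQL the Flow generates. The stage, table, and column names (etl_stage, public.customer_temp, public.customer, id, name) are placeholders, not the actual names produced by Etlworks:

```sql
-- Step 1 (sketch): load staged files into a temporary table, one per destination table
COPY INTO public.customer_temp
FROM @etl_stage
PATTERN = '.*customer_cdc_stream_.*csv'
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"');

-- Step 2 (sketch): merge the temporary table into the actual destination table
MERGE INTO public.customer AS t
USING public.customer_temp AS s
  ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.name = s.name
WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name);
```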
Features
The Flow supports COPY INTO (INSERT), MERGE (UPSERT), and CDC MERGE. With CDC MERGE, the Flow updates the actual destination table by applying INSERT/UPDATE/DELETE events in the same order in which they originated in the source database and were recorded in the CSV files. It does this in the most efficient way possible.
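As an illustration of what CDC MERGE does (not the exact statement the Flow generates), the sketch below assumes the staged CDC records carry a change-type column, here called cdc_op, with values such as i/u/d:

```sql
-- Illustration only: apply CDC events from a temp table that includes
-- a hypothetical cdc_op column ('i' = insert, 'u' = update, 'd' = delete)
MERGE INTO public.customer AS t
USING public.customer_temp AS s
  ON t.id = s.id
WHEN MATCHED AND s.cdc_op = 'd' THEN DELETE
WHEN MATCHED AND s.cdc_op IN ('i', 'u') THEN UPDATE SET t.name = s.name
WHEN NOT MATCHED AND s.cdc_op <> 'd' THEN INSERT (id, name) VALUES (s.id, s.name);
```

The real statement also has to collapse multiple events for the same key into the latest state before merging, which is omitted here for brevity.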
| Other Features | Description |
| --- | --- |
| Monitor source schema changes | The Flow is able to CREATE and ALTER destination tables to keep up with schema changes in the source. |
| Load semi-structured files into the VARIANT column | The Flow can load semi-structured files such as JSON, Avro, and Parquet into a single VARIANT column in the Snowflake table. |
| Load files directly into the destination table | By default, the Flow uses a temporary table to load files into Snowflake. It can also be configured to load files directly into the destination table. In this mode, the Flow does not support MERGE and CDC MERGE. |
| Delete loaded source files | The Flow can be configured to delete successfully loaded source files. |
| Load data in parallel | The Flow is able to load data into multiple destination tables in parallel. |
| Process files in subfolders | The Flow is able to process files in multiple nested subfolders. |
| Create all or selected columns as TEXT | The Flow is able to create all or selected columns as TEXT, which mitigates issues related to source schema drift. |
Prerequisites
1. The Snowflake data warehouse is active.
2. The Stage name is set for the Snowflake Connection or the Transformation (the latter overrides the stage set for the Connection). Etlworks uses the Snowflake COPY INTO command to load data into Snowflake tables. COPY INTO requires a named internal or external stage. The stage refers to the location where your data files are stored for loading into Snowflake. Read how the Etlworks flow automatically creates the named Snowflake stage, and see the example stage definitions after this list.
3. For loading data from an external stage in AWS S3, Azure Blob, or Google Cloud Storage, the Amazon S3 bucket, Google Cloud Storage bucket, or Azure blob needs to be created first. Note that the Etlworks flow does not create the bucket or blob.
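For reference, the stage referenced by the Stage name can look like one of the sketches below. The names, URL, and credentials are placeholders; the internal stage can be created automatically by Etlworks, while an external stage must point at the bucket or blob you created:

```sql
-- Internal named stage (Etlworks can create it automatically)
CREATE STAGE IF NOT EXISTS etl_internal_stage;

-- External stage on an existing S3 bucket (placeholder URL and credentials)
CREATE STAGE IF NOT EXISTS etl_s3_stage
  URL = 's3://my-bucket/etlworks/'
  CREDENTIALS = (AWS_KEY_ID = '***' AWS_SECRET_KEY = '***');
```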
Process
Step 1. Create a new cloud storage or server storage Connection
This Connection will be used as a source to access files stored in the cloud storage or server (local) storage.
The Connection can be one of the following:
- Server Storage connection for loading data from the internal Snowflake stage.
- S3 connection for loading data from the S3 external stage.
- Azure storage connection for loading data from the Azure external stage.
- Google Cloud Storage connection for loading data from the Google Cloud external stage.
Step 2. Create a new Snowflake Connection
The Snowflake Connection will be used as a destination.
When creating the Connection, set the Stage name. For loading files from cloud storage, the named external stage must be configured to read data from the bucket or blob configured for the cloud storage Connection created in step 1.
Step 3. Create a new Format
This Format will be used to generate a valid COPY INTO command.
The following formats are supported:
- CSV - the most commonly used format for loading data into Snowflake. With CSV, COPY INTO, MERGE, and CDC MERGE are supported. The files can be compressed using gzip.
- JSON - the Flow can load JSON files into a single VARIANT column. Only COPY INTO is supported. The files can be compressed using gzip.
- Parquet - the Flow can load Parquet files into a single VARIANT column. Only COPY INTO is supported. Parquet files can be encoded using one of the following codecs, so no extra compression is needed: SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD.
- Avro - the Flow can load Avro files into a single VARIANT column. Only COPY INTO is supported. Avro files can be encoded using one of the following codecs, so no extra compression is needed: DEFLATE, SNAPPY, BZIP2, XZ.
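For orientation, the Format you select roughly maps to the Snowflake FILE_FORMAT clause in the generated COPY INTO. The stage and table names below are placeholders, and the exact options come from the Format definition:

```sql
-- CSV, gzip-compressed
COPY INTO public.orders_temp
FROM @etl_stage
FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP SKIP_HEADER = 1);

-- JSON loaded into a single VARIANT column
COPY INTO public.events_raw
FROM @etl_stage
FILE_FORMAT = (TYPE = JSON);

-- Parquet or Avro (the codec is detected automatically)
COPY INTO public.events_raw
FROM @etl_stage
FILE_FORMAT = (TYPE = PARQUET);
```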
Step 4. Create a new Flow
This Flow will be used to bulk load files into Snowflake.
In Flows, click [+], type in bulk load files into snowflake, and select the Flow.
Step 5. Configure the load transformation
Select or enter the following attributes of the transformation (left to right):
- Storage Connection created in step 1.
- Format created in step 3.
- A wildcard filename that matches the filenames in the cloud storage. Use the .gz extension for compressed files. Examples for loading CSV files created by the CDC Flow: *_cdc_stream_*.csv for uncompressed files or *_cdc_stream_*.gz for compressed files.
- Snowflake Connection created in step 2.
- The wildcard destination table name in the following format: SCHEMA.*, where SCHEMA is the Snowflake schema to load data into. You can also use a fully qualified table name: DATABASE.SCHEMA.*. Example: public.*. You don't need to use a wildcard when loading into a specific table.
Step 6. Set optional and required parameters
Click MAPPING and select the Parameters tab.
Select or enter the following optional and required parameters:
Source Files and Destination Tables
- Include files in subfolders: if this option is enabled, the Flow will load all files that match the wildcard filename (set in FROM) in all subfolders under the main bucket or blob.
- Exclude files and Include files: optional comma-separated lists of files to exclude and/or include. You can use wildcard file names. These options are used together with the wildcard filename set in FROM:
  - The Flow populates the list of files matching the wildcard filename in FROM.
  - The Flow excludes files based on the list of exclusions set in Exclude files.
  - The Flow includes files based on the list of inclusions set in Include files.
- Calculate Snowflake table name: an optional parameter used to calculate (using JavaScript) the destination (Snowflake) table name based on the source file name. The original file name, without the extension, is stored in the variable name, and the actual table name must be assigned to the variable value. For example, let's assume that the source file name is db_schema-table_cdc_stream_uuid.csv. The JavaScript code to calculate the destination table name would be value = name.substring(name.indexOf('-') + 1, name.indexOf('_cdc_stream'));. IMPORTANT: the Flow automatically recognizes and transforms filenames matching the following template: *_cdc_stream_*.*. If the filenames match this template, there is no need to provide a transformation script.
- Maximum Number of Files to Process: an optional parameter that controls how many files will be loaded in one batch (one run of the Flow). If you expect that at any point in time there will be hundreds of thousands or millions (10^6 - 10^7) of files in the source folder(s), it is recommended to limit the number of files to process to a more manageable number: hundreds to thousands (10^3 - 10^5).
- Maximum Number of Parallel Loads: if the value of this optional parameter is greater than 1, the Flow will create multiple threads to load data in parallel – one thread per table, but not more than the threshold set in this parameter.
- Parallel: enable this flag if you have more than one source-to-destination transformation that you want to execute in parallel.
- Create new Snowflake connection for each table: if this parameter is enabled, a new Snowflake connection will be created to load data into each destination table. The connections are closed when the Flow finishes loading data into all tables.
- Purge File if Success: if this option is enabled (it is enabled by default), the source files will be automatically deleted after the COPY INTO command is successfully executed.
Debug
- Log each executed SQL statement: enable this flag if you want to log each automatically generated and executed SQL statement, including the before/after Load SQL.
Error recovery
- Continue loading data into other tables if Error: if this parameter is enabled (it is disabled by default) and there is an error when loading data into one of the tables, the Flow will continue loading data into the other tables. If configured, a notification will be sent to the webhook.
Load parameters
- Stage name: the name of the Snowflake stage. The stage refers to the location where your data files are stored for loading into Snowflake. This field is optional and overrides the Stage name set at the Snowflake Connection level.
- Direct load: if this option is enabled (it is disabled by default), the Flow loads data into the Snowflake table directly, without an intermediate temporary table. This option is automatically activated for JSON, Parquet, and Avro files. Enabling Direct load is the fastest way to load data into Snowflake, but it does not support MERGE and CDC MERGE.
- Use COPY INTO with a wildcard pattern: if this option is enabled (it is disabled by default), the Flow loads all files into a specific destination table in one call, using COPY INTO with a wildcard pattern. The advantage of this approach is the speed of the load. The disadvantage is the inability to handle files with a different number of columns.
- Action: can be COPY INTO, which inserts records from the file(s) into the Snowflake table; MERGE, which merges records from the file with the records in the Snowflake table; or CDC MERGE, which loads CDC events (INSERT/UPDATE/DELETE) in the same order as they originated in the source database. MERGE and CDC MERGE require configuring the Lookup Fields and/or enabling the Predict Lookup Fields option.
- Handle explicit CDC updates: the typical stream produced by a CDC Flow includes all columns of the monitored table. In some cases, the CDC Flow captures changes in the explicitly changed columns only, for example, when supplemental logging for an Oracle table is configured as 'LOG DATA (PRIMARY KEY) COLUMNS' or 'LOG DATA (UNIQUE) COLUMNS'. Enable this option so that the CDC MERGE statement can preserve the original values of the columns NOT captured by the CDC Flow.
- Lookup Fields: the comma-separated list of fields that uniquely identify the record in the target Snowflake table.
- Predict Lookup Fields: if this option is enabled and Lookup Fields is empty, the system will try to predict the columns that uniquely identify the record in the target Snowflake table.
COPY INTO parameters
- Truncate Columns: if enabled (disabled by default), the COPY INTO command will be generated with the TRUNCATECOLUMNS = true option. Strings are automatically truncated to the target column length.
- On Error: what to do in case of an error. The available options are ABORT_STATEMENT (default), CONTINUE, and SKIP_FILE.
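Assuming hypothetical stage and table names, a COPY INTO generated with both options set might look like this sketch:

```sql
COPY INTO public.customer_temp
FROM @etl_stage
PATTERN = '.*customer_cdc_stream_.*gz'
FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP)
TRUNCATECOLUMNS = TRUE   -- Truncate Columns
ON_ERROR = CONTINUE;     -- On Error
```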
Change wildcard pattern for COPY INTO
By default, the flow sets the wildcard pattern to the longest common string for the files that will be loaded into a specific table. When Use COPY INTO with a wildcard pattern is enabled, it is possible to change the wildcard pattern using JavaScript.
Available variables:
- etlConfig: the config.
- table: the table name.
- commonFileName: the default wildcard pattern.
- files: the list of files.
Example:
value = table + '_cdc_stream_';
All formats
- Replace Invalid characters: if enabled, Snowflake replaces invalid UTF-8 characters with the Unicode replacement character.
- String used to convert to and from SQL NULL: when loading data, Snowflake replaces these strings in the data load source with SQL NULL. To specify more than one string, enclose the list of strings in parentheses and use commas to separate each value. For example, NULL_IF = ('N', 'NULL', 'NUL', '').
- Binary format: this parameter defines the encoding format for binary input or output. This option only applies when loading data into binary columns in a table. The default is HEX.
- Trim Space: this option specifies whether to remove white space from fields.
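These settings map to Snowflake file format options. A sketch with placeholder stage and table names:

```sql
COPY INTO public.customer_temp
FROM @etl_stage
FILE_FORMAT = (
  TYPE = CSV
  REPLACE_INVALID_CHARACTERS = TRUE   -- Replace Invalid characters
  NULL_IF = ('N', 'NULL', 'NUL', '')  -- String used to convert to and from SQL NULL
  BINARY_FORMAT = HEX                 -- Binary format
  TRIM_SPACE = TRUE                   -- Trim Space
);
```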
CSV format
- Error on Column Mismatch: this option specifies whether to generate a parsing error if the number of delimited columns (i.e., fields) in an input data file does not match the number of columns in the corresponding table. If disabled, an error is not generated and the load continues:
  - If the input file contains records with more fields than columns in the table, the matching fields are loaded in order of occurrence in the file and the remaining fields are not loaded.
  - If the input file contains records with fewer fields than columns in the table, the non-matching columns in the table are loaded with NULL values.
- Empty field as null: a Boolean that specifies whether to insert SQL NULL for empty fields in an input file, which are represented by two successive delimiters.
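A sketch of the corresponding Snowflake CSV options (placeholder names):

```sql
COPY INTO public.customer_temp
FROM @etl_stage
FILE_FORMAT = (
  TYPE = CSV
  ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE  -- Error on Column Mismatch disabled
  EMPTY_FIELD_AS_NULL = TRUE              -- Empty field as null
);
```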
JSON format
- Strip outer array: a Boolean that instructs the JSON parser to remove outer brackets [ ].
- Enable octal: a Boolean that enables parsing of octal numbers.
- Allow duplicate: a Boolean that allows duplicate object field names (only the last one will be preserved).
- Strip null values: a Boolean that instructs the JSON parser to remove object fields or array elements containing null values.
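A sketch of the corresponding Snowflake JSON options (placeholder names):

```sql
COPY INTO public.events_raw
FROM @etl_stage
FILE_FORMAT = (
  TYPE = JSON
  STRIP_OUTER_ARRAY = TRUE   -- Strip outer array
  ENABLE_OCTAL = FALSE       -- Enable octal
  ALLOW_DUPLICATE = FALSE    -- Allow duplicate
  STRIP_NULL_VALUES = TRUE   -- Strip null values
);
```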
Parquet format
- Binary as text: a Boolean that specifies whether to interpret columns with no defined logical data type as UTF-8 text. When set to FALSE, Snowflake interprets these columns as binary data.
Semi-structured formats (JSON, Parquet, Avro)
- VARIANT column name: the name of the VARIANT column when creating a new Snowflake table.
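For illustration, loading a semi-structured file into a single VARIANT column typically looks like the sketch below; the table and column names (public.events_raw, data) are placeholders, and the Flow creates the table and names the column for you:

```sql
-- Hypothetical table with a single VARIANT column
CREATE TABLE IF NOT EXISTS public.events_raw (data VARIANT);

COPY INTO public.events_raw
FROM @etl_stage
PATTERN = '.*events_.*json'
FILE_FORMAT = (TYPE = JSON);
```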
ELT
- Set variables for Before and After SQL: JavaScript code that sets the variables which can be referenced as {varname} in the Before and/or After SQL. To set a variable, add it as a key-value pair to the script variable vars.
  - Example: vars.put('schema', 'abc');
  - Example of the SQL: insert into test (schema_name) values ('{schema}')
- Before COPY INTO SQL: this SQL will be executed on the Snowflake Connection before running the COPY INTO SQL.
- Ignore errors when executing Before COPY INTO SQL: if this option is enabled and there is an error when Before COPY INTO SQL is executed, the error will be ignored.
- Before Load SQL is a script: if this option is enabled, Before Load SQL will be executed as a Snowflake SQL script.
- After COPY INTO SQL: this SQL will be executed on the Snowflake Connection after running the COPY INTO SQL.
- Ignore errors when executing After COPY INTO SQL: if this option is enabled and there is an error when After COPY INTO SQL is executed, the error will be ignored.
- After Load SQL is a script: if this option is enabled, After Load SQL will be executed as a Snowflake SQL script.
Handling source schema changes
- Alter target table if the source has columns that the target table doesn't have: if this option is enabled (it is disabled by default) and the source has fields that the target (Snowflake) table does not have, the system will add the extra fields to the target table.
- Create all actual tables with only TEXT columns: if this option is enabled (it is disabled by default), the Flow will create all actual tables with only TEXT columns. Enable this option to make the load more resilient to source schema drift. By default, the Flow creates the actual table using the first file as a pattern. It then alters the actual table by adding new columns, but it never modifies the data types of the existing columns, which can cause issues if there are multiple source files to load into the same destination table and some of these files have columns with different data types (compared to each other).
- Actual tables with all TEXT columns: a comma-separated list of regular expressions matching the table names for which the Flow should create all columns as TEXT.

Here are the rules for the regexes:
- Regexes are case insensitive.
- If a regex includes a comma (,), it must be escaped (/,).
- Java regex syntax must be used.
- If the destination table name is configured using fully qualified (DB.SCHEMA.TABLE) or partially qualified (SCHEMA.TABLE) notation, the regex should reflect it. Example (include tables country and city): text.* -> .*country,.*city.

- TEXT columns: a comma-separated list of regular expressions matching the column names which should be created as TEXT.

Here are the rules for the 3 parameters above:
- If Create all actual tables with only TEXT columns is enabled, all columns in all new tables and all new columns in the existing tables will be created as TEXT.
- If the table name matches a regex in Actual tables with all TEXT columns and TEXT columns is empty, all columns in that table will be created as TEXT.
- If TEXT columns is not empty and a column name in any table matches any regex configured in this field, the column will be created as TEXT.

- Create temp tables with only TEXT columns: if this option is enabled (it is disabled by default), the Flow will create temporary tables with only TEXT columns. Enable this option to make the load more resilient to source schema drift. By default, the Flow creates the temp table using the first file as a pattern. It then alters the temp table by adding new columns, but it never modifies the data types of the existing columns, which can cause issues if there are multiple source files to load into the same destination table and some of these files have columns with different data types (compared to each other). This option is automatically enabled if Create all actual tables with only TEXT columns is enabled or if the table name matches a regex configured in Actual tables with all TEXT columns.
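To show what these options mean in practice, here is a rough sketch of the kind of DDL the Flow may generate for schema drift; the table and column names are placeholders:

```sql
-- A new destination table created with all TEXT columns
CREATE TABLE IF NOT EXISTS public.customer (id TEXT, name TEXT, updated_at TEXT);

-- A column added when a new field appears in the source files
ALTER TABLE public.customer ADD COLUMN email TEXT;
```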
Step 7. Optionally, add a mapping.
The mapping can be used to:
- Rename the column.
- Exclude the column.
- Change the column data type for tables automatically created by the flow.
The mapping is only supported when loading CSV files.
Configure mapping
Select the transformation and click MAPPING.
Add a mapping for the column CURRENT_DATE.
Renaming or excluding a column in the mapping will work for all tables that have this column. If a table does not have the column, the mapping will be ignored.
Step 8. Optionally, add more source-to-destination transformations
Click [+] and repeat steps 5 to 6 for more locations and/or file name templates.