The Etlworks Integrator includes several Flows optimized for extracting data from various sources, transforming it, and loading it into Snowflake. For example, if you already have files in local or cloud storage (Amazon S3, Google Cloud Storage, Azure Blob) and don't need to transform the data, the most efficient way to load these files into Snowflake is the Flow type Bulk load files into Snowflake, described in this article.
When to use this Flow type
- When you simply want to load flat (CSV) or semi-structured (JSON, Avro, Parquet) files that already exist in local or cloud storage into Snowflake, without applying any transformations or transferring files over the network.
- When you want to load CSV or JSON files created by the CDC Flow.
How it works
- The Flow reads the names of all files matching the wildcard in the specified location. It can traverse subfolders as well.
- The Flow calculates the destination table names based on the source file names.
- The Flow creates multiple threads, one per destination table but not more than the configurable threshold.
- The Flow generates and executes the COPY INTO command to load files into temporary tables, one temporary table per actual destination table.
- The Flow merges data in the temporary tables with data in the actual tables.
Alternatively, the Flow can load files directly into the actual destination table by generating and executing the COPY INTO command for files matching the wildcard.
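The planning part of these steps can be sketched in JavaScript as follows. This is a hypothetical illustration, not the Etlworks implementation. It assumes the destination table name is the file name without folders or extensions. It also models "one thread per table, capped by a threshold" as a set of worker queues. The functions `tableNameFor` and `planLoad` are made-up names, and the COPY INTO and merge work for each table is not shown.

```javascript
// Hypothetical sketch: plan which files go to which table and which worker.
// Assumption: table name = file name without folders and extensions,
// e.g. "in/2024/orders.csv.gz" -> "orders".
function tableNameFor(path) {
  const file = path.split("/").pop();   // drop the folders
  return file.split(".")[0];            // drop all extensions
}

// Group the (already matched) files by destination table, then spread the
// tables over at most `maxParallel` workers; each worker loads its tables
// one after another, so every table is handled by exactly one worker.
function planLoad(paths, maxParallel) {
  const byTable = new Map();
  for (const path of paths) {
    const table = tableNameFor(path);
    if (!byTable.has(table)) byTable.set(table, []);
    byTable.get(table).push(path);
  }
  const workers = Math.max(1, Math.min(maxParallel, byTable.size));
  const queues = Array.from({ length: workers }, () => []);
  [...byTable.keys()].forEach((table, i) => queues[i % workers].push(table));
  return { byTable, queues };
}

const plan = planLoad(["in/orders.csv", "in/2024/orders.csv.gz", "in/customers.csv"], 4);
console.log(JSON.stringify(plan.queues)); // [["orders"],["customers"]]
```

Only two workers are created here even though the threshold is 4, because there are only two destination tables.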
The Flow supports COPY INTO (INSERT), MERGE (UPSERT), and CDC MERGE. With CDC MERGE, the Flow updates the actual destination table by applying INSERT, UPDATE, and DELETE events in the same order as they originated in the source database and were recorded in the CSV files. It does this in the most efficient way possible.
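The following minimal in-memory sketch shows why event order matters for CDC MERGE. It does not describe how the Flow works internally, since the Flow runs SQL in Snowflake. The event shape (`{ op, row }`) and the `applyCdcEvents` function are hypothetical.

```javascript
// Hypothetical sketch of CDC MERGE semantics on an in-memory "table"
// (a Map keyed by the lookup field). Each event has an `op` of
// "INSERT", "UPDATE" or "DELETE" and a `row` object.
function applyCdcEvents(table, events, key) {
  for (const { op, row } of events) {        // apply in strict source order
    const id = row[key];
    if (op === "DELETE") {
      table.delete(id);
    } else if (op === "INSERT" || op === "UPDATE") {
      table.set(id, { ...row });              // upsert the full row image
    } else {
      throw new Error(`Unknown CDC operation: ${op}`);
    }
  }
  return table;
}

const orders = new Map([[1, { id: 1, status: "new" }]]);
applyCdcEvents(orders, [
  { op: "UPDATE", row: { id: 1, status: "shipped" } },
  { op: "INSERT", row: { id: 2, status: "new" } },
  { op: "DELETE", row: { id: 1 } },
], "id");
console.log([...orders.keys()]); // [ 2 ]
```

If the same events were applied with the DELETE before the UPDATE, row 1 would reappear in the table. This is why the events must be replayed in source order.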
Features
|Monitor source schema changes||The Flow is able to create destination tables and add new columns to them when the source schema changes.|
|Load semi-structured files into the VARIANT column||The Flow can load semi-structured files such as JSON, Avro, and Parquet into a single VARIANT column in the Snowflake table.|
|Load files directly into the destination table||By default, the Flow uses a temporary table to load files into Snowflake. It can also be configured to load files directly into the destination table. In this mode, the Flow does not support MERGE and CDC MERGE.|
|Delete loaded source files||The Flow can be configured to delete successfully loaded source files.|
|Load data in parallel||The Flow is able to load data into multiple destination tables in parallel.|
|Process files in subfolders||The Flow is able to process files in multiple nested subfolders.|
|Create all or selected columns as TEXT||The Flow is able to create all or selected columns as TEXT, which mitigates issues related to source schema drift.|
Prerequisites
- The Snowflake data warehouse is active.
- When loading files from cloud storage, an Amazon S3 bucket, Google Cloud Storage bucket, or Azure blob has been created.
- A named stage has been created in Snowflake. When loading files from cloud storage, this must be an external stage that points to the bucket or blob from the previous item.
Step 1. Create a new cloud storage or server storage Connection
This Connection will be used as a source to access files stored in cloud storage or server (local) storage.
The Connection can be one of the following: Amazon S3, Google Cloud Storage, Azure Blob storage, or server (local) storage.
Step 2. Create a new Snowflake Connection
This Connection will be used as a destination.
When creating the Connection, set the Stage name. When loading files from cloud storage, the named external stage must be configured to read data from the bucket or blob used by the cloud storage Connection created in Step 1.
Step 3. Create a new Format
This Format will be used to generate a valid COPY INTO command.
The following formats are supported:
- CSV - CSV is the most commonly used format for loading data into Snowflake. With CSV, COPY INTO, MERGE, and CDC MERGE are supported. Compressed files are supported (use the .gz extension, as described in Step 5).
- JSON - the Flow can load JSON files into a single VARIANT column. Only COPY INTO is supported. Compressed files are supported (use the .gz extension, as described in Step 5).
- Parquet - the Flow can load Parquet files into a single VARIANT column. Only COPY INTO is supported. Parquet files can be encoded using one of the following codecs, so no extra compression is needed: SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD.
- Avro - the Flow can load Avro files into a single VARIANT column. Only COPY INTO is supported. Avro files can be encoded using one of the following codecs, so no extra compression is needed: DEFLATE, SNAPPY, BZIP2, XZ.
Step 4. Create a new Flow
This Flow will be used to bulk load files into Snowflake.
Click [+], type in bulk load files into snowflake, and select the Flow.
Step 5. Configure load transformation
Select or enter the following attributes of the transformation (left to right):
- Storage Connection created in step 1.
- Format created in step 3.
- A wildcard filename that matches the filenames in the cloud storage. Use the .gz extension for compressed files.
- Snowflake Connection created in step 2.
- The wildcard destination table name in the following format: SCHEMA.*, where SCHEMA is the Snowflake schema to load data into. You can also use a fully qualified name: DATABASE.SCHEMA.*. You don't need to use wildcards when loading into a specific table.
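The following JavaScript sketch shows how a wildcard destination such as SCHEMA.* or DATABASE.SCHEMA.* could be resolved into a concrete table name for one source file. `resolveDestination` is a hypothetical helper. It assumes the table part is the file name without folders or extensions, which may differ from the Flow's exact naming rules.

```javascript
// Hypothetical sketch: replace the trailing '*' of the destination with a
// table name derived from the source file (name without folders/extensions).
function resolveDestination(destination, sourcePath) {
  if (!destination.endsWith("*")) return destination;   // specific table: no wildcard
  const file = sourcePath.split("/").pop();
  const table = file.split(".")[0];
  return destination.slice(0, -1) + table;               // "SCHEMA.*" -> "SCHEMA.<table>"
}

console.log(resolveDestination("PUBLIC.*", "landing/orders.csv.gz"));     // PUBLIC.orders
console.log(resolveDestination("ANALYTICS.PUBLIC.*", "orders.csv"));      // ANALYTICS.PUBLIC.orders
console.log(resolveDestination("ANALYTICS.PUBLIC.ORDERS", "x.csv"));      // ANALYTICS.PUBLIC.ORDERS
```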
Step 6. Set optional and required parameters
Select or enter the following optional and required parameters:
Source Files and Destination Tables
Include files in subfolders: if this option is enabled, the Flow loads all files that match the wildcard filename (set in FROM) in all subfolders under the main bucket or blob.
Exclude and Include files: optional comma-separated lists of files to exclude and/or include. You can use wildcard file names. These options are used together with the wildcard filename set in FROM:
- The Flow populates the list of files matching the wildcard filename in FROM.
- The Flow excludes files based on the list of exclusions set in Exclude files.
- The Flow includes files based on the list of inclusions set in Include files.
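The three filtering steps can be sketched in JavaScript as follows. This sketch rests on several assumptions:

- `*` is the only wildcard.
- Matching is case-sensitive.
- An empty inclusion list keeps every file that was not excluded.

`selectFiles` and its helpers are hypothetical names, not part of Etlworks.

```javascript
// Convert a '*' wildcard into an anchored RegExp (other characters are literal).
function toRegExp(wildcard) {
  const escaped = wildcard.replace(/[.+^${}()|[\]\\?]/g, "\\$&");
  return new RegExp("^" + escaped.replace(/\*/g, ".*") + "$");
}

// Parse a comma-separated wildcard list; empty or missing means "no patterns".
function parseList(list) {
  return list ? list.split(",").map(s => s.trim()).filter(Boolean).map(toRegExp) : [];
}

function selectFiles(files, from, exclude, include) {
  const fromRe = toRegExp(from);
  const excludeRes = parseList(exclude);
  const includeRes = parseList(include);
  return files
    .filter(f => fromRe.test(f))                                   // 1. match FROM
    .filter(f => !excludeRes.some(re => re.test(f)))               // 2. drop exclusions
    .filter(f => includeRes.length === 0 || includeRes.some(re => re.test(f))); // 3. keep inclusions
}

const files = ["orders_1.csv", "orders_2.csv", "tmp_orders.csv", "customers.csv"];
console.log(JSON.stringify(selectFiles(files, "*.csv", "tmp_*", "orders_*,customers*")));
// ["orders_1.csv","orders_2.csv","customers.csv"]
```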
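Optionally, the destination table name can be calculated from the source file name using a transformation script. The source file name is available in the variable name, and the actual table name must be assigned to the variable value. For example, the following script uses the part of the file name between the first - and _cdc_stream as the table name: value = name.substring(name.indexOf('-') + 1, name.indexOf('_cdc_stream'));. IMPORTANT: the Flow automatically recognizes and transforms filenames matching the following template: *_cdc_stream_*.*. If filenames match this template, there is no need to provide a transformation script.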
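To see what the one-line script returns, it can be wrapped in a function and run outside the Flow. The wrapper `calculateTableName` and the sample file names are hypothetical. Inside the Flow, `name` is provided and `value` is read back.

```javascript
// Hypothetical wrapper around the documented one-line script, for testing only.
function calculateTableName(name) {
  let value;
  value = name.substring(name.indexOf('-') + 1, name.indexOf('_cdc_stream'));
  return value;
}

console.log(calculateTableName("inventory-orders_cdc_stream_1.csv")); // orders
console.log(calculateTableName("orders_cdc_stream_2.csv"));           // orders
```

The script assumes every file name contains _cdc_stream. If it does not, indexOf returns -1 and substring produces an unexpected (possibly empty) result.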
Maximum Number of Files to Process: the optional parameter that controls how many files will be loaded in one batch (one run of the Flow). If you expect that at any point in time there will be millions (10^6 - 10^7) of files in the source folder(s), it is recommended to limit the number of files to process to a more manageable range of thousands to hundreds of thousands (10^3 - 10^5).
Maximum Number of Parallel Loads: if the value of this optional parameter is greater than 1, the Flow creates multiple threads to load data in parallel: one thread per table, but no more than the threshold set in this parameter.
Parallel: enable this flag if you have more than one source-to-destination transformation that you want to execute in parallel.
Create new Snowflake connection for each table: if this parameter is enabled, a new Snowflake connection is created to load data into each destination table. The connections are closed when the Flow finishes loading data into all tables.
Purge File if Success: if this option is enabled (the default), the source files are automatically deleted after the COPY INTO command is successfully executed.
Log each executed SQL statement: enable this flag if you want to log each automatically generated and executed SQL statement, including before/after Load SQL.
Continue loading data into other tables if Error: if this parameter is enabled (it is disabled by default) and there is an error when loading data into some table, the Flow continues loading data into other tables. If configured, a notification is sent to the webhook.
Stage name: the name of the Snowflake stage. Stage refers to the location where your data files are stored for loading into Snowflake. This field is optional and overrides the Stage name set at the Snowflake Connection level.
Direct load: if this option is enabled (it is disabled by default), the Flow loads data into the Snowflake table directly, without an intermediate temporary table. This option is automatically activated for JSON, Parquet, and Avro files. Enabling Direct load is the fastest way to load data into Snowflake, but it does not support MERGE and CDC MERGE.
Use COPY INTO with a wildcard pattern: if this option is enabled (it is disabled by default), the Flow loads all files into a specific destination table in one call using COPY INTO with a wildcard pattern. The advantage of this approach is load speed. The disadvantage is that it cannot handle files with different numbers of columns.
Action: can be COPY INTO, which inserts records from the file(s) into the Snowflake table; MERGE, which merges records from the file with the records in the Snowflake table; or CDC MERGE, which loads CDC events (INSERT, UPDATE, DELETE) in the same order as they originated in the source database.
MERGE and CDC MERGE require configuring the Lookup Fields and/or enabling lookup field prediction (see Lookup Fields below).
Handle explicit CDC updates: the typical stream produced by a CDC Flow includes all columns of the monitored table. In some cases, the CDC Flow captures only the explicitly changed columns, for example, when supplemental logging for the Oracle table is configured as 'LOG DATA (PRIMARY KEY) COLUMNS' or 'LOG DATA (UNIQUE) COLUMNS'. Enable this option so that the CDC MERGE statement preserves the original values of the columns NOT captured by the CDC Flow.
Lookup Fields: the comma-separated list of fields that uniquely identify the record in the target Snowflake table.
- If lookup field prediction is enabled and Lookup Fields is empty, the system will try to predict the columns that uniquely identify the record in the target Snowflake table.
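The following JavaScript sketch shows how Lookup Fields can drive a MERGE (UPSERT) statement. It generates standard Snowflake MERGE syntax. `buildMergeSql` and the table names PUBLIC.ORDERS and PUBLIC.ORDERS_TMP are hypothetical, and the SQL the Flow actually generates may differ.

```javascript
// Hypothetical sketch: build a Snowflake MERGE keyed on comma-separated Lookup Fields.
function buildMergeSql(target, temp, columns, lookupFields) {
  const keys = lookupFields.split(",").map(s => s.trim()).filter(Boolean);
  if (keys.length === 0) throw new Error("Lookup Fields are required for MERGE");
  const on = keys.map(k => `t.${k} = s.${k}`).join(" AND ");
  const updates = columns.filter(c => !keys.includes(c)).map(c => `t.${c} = s.${c}`).join(", ");
  const lines = [`MERGE INTO ${target} t USING ${temp} s ON ${on}`];
  if (updates) lines.push(`WHEN MATCHED THEN UPDATE SET ${updates}`); // skip if only keys
  lines.push(`WHEN NOT MATCHED THEN INSERT (${columns.join(", ")}) ` +
             `VALUES (${columns.map(c => `s.${c}`).join(", ")})`);
  return lines.join("\n");
}

const mergeSql = buildMergeSql("PUBLIC.ORDERS", "PUBLIC.ORDERS_TMP", ["ID", "STATUS", "AMOUNT"], "ID");
console.log(mergeSql);
// MERGE INTO PUBLIC.ORDERS t USING PUBLIC.ORDERS_TMP s ON t.ID = s.ID
// WHEN MATCHED THEN UPDATE SET t.STATUS = s.STATUS, t.AMOUNT = s.AMOUNT
// WHEN NOT MATCHED THEN INSERT (ID, STATUS, AMOUNT) VALUES (s.ID, s.STATUS, s.AMOUNT)
```

The lookup fields form the ON condition. Every other column is updated on a match and inserted otherwise.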
COPY INTO parameters
Truncate Columns: if enabled (disabled by default), the COPY INTO command is generated with the TRUNCATECOLUMNS = TRUE option. Strings are automatically truncated to the target column length.
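On Error: what to do in case of any error. This setting corresponds to the ON_ERROR option of COPY INTO; Snowflake supports values such as CONTINUE, SKIP_FILE, and ABORT_STATEMENT.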
Replace Invalid characters: if enabled, Snowflake replaces invalid UTF-8 characters with the Unicode replacement character.
String used to convert to and from SQL NULL: when loading data, Snowflake replaces these strings in the data load source with SQL NULL. To specify more than one string, enclose the list of strings in parentheses and use commas to separate each value. For example, NULL_IF = ('\\N', 'NULL', 'NUL', '').
Binary format: this parameter defines the encoding format for binary input or output. This option only applies when loading data into binary columns in a table. Default is HEX.
Trim Space: this option specifies whether to remove white space from fields.
Error on Column Mismatch: this option specifies whether to generate a parsing error if the number of delimited columns (i.e. fields) in an input data file does not match the number of columns in the corresponding table. If disabled, an error is not generated and the load continues. If the file is successfully loaded:
- If the input file contains records with more fields than columns in the table, the matching fields are loaded in order of occurrence in the file and the remaining fields are not loaded.
- If the input file contains records with fewer fields than columns in the table, the non-matching columns in the table are loaded with NULL values.
Empty field as null: Boolean that specifies whether to insert SQL NULL for empty fields in an input file, which are represented by two successive delimiters.
Strip outer array: boolean that instructs the JSON parser to remove outer brackets [ ].
Enable octal: boolean that enables parsing of octal numbers.
Allow duplicate: boolean that allows duplicate object field names (only the last one will be preserved).
Strip null values: boolean that instructs the JSON parser to remove object fields or array elements containing null values.
Binary as text: boolean that specifies whether to interpret columns with no defined logical data type as UTF-8 text. When set to FALSE, Snowflake interprets these columns as binary data.
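The following JavaScript sketch shows how several of the options above could be assembled into a Snowflake COPY INTO statement. The option keywords are Snowflake's own:

- TRUNCATECOLUMNS
- ON_ERROR
- NULL_IF
- TRIM_SPACE
- ERROR_ON_COLUMN_COUNT_MISMATCH
- REPLACE_INVALID_CHARACTERS
- STRIP_OUTER_ARRAY
- PURGE

The `buildCopyInto` function, its input object, and the names PUBLIC.ORDERS_TMP and my_stage are hypothetical. Mapping Purge File if Success to Snowflake's PURGE option is also an assumption.

```javascript
// Quote a value as a Snowflake string literal (escape backslashes, double quotes).
function sqlString(s) {
  return "'" + s.replace(/\\/g, "\\\\").replace(/'/g, "''") + "'";
}

// Hypothetical sketch: assemble COPY INTO from UI-style options.
function buildCopyInto({ table, stage, pattern, format, onError, truncateColumns,
                         purge, nullIf, trimSpace, errorOnColumnMismatch,
                         replaceInvalidCharacters, stripOuterArray }) {
  const ff = [`TYPE = ${format}`];                                // file format options
  if (replaceInvalidCharacters) ff.push("REPLACE_INVALID_CHARACTERS = TRUE");
  if (nullIf && nullIf.length) ff.push(`NULL_IF = (${nullIf.map(sqlString).join(", ")})`);
  if (format === "CSV") {
    if (trimSpace) ff.push("TRIM_SPACE = TRUE");
    if (errorOnColumnMismatch === false) ff.push("ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE");
  }
  if (format === "JSON" && stripOuterArray) ff.push("STRIP_OUTER_ARRAY = TRUE");
  const parts = [`COPY INTO ${table}`, `FROM @${stage}`];
  if (pattern) parts.push(`PATTERN = ${sqlString(pattern)}`);
  parts.push(`FILE_FORMAT = (${ff.join(" ")})`);
  if (onError) parts.push(`ON_ERROR = ${sqlString(onError)}`);    // copy options
  if (truncateColumns) parts.push("TRUNCATECOLUMNS = TRUE");
  if (purge) parts.push("PURGE = TRUE");
  return parts.join("\n");
}

const copySql = buildCopyInto({
  table: "PUBLIC.ORDERS_TMP", stage: "my_stage/orders/", pattern: ".*orders.*[.]csv",
  format: "CSV", onError: "CONTINUE", truncateColumns: true, purge: true,
  nullIf: ["\\N", "NULL", ""], trimSpace: true, errorOnColumnMismatch: false,
});
console.log(copySql);
```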
Semi-structured formats (JSON, Parquet, Avro)
VARIANT column name: the name of the VARIANT column when creating a new Snowflake table.
Before COPY INTO SQL: this SQL will be executed on the Snowflake Connection before running the COPY INTO SQL.
Ignore errors when executing Before COPY INTO SQL: if this option is enabled and there is an error when Before COPY INTO SQL is executed, the error will be ignored.
Before Load SQL is a script: if this option is enabled, Before Load SQL will be executed as a Snowflake SQL script.
After COPY INTO SQL: this SQL will be executed on the Snowflake Connection after running the COPY INTO SQL.
Ignore errors when executing After COPY INTO SQL: if this option is enabled and there is an error when After COPY INTO SQL is executed, the error will be ignored.
After Load SQL is a script: if this option is enabled, After Load SQL will be executed as a Snowflake SQL script.
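The order in which these hooks run, and the effect of the "Ignore errors" flags, can be sketched in JavaScript. `runLoad` is hypothetical, and `execute` is a stub standing in for running SQL on the Snowflake Connection. The sketch also assumes that errors from COPY INTO itself are never ignored.

```javascript
// Hypothetical sketch: run Before SQL, COPY INTO, then After SQL.
function runLoad(execute, { beforeSql, ignoreBeforeErrors, copySql, afterSql, ignoreAfterErrors }) {
  const runHook = (sql, ignore) => {
    if (!sql) return;
    try {
      execute(sql);
    } catch (err) {
      if (!ignore) throw err;                // fail the load unless errors are ignored
      console.warn(`Ignored error: ${err.message}`);
    }
  };
  runHook(beforeSql, ignoreBeforeErrors);
  execute(copySql);                           // in this sketch, COPY INTO errors always fail
  runHook(afterSql, ignoreAfterErrors);
}

// Stub executor: records statements and fails on DROP to simulate an error.
const log = [];
const execute = sql => {
  if (sql.startsWith("DROP")) throw new Error("object does not exist");
  log.push(sql);
};
runLoad(execute, {
  beforeSql: "DROP TABLE STAGING.OLD", ignoreBeforeErrors: true,
  copySql: "COPY INTO PUBLIC.ORDERS FROM @my_stage",
  afterSql: "CALL refresh_reports()", ignoreAfterErrors: false,
});
console.log(JSON.stringify(log)); // ["COPY INTO PUBLIC.ORDERS FROM @my_stage","CALL refresh_reports()"]
```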
Handling source schema changes
Alter target table if the source has columns that the target table doesn't have: if this option is enabled (it is disabled by default), and the source has different fields than the target (Snowflake) table, the system will add extra fields to the target table.
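The following JavaScript sketch shows the "add missing columns" behavior. `alterStatementsFor` is a hypothetical helper. Column names are compared case-insensitively, because Snowflake stores unquoted identifiers in upper case. New columns default to VARCHAR unless a type function is passed, which is an assumption of this sketch. In line with the description, it only adds columns and never changes existing data types.

```javascript
// Hypothetical sketch: ALTER TABLE statements for source columns missing in the target.
function alterStatementsFor(table, targetColumns, sourceColumns, typeFor = () => "VARCHAR") {
  const existing = new Set(targetColumns.map(c => c.toUpperCase()));
  return sourceColumns
    .filter(c => !existing.has(c.toUpperCase()))           // only columns the target lacks
    .map(c => `ALTER TABLE ${table} ADD COLUMN ${c} ${typeFor(c)}`);
}

alterStatementsFor("PUBLIC.ORDERS", ["ID", "STATUS"], ["id", "status", "discount"])
  .forEach(sql => console.log(sql));
// ALTER TABLE PUBLIC.ORDERS ADD COLUMN discount VARCHAR
```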
Create all actual tables with only TEXT columns: if this option is enabled (it is disabled by default), the Flow creates all actual tables with only TEXT columns. Enable this option to make the load more resilient to source schema drift. By default, the Flow creates the actual table using the first file as a pattern. It then alters the actual table by adding new columns, but it never modifies the data types of existing columns. This can cause issues when multiple source files are loaded into the same destination table and some of these files have columns with different data types.
Actual tables with all TEXT columns: a comma-separated list of regular expressions matching the table names for which the Flow should create all columns as TEXT.
Here are the rules for regexes:
- Regexes are case insensitive.
- If a regex includes a comma (,), the comma must be escaped.
- Java regex syntax must be used.
- If the destination table name is configured using fully qualified (DB.SCHEMA.TABLE) or partially qualified (SCHEMA.TABLE) notation, the regex should reflect it.
TEXT columns: a comma-separated list of regular expressions matching the column names that should be created as TEXT.
Here are the rules for the three parameters above:
- If Create all actual tables with only TEXT columns is enabled, all columns in all new tables and new columns in existing tables will be created as TEXT.
- If the table name matches a regex in Actual tables with all TEXT columns and TEXT columns is empty, all columns in that table will be created as TEXT.
- If TEXT columns is not empty and the column name in any table matches any regex configured in this field, the column will be created as TEXT.
Create temp tables with only TEXT columns: if this option is enabled (it is disabled by default), the Flow creates temporary tables with only TEXT columns. Enable this option to make the load more resilient to source schema drift. By default, the Flow creates the temp table using the first file as a pattern. It then alters the temp table by adding new columns, but it never modifies the data types of existing columns. This can cause issues when multiple source files are loaded into the same destination table and some of these files have columns with different data types. This option is automatically enabled if Create all actual tables with only TEXT columns is enabled or if the table name matches a regex configured in Actual tables with all TEXT columns.
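The rules for the three TEXT-column parameters can be sketched in JavaScript. This sketch rests on several assumptions:

- A comma inside a regex is escaped as `\,`.
- Each regex must match the whole name, as Java's String.matches does.
- JavaScript RegExp stands in for Java regex syntax; the two differ in some details.

Check the exact escape convention in the Etlworks documentation. `parseRegexList` and `isTextColumn` are hypothetical names.

```javascript
// Split on commas not preceded by "\", unescape "\," and compile
// case-insensitive, whole-name regexes.
function parseRegexList(list) {
  if (!list) return [];
  return list
    .split(/(?<!\\),/)
    .map(s => s.trim().replace(/\\,/g, ","))
    .filter(Boolean)
    .map(s => new RegExp(`^(?:${s})$`, "i"));
}

function isTextColumn(table, column, { allTablesText, textTables, textColumns }) {
  if (allTablesText) return true;                                           // rule 1
  const columnRes = parseRegexList(textColumns);
  if (columnRes.length > 0) return columnRes.some(re => re.test(column));  // rule 3
  return parseRegexList(textTables).some(re => re.test(table));            // rule 2
}

const settings = { allTablesText: false, textTables: "PUBLIC\\.ORDERS.*", textColumns: "" };
console.log(isTextColumn("public.orders_2024", "amount", settings)); // true
console.log(isTextColumn("PUBLIC.CUSTOMERS", "amount", settings));   // false
```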
Step 7. Optionally, add a mapping.
The mapping can be used to:
- Rename the column.
- Exclude the column.
The mapping is only supported when loading CSV files.
The use case for mapping
Let's assume that the source database is MySQL and it has a table with a column named CURRENT_DATE. Let's also assume that the CSV file was created by the CDC Flow and is stored in an S3 bucket, and that the load Flow loads CSV files from S3 into Snowflake. When trying to create a table in Snowflake with a column named CURRENT_DATE, the Flow fails with the following error:
SnowflakeSQLException: SQL compilation error: error line 3 at position 0 .invalid column definition name 'CURRENT_DATE' (ANSI reserved)
The simplest solution to this problem is to rename the column CURRENT_DATE to, for example, CURRENT_DATE2. This can be accomplished using a mapping.
Select the transformation and click Add mapping for column.
Renaming or excluding a column in the mapping applies to all tables that have this column. If a table does not have the column, the mapping is ignored.
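The following JavaScript sketch shows how a rename/exclude mapping could be applied to a CSV header. It reuses the CURRENT_DATE example. `applyMapping`, the mapping shape, and the DEBUG_INFO column are hypothetical. As described above, rules for columns a file does not have are simply ignored.

```javascript
// Hypothetical sketch: apply rename/exclude rules to a CSV header.
// Returns the source column positions with their destination names.
function applyMapping(header, mapping) {
  const result = [];
  header.forEach((column, index) => {
    const rule = mapping.get(column);
    if (rule && rule.exclude) return;                       // drop the column
    result.push({ index, name: rule && rule.rename ? rule.rename : column });
  });
  return result;
}

const mapping = new Map([
  ["CURRENT_DATE", { rename: "CURRENT_DATE2" }],
  ["DEBUG_INFO", { exclude: true }],
]);
console.log(JSON.stringify(applyMapping(["ID", "CURRENT_DATE", "DEBUG_INFO"], mapping)));
// [{"index":0,"name":"ID"},{"index":1,"name":"CURRENT_DATE2"}]
console.log(JSON.stringify(applyMapping(["ID", "STATUS"], mapping)));
// [{"index":0,"name":"ID"},{"index":1,"name":"STATUS"}]
```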
Step 8. Optionally, add more source-to-destination transformations
Click [+] and repeat steps 5 and 6 for more locations and/or file name templates.