Overview
Snowflake is a columnar relational database, so you can work with it in Etlworks using the same techniques you would use with any other relational database. However, inserting data into Snowflake row by row can be extremely slow.
To improve performance, we recommend using a Snowflake-optimized ETL flow when loading data into Snowflake.
Flows optimized for Snowflake
Flow type | When to use
This flow (you are here) | When you need to extract data from any source, transform it, and load it into Snowflake.
Bulk load files into Snowflake | When you need to bulk-load files that already exist in an external Snowflake stage (Amazon S3, Azure Blob, Google Cloud Storage) or in server storage without applying any transformations. The flow automatically generates the COPY INTO command and MERGEs data into the destination.
Stream CDC events into Snowflake | When you need to stream updates from a database that supports Change Data Capture (CDC) into Snowflake in real time.
Stream messages from a queue into Snowflake | When you need to stream messages from a message queue that supports streaming into Snowflake in real time.
COPY files into Snowflake | When you need to bulk-load data from file-based or cloud storage, an API, or a NoSQL database into Snowflake without applying any transformations. This flow requires a user-defined COPY INTO command. Unlike Bulk load files into Snowflake, it does not support automatic MERGE.
How it works
With Snowflake-optimized flows, you can extract data from any supported source, transform it, and load it into Snowflake efficiently.
A typical Snowflake flow includes the following steps (a sketch of the SQL some of these steps generate follows the list):
1. Automatically creates a named Snowflake stage if it doesn’t exist.
2. Extracts data from the source.
3. Creates CSV, JSON, Avro, or Parquet files.
4. Compresses the files using the gzip algorithm.
5. Copies the files into a Snowflake stage (local file system, Azure Blob, Amazon S3, or Google Cloud storage).
6. Checks if the destination Snowflake table exists; if not, creates it using metadata from the source.
7. Dynamically generates and runs the Snowflake COPY INTO command.
8. If configured, merges the data from the source with the existing data in Snowflake.
9. Cleans up files, if necessary.
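For illustration, the SQL generated by steps 1 and 7 could look roughly like the following. This is a minimal sketch: the stage, table, file pattern, and format names are placeholders, and the statements Etlworks actually generates may differ.

-- Step 1: create the named stage if it does not already exist (internal stage shown)
CREATE STAGE IF NOT EXISTS ETL_STAGE;

-- Step 7: load the compressed, staged files into the target table
COPY INTO MYDB.PUBLIC.CUSTOMERS
  FROM @ETL_STAGE
  PATTERN = '.*customers.*[.]csv[.]gz'
  FILE_FORMAT = (FORMAT_NAME = CSVFORMAT)
  PURGE = true;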
Prerequisites
1. The Snowflake data warehouse must be active.
2. A stage name must be specified either in the Snowflake connection or the transformation settings (with transformation settings overriding the connection). Etlworks uses the Snowflake COPY INTO command to load data into Snowflake tables, which requires a named internal or external stage. A stage is where your data files are stored before being loaded into Snowflake. Read more about how Etlworks automatically creates the named Snowflake stage.
3. If loading data from an external stage (such as AWS S3, Azure Blob, or Google Cloud Storage), the respective bucket or blob must already exist. Note that the Etlworks flow does not create these storage locations for you.
Step-by-step instructions
Here’s how to extract, transform, and load data into Snowflake:
Step 1. Create all required Connections
You will need three connections:
1. A source connection.
2. A connection for the staging area (Amazon S3, Azure Blob, Google Cloud Storage, or Server Storage).
When configuring a connection for Amazon S3, Azure Storage, or Google Storage, which will serve as the stage, it’s recommended to set GZip as the value for the “Archive file before copying” field.
3. A Snowflake Connection.
When creating a Snowflake Connection, set the Stage name. Alternatively, you can configure the Stage name at the transformation level. Read how the Etlworks flow automatically creates the named Snowflake stage.
Step 2. Create a data exchange Format
Snowflake can load data from CSV, JSON, or Avro formats. You’ll need to create one of these formats and set it as the destination format.
If you’re using CSV for large datasets, it’s recommended to configure the format to split the document into smaller files. Snowflake can load files in parallel, and smaller files can be transferred over the network more quickly, improving overall performance.
Step 3. Create a Flow to load data in Snowflake
To create a Snowflake flow, follow these steps:
1. Open the Flows window, click the + button, and type “Snowflake” in the search field.
2. Select the appropriate flow type (e.g., Database to Snowflake).
Step 4. Set Snowflake Connection
In all Snowflake flows, the final destination is Snowflake.
To configure the destination:
1. Go to the Connections tab.
2. Select the Snowflake connection you created in Step 1.
Step 5. Set the source and the destination
Depending on the flow type, you can choose one of the following sources (FROM) for the Snowflake flow:
• Database: Use the table or view name as the source. Wildcard names (e.g., public.*) are supported.
• Well-known API: Use the endpoint or file name as the source.
• Web Service: Use the endpoint name as the source.
• File: Use the file name or a wildcard pattern as the source.
• Queue: Use the queue or topic name as the source.
For all Snowflake flows, the destination connection is an Amazon S3, Azure Storage, Google Cloud Storage, or Server Storage Connection. Etlworks uses this destination connection as the Snowflake stage.
Next, select or enter the fully qualified Snowflake table name as the destination (TO). If you use a wildcard template for the source (FROM), set the destination (TO) using a wildcard as well.
Step 6. Set Data Exchange Format
Select the data exchange format that you configured in Step 2. This format will be used to process the data as it moves from the source to Snowflake.
Step 7. Set optional parameters
To set optional parameters, click Configure in the transformation row, then go to the Parameters tab.
COPY INTO parameters
• Snowflake Table Name: Specify the name of the Snowflake table to load data into. This field is optional and overrides the table name set at the transformation level.
• Stage Name: Specify the Snowflake stage where data files are stored. This field is optional and overrides the stage name set at the connection level. The stage name is part of the COPY INTO command. Example: COPY INTO table FROM @{STAGING_AREA_NAME} PATTERN = 'file' FILE_FORMAT = (FORMAT_NAME = CSVFORMAT) PURGE = true.
• Action: Choose either COPY INTO to insert records from the files into the Snowflake table, or MERGE to combine records from the files with existing records in Snowflake. MERGE requires configuring lookup fields.
• How to MERGE: Select the type of SQL to use for merging data (a sketch of both variants follows this list). Options:
- Snowflake MERGE (default): Use native Snowflake MERGE SQL.
- DELETE/INSERT: Deletes matching records in the target table and inserts all records from the temp table.
• Lookup Fields: Enter a comma-separated list of fields that uniquely identify records in the target Snowflake table. This is required if the action is set to MERGE.
• Predict Lookup Fields: Enable this option to let the system automatically detect the columns that uniquely identify records, if lookup fields are not provided.
• Use INSERT/DELETE instead of MERGE for CDC MERGE action: Enable this (disabled by default) to generate and execute three SQL statements for CDC MERGE:
1. Delete matching rows from the main table.
2. Insert updated and new records from the temp table.
3. Delete rows marked for deletion in the temp table.
• Load SQL: Enter user-defined COPY INTO SQL. By default, Etlworks generates this automatically based on the parameters. Example: COPY INTO {TABLE} FROM @STAGING PATTERN = '{FILE}' FILE_FORMAT = (FORMAT_NAME = {FORMAT}) PURGE = {PURGE}.
• MERGE SQL: Enter user-defined SQL for MERGE. If not provided, the default MERGE SQL will be used. Parameters such as {TABLE}, {TEMP_TABLE}, {KEY_FIELDS}, and {FIELDS} are automatically populated and can be referenced in the SQL.
• CDC MERGE SQL: Define custom SQL for CDC MERGE. If not provided, the default CDC MERGE SQL is used. Parameters such as {TABLE}, {TEMP_TABLE}, {MERGE_CONDITION}, {KEY_FIELDS}, and {FIELDS} are automatically populated.
• Force Loading Files: Enable this to reload files regardless of whether they’ve changed by using FORCE=true in the COPY INTO command.
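To make the two merge variants concrete, here is a minimal sketch of the kind of SQL they produce, assuming CUSTOMER_ID is the lookup field; the table, temp table, and column names are illustrative, and the SQL Etlworks generates (or your user-defined MERGE SQL) may differ.

-- Snowflake MERGE (default)
MERGE INTO MYDB.PUBLIC.CUSTOMERS t
USING MYDB.PUBLIC.CUSTOMERS_TEMP s
  ON t.CUSTOMER_ID = s.CUSTOMER_ID
WHEN MATCHED THEN UPDATE SET t.NAME = s.NAME, t.EMAIL = s.EMAIL
WHEN NOT MATCHED THEN INSERT (CUSTOMER_ID, NAME, EMAIL)
  VALUES (s.CUSTOMER_ID, s.NAME, s.EMAIL);

-- DELETE/INSERT variant
DELETE FROM MYDB.PUBLIC.CUSTOMERS
  USING MYDB.PUBLIC.CUSTOMERS_TEMP s
  WHERE CUSTOMERS.CUSTOMER_ID = s.CUSTOMER_ID;
INSERT INTO MYDB.PUBLIC.CUSTOMERS
  SELECT * FROM MYDB.PUBLIC.CUSTOMERS_TEMP;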
Format
• File Format: You can optionally provide a format name or definition. For example, an ad-hoc format could look like this: (type = 'CSV' field_delimiter = ',' skip_header = 1). If no format is provided, the system will automatically create one based on the destination format configured for the transformation. Learn more about Snowflake file formats. A sketch of a named file format that combines the options in this list follows below.
• Replace Invalid Characters: If enabled, Snowflake will replace invalid UTF-8 characters with the Unicode replacement character.
• Error on Column Mismatch: This option determines whether a parsing error is triggered when the number of columns in the input data file doesn’t match the number of columns in the target table. If disabled, the load continues even if there’s a mismatch. For example:
- If the file has more fields than the table, the extra fields are ignored.
- If the file has fewer fields, the remaining columns in the table are filled with NULL values.
• Trim Space: Enable this option to remove any leading or trailing whitespace from fields.
• String Used for SQL NULL: When loading data, Snowflake can replace specific strings in the source data with SQL NULL. To specify multiple strings, enclose them in parentheses and separate with commas. For example: NULL_IF = ('N', 'NULL', 'NUL', '').
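For reference, a named file format that combines the options in this list can be created with SQL along these lines; the name CSVFORMAT matches the inline examples above, but the format Etlworks creates automatically may use different settings.

CREATE OR REPLACE FILE FORMAT CSVFORMAT
  TYPE = 'CSV'
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1
  COMPRESSION = GZIP
  TRIM_SPACE = TRUE
  REPLACE_INVALID_CHARACTERS = TRUE
  ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
  NULL_IF = ('N', 'NULL', 'NUL', '');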
Parameters for copying files into the local stage
• Number of Threads for Uploading Files: This parameter defines how many threads are used to upload files to the local stage. The upload process splits data files based on size:
- Small files (less than 16 MB, compressed or uncompressed) are staged in parallel as individual files.
- Larger files are automatically split into chunks, uploaded concurrently, and reassembled in the target stage.
A single thread can handle multiple chunks. Increasing the number of threads can significantly improve performance when uploading large files. You can specify any value between 1 (no parallelism) and 99 (maximum parallelism using 99 threads).
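Uploads to an internal stage are typically performed with Snowflake’s PUT command, whose PARALLEL option accepts the same range of 1 to 99. The sketch below uses a placeholder local path and stage name and is not necessarily the exact command Etlworks issues.

PUT file:///tmp/etl/customers.csv.gz @ETL_STAGE PARALLEL = 8;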
ELT
• Before COPY INTO SQL: This SQL command will be executed on the Snowflake connection before the COPY INTO operation.
• Ignore Errors for Before COPY INTO SQL: If enabled, any errors that occur during the execution of the “Before COPY INTO SQL” will be ignored, allowing the process to continue.
• Before COPY INTO SQL is a Script: If enabled, the “Before COPY INTO SQL” will be executed as a full Snowflake SQL script, not just a single command.
• After COPY INTO SQL: This SQL command will be executed on the Snowflake connection after the COPY INTO operation.
• Ignore Errors for After COPY INTO SQL: If enabled, any errors that occur during the execution of the “After COPY INTO SQL” will be ignored.
• After COPY INTO SQL is a Script: If enabled, the “After COPY INTO SQL” will be executed as a Snowflake SQL script.
Debugging
• Log Each Executed SQL Statement: Enable this option to log every automatically generated and executed SQL statement, including those run before and after the COPY INTO operation.
Other parameters
• Truncate Columns: If enabled (disabled by default), the COPY INTO command will include TRUNCATECOLUMNS = true, which automatically truncates strings to fit the target column length.
• Purge File if Success: When enabled (default), and if user-defined SQL is not used, the COPY INTO command will run with the purge option, deleting the staging file after the COPY INTO operation is successfully completed.
• Purge File(s) if Error: When enabled (default), the staging files will be automatically deleted if a COPY INTO command fails.
• Name(s) of the File(s) to Purge: If “Purge File(s) if Error” is enabled, you can specify which files should be deleted in case of an error. Wildcard file names are supported. If not specified, the system will use the same wildcard file name from the COPY INTO command.
• On Error: Defines what to do in case of an error during execution (the sketch after this list shows this option together with the other COPY INTO options). Available options are:
- ABORT_STATEMENT (default): Stops execution on error.
- CONTINUE: Continues execution, ignoring the error.
- SKIP_FILE: Skips the current file and continues with the next one.
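Taken together, these settings become copy options on the generated command. Here is a sketch of a COPY INTO statement with Truncate Columns, Purge, Force, and On Error applied; the table, stage, pattern, and format names are placeholders.

COPY INTO MYDB.PUBLIC.CUSTOMERS
  FROM @ETL_STAGE
  PATTERN = '.*customers.*[.]csv[.]gz'
  FILE_FORMAT = (FORMAT_NAME = CSVFORMAT)
  TRUNCATECOLUMNS = true
  PURGE = true
  FORCE = true
  ON_ERROR = SKIP_FILE;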
Parameters specific for different source types
Depending on the flow type, additional parameters may need to be configured, as explained below:
• The source is a file or web service
- Ignore Transformations: When enabled, the system will copy files directly to the Snowflake stage without applying any transformations. This can significantly improve load performance, but it disables any transformation steps.
- Delete Loaded Source Files: If enabled, the system will automatically delete files from the source after they’ve been successfully loaded.
- Delete Source Files on Error: When enabled, if there’s an error during the load, the system will automatically delete the files from the source.
• The source is a well-known API
Step 8. Optionally Configure MERGE (UPSERT)
To merge (upsert) new data with existing data in the Snowflake table:
1. Set the Action to MERGE.
2. Define the Lookup Fields—a comma-separated list of fields that uniquely identify each record in the Snowflake table.
Alternatively, you can enable Predict Lookup Fields, which will use algorithms to automatically identify the unique fields. However, keep in mind that automatic detection may not always be accurate.
If enabling Predict Lookup Fields is not a suitable option, you can manually specify the list of table=fields pairs in the Lookup Fields setting. Use fully qualified table names and separate the pairs with a semicolon (;). For example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
Step 9. Configure how to handle source and destination schema changes
It’s common for the source (e.g., an OLTP database table) and the destination (Snowflake table) to have a different number of columns, or for the order of columns to differ. By default, the flow will fail if the number of columns or their order doesn’t match, as Snowflake’s COPY INTO command cannot load files where the columns differ from the target table.
When setting up source-to-destination transformations, you can configure the flow to automatically handle schema changes. The following options are available:
• Reorder CSV Columns During Extract: If enabled (disabled by default), this option reorders the columns in the extracted data file to match the order in the target Snowflake table. This option is ignored if the target table doesn’t exist.
• Reorder CSV Columns During Load (recommended): If enabled (disabled by default) and the COPY INTO action is selected, this option reorders the column data from a staged CSV file before loading it into Snowflake. This allows the flow to load files that have different columns or column orders than the target table. It’s recommended to use this option over “Reorder CSV Columns During Extract.”
• Ignore Fields in the Source That Do Not Exist in the Target Table: If enabled (disabled by default), the system will ignore extra fields if the source has more columns than the target Snowflake table.
• Alter Target Table If the Source Has Extra Columns: If enabled (disabled by default), the system will automatically add extra columns to the target Snowflake table if they exist in the source but not in the target.
• Recreate the Target Table If the Source Has Extra Columns: If enabled (disabled by default), the system will drop and recreate the target Snowflake table if the source has columns that the target doesn’t have. Note that this option, Ignore Fields in the Source That Do Not Exist in the Target Table, and Alter Target Table If the Source Has Extra Columns are mutually exclusive.
• Insert NULL Into Target Fields Not Present in the Source: If enabled (disabled by default), the system will insert NULL values into fields in the target table that are not present in the source data.
• Create Table SQL: By default, the system automatically generates SQL to create a new table if one does not exist. You can override this by providing a custom SQL template or JavaScript code. Read more about overriding the create table SQL. A sketch of the kind of DDL these schema-change options can produce follows this list.
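To illustrate, the DDL produced by the create and alter options above might look roughly like this; the table, column names, and types are placeholders, and the SQL the flow actually generates can differ.

-- Create the target table from source metadata if it does not exist
CREATE TABLE IF NOT EXISTS MYDB.PUBLIC.CUSTOMERS (
  CUSTOMER_ID NUMBER,
  NAME VARCHAR,
  EMAIL VARCHAR
);

-- Alter Target Table If the Source Has Extra Columns
ALTER TABLE MYDB.PUBLIC.CUSTOMERS ADD COLUMN PHONE VARCHAR;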
Configure Notifications for Schema Mismatches
You can configure the flow to send an email notification if there is a mismatch between the source and destination columns. If the source has more or fewer columns than the destination, the system will log an exception. This exception can be retrieved and sent as part of an email notification, as explained in this article.
Step 10. Optionally configure Mapping
If needed, you can create a MAPPING between the source and destination (Snowflake) fields.
The mapping process works the same way as it does for other flow types. While mapping is not mandatory, keep in mind that Snowflake has certain restrictions on field names. If a source field contains unsupported characters, Snowflake will return an error, and the data will not be loaded.
Set up incremental Change Replication using high watermark
Like with any other flow type, you can configure Change Replication using high watermark.
When Change Replication is enabled, only the records that have changed since the last run will be loaded into Snowflake.
To set this up:
1. Set the High Watermark Field: Choose the field that will be used to track changes.
2. Enable Change Replication for the transformation.
3. Optionally, you can configure the Calculated High Watermark Field Value for better readability and tracking.
Load multiple tables by a wildcard name
To load multiple database objects (tables and views) by a wildcard name without creating individual source-to-destination transformations, follow these steps:
FROM
1. Create a single source-to-destination transformation.
2. Set the FROM field to a fully qualified source database object name with wildcard characters (* or ?), for example, public.*.
By default, all tables and views that match the wildcard pattern in the FROM field will be included.
• To exclude specific objects, enter a comma-separated list in the Exclude Objects field.
• To include only certain objects, use the Include Objects field.
TO
Set the TO field to a wildcard name that matches the Snowflake schema, for example, SNOWFLAKE_DW.SCHEMA.* (e.g., UTIL_DB.PUBLIC.*).
Read more about programmatically changing the destination (TO) name.
EXCLUDE
Optionally, configure the list of database objects to exclude:
• To exclude all views, enter all views in the Exclude Objects field.
• To exclude all tables, enter all tables in the Exclude Objects field.
INCLUDE
Optionally, specify the list of database objects to include using the Include Objects field.
SOURCE QUERY
You can optionally configure a Source Query and use the {TABLE} token to dynamically reference the tables.
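For example, a source query using the {TABLE} token could look like the following; the column name and filter are purely illustrative, and the query runs against the source database, not Snowflake.

SELECT * FROM {TABLE} WHERE UPDATED_AT >= '2024-01-01'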
BEFORE/AFTER SQL
If you need to execute SQL before or after the COPY INTO operation, use the {TABLE} token in the Before COPY INTO SQL and After COPY INTO SQL fields.
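As an illustration only, a Before/After COPY INTO SQL pair using the {TABLE} token might look like this; the LOAD_DATE column and the REPORTING role are placeholders, not values required by Etlworks.

-- Before COPY INTO SQL
DELETE FROM {TABLE} WHERE LOAD_DATE = CURRENT_DATE();
-- After COPY INTO SQL
GRANT SELECT ON TABLE {TABLE} TO ROLE REPORTING;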
MERGE
If you are using the MERGE action:
• Do not specify anything in the Lookup Fields.
• Enable Predict Lookup Fields for automatic field detection (though this may not always be accurate).
Alternatively, specify the list of table=field pairs in the Lookup Fields. Use fully qualified table names and separate pairs with semicolons (;).
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
High Watermark Change Replication (HWM)
If setting up High Watermark Change Replication with a calculated HWM field, use the {TABLE} token in the High Watermark Field Value.
Common issues when loading data in Snowflake
Read how to troubleshoot and fix common issues when loading data in Snowflake.