When to use this Flow type
This Flow bulk-loads data into Snowflake using the user-defined COPY INTO command. The data can be files in file-based or cloud storage, responses from APIs, email attachments, or objects stored in a NoSQL database.
Flows optimized for Snowflake
| Flow type | When to use |
| --- | --- |
| Extract, transform, and load data into Snowflake | When you need to extract data from any source, transform it, and load it into Snowflake. |
| Bulk load files into Snowflake | When you need to bulk-load files that already exist in the external Snowflake stage (S3, Azure Blob, Google Cloud Storage) or in server storage without applying any transformations. The Flow automatically generates the COPY INTO command and MERGEs data into the destination. |
| Stream CDC events into Snowflake | When you need to stream updates from a database that supports Change Data Capture (CDC) into Snowflake in real time. |
| Stream messages from a queue into Snowflake | When you need to stream messages from a message queue that supports streaming into Snowflake in real time. |
| COPY files into Snowflake (you are here) | When you need to bulk-load data from file-based or cloud storage, an API, or a NoSQL database into Snowflake without applying any transformations. This Flow requires a user-defined COPY INTO command. Unlike Bulk load files into Snowflake, this Flow does not support automatic MERGE. |
How it works
This flow copies files from the source to the Snowflake internal or external stage and then executes the user-defined COPY INTO command.
Prerequisites
1. The Snowflake data warehouse is active.
2. The Stage name is set for the Snowflake Connection or the Transformation (the latter overrides the stage set for the Connection). Etlworks uses the Snowflake COPY INTO command to load data into Snowflake tables, and COPY INTO requires a named internal or external stage. The stage is the location where your data files are stored for loading into Snowflake. Read how Etlworks flow automatically creates the named Snowflake stage; a minimal example of creating a stage manually is shown after this list.
3. For loading data from an external stage in AWS S3, Azure Blob, or Google Cloud Storage, the Amazon S3 bucket, Google Cloud Storage bucket, or Azure blob container must already exist. Note that the Etlworks Flow does not create the bucket or blob.
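For reference, here is a minimal sketch of creating a named stage directly in Snowflake. The stage names, bucket URL, and credentials below are placeholders, not values required by Etlworks (which can also create the stage automatically, as noted above):
-- Internal named stage (files are uploaded to Snowflake-managed storage)
CREATE STAGE IF NOT EXISTS etl_stage;
-- External named stage on top of an existing S3 bucket (placeholder URL and credentials)
CREATE STAGE IF NOT EXISTS etl_s3_stage
  URL = 's3://my-bucket/snowflake/'
  CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...');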
Step-by-step instructions
Step 1. Create a source Connection.
Step 2. Create and test the Snowflake Connection.
Read how to create a Snowflake Connection.
When creating the Connection, set the Stage name. For loading files in cloud storage, the named external stage must be configured to read data from the bucket or blob configured for the cloud storage Connection created in Step 1.
Step 3. Create the destination Connection
Depending on how you prefer to stage files for loading data into Snowflake, create one of the following Connections:
- The Server Storage connection for loading data from the internal Snowflake stage.
- S3 connection for loading data from the S3 external stage.
- Azure storage connection for loading data from the Azure external stage.
- Google Cloud Storage connection for loading data from the Google Cloud external stage.
It is recommended to select GZip as the value for the Archive file before copying field when creating a Connection for cloud storage.
Step 4. Create a new Flow
In Flows, click the Add flow button and enter Snowflake in Select Flow Type. Select COPY files into Snowflake.
Step 5. Set the source and the destination
For the source (FROM), select the Connection created in Step 1 and enter a file name or a wildcard file name, for example, *.csv.
For the destination (TO), select the Connection created in Step 3 and enter the base destination table name (without the database and schema names).
Step 6. Set the Snowflake connection
Select the Connections tab and select the Snowflake Connection created in Step 2.
Step 7. Configure COPY INTO SQL
Click MAPPING and select Parameters.
Enter a valid SQL statement in Copy INTO SQL. Read about COPY INTO.
If the source file is CSV
COPY INTO TABLE_NAME FROM @stage_name PATTERN = 'destination_name.*'
FILE_FORMAT = (type = 'CSV' FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '"'
SKIP_HEADER = 1 NULL_IF = ('NULL','null',''))
PURGE = true FORCE = true TRUNCATECOLUMNS = false ON_ERROR = ABORT_STATEMENT
Where:
- TABLE_NAME - the name of the destination Snowflake table. The table must be created manually (a minimal sketch is shown after this list).
- stage_name - the Snowflake stage name (the same as in the Snowflake Connection).
- destination_name - the name set in TO.
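As a minimal sketch only, assuming the CSV files contain three columns (the table and column names below are placeholders, not values required by the Flow), the destination table could be created as:
-- Placeholder destination table; adjust the columns to match the actual CSV files
CREATE TABLE IF NOT EXISTS TABLE_NAME (
  id NUMBER,
  name VARCHAR,
  updated_at TIMESTAMP_NTZ
);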
If the source file is JSON
COPY INTO TABLE_NAME FROM @stage_name PATTERN = 'destination_name.*'
FILE_FORMAT = (type = 'JSON' strip_outer_array = true)
PURGE = true FORCE = true ON_ERROR = ABORT_STATEMENT
Where:
- TABLE_NAME - the name of the destination Snowflake table. The table must be created manually using the following syntax: create table table_name (data_column_name variant). A sketch of querying the loaded data is shown after this list.
- stage_name - the Snowflake stage name (the same as in the Snowflake Connection).
- destination_name - the name set in TO.
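After the load, each JSON document lands in the single VARIANT column. Assuming, purely for illustration, that the documents contain id and name attributes (adjust the paths to match the actual JSON data), the rows can be queried with Snowflake's path notation:
-- Hypothetical attribute names; replace with the attributes of the actual JSON documents
SELECT data_column_name:id::number   AS id,
       data_column_name:name::string AS name
FROM   table_name;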
Step 8. Define the other optional parameters:
- Action: if Move is selected (default), the file(s) will be copied to the S3 bucket and removed from the original source. If Copy is selected, the file(s) will be copied to the S3 bucket and retained in the original source.
- Add Suffix to the Destination File Name: you can select one of the predefined suffixes for the files created using this file operation. For example, if you select uuid as a suffix, and the original filename is dest.csv, Etlworks will create files with the name dest_uuid.csv, where uuid is a globally unique identifier such as 21EC2020-3AEA-4069-A2DD-08002B30309D. The default value for this field is uuid.
- Do not process files that have been already processed: if this option is enabled, the system will skip files that have already been loaded into Snowflake. Snowflake also keeps track of which files are loaded, but when this option is enabled, the files will not even be copied to S3, which can significantly reduce traffic costs.
- Maximum Simultaneous Operations: Etlworks can copy or move each file in its own thread. Use this property to set the maximum number of simultaneous file operations.
- Purge File(s) if Error: if this option is enabled (the default), the staging files will be automatically deleted if the COPY INTO command finished with an error.
- Name(s) of the File(s) to Purge: if Purge File(s) if Error is enabled, this option makes it possible to configure which files the system should delete if an error occurs while executing the COPY INTO command. Wildcard file names are allowed. If not specified, the system will use the same wildcard file name as it used for the COPY INTO command.
- Before COPY INTO SQL: this SQL will be executed on the Snowflake Connection before the COPY INTO command (see the sketch after this list).
- Ignore errors when executing Before COPY INTO SQL: if this option is enabled and there is an error when Before COPY INTO SQL is executed, the error will be ignored.
- After COPY INTO SQL: this SQL will be executed on the Snowflake Connection after the COPY INTO command (see the sketch after this list).
- Ignore errors when executing After COPY INTO SQL: if this option is enabled and there is an error when After COPY INTO SQL is executed, the error will be ignored.
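For illustration only, here is what Before COPY INTO SQL and After COPY INTO SQL might contain. TABLE_NAME and AUDIT_TABLE are placeholders, not objects created by the Flow:
-- Hypothetical Before COPY INTO SQL: empty the destination table before a full reload
TRUNCATE TABLE TABLE_NAME;
-- Hypothetical After COPY INTO SQL: record that the load finished in a placeholder audit table
INSERT INTO AUDIT_TABLE (table_name, loaded_at) VALUES ('TABLE_NAME', CURRENT_TIMESTAMP());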
Step 9. Save and run the Flow.
Use wildcard filenames in Snowflake COPY INTO command
Pattern matching
- .*string.*: matches all files that contain the given string, for example, abc_string, abc1_string23, string_abc.
- .*: matches one or more occurrences of any character, including no character.
- ?: represents or matches a single occurrence of any character.
- Bracketed characters [ ]: match any occurrence of the characters enclosed in the square brackets. It is possible to use different types of characters (alphanumeric): numbers, letters, other special characters, etc.
Read more about pattern matching in COPY INTO command.
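For example, assuming the staged files follow the default naming (the destination name plus a generated suffix and a .csv extension; the stage and table names are the same placeholders used above), a pattern that loads only those files could look like this:
COPY INTO TABLE_NAME FROM @stage_name
PATTERN = '.*destination_name.*[.]csv'
FILE_FORMAT = (type = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1)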