Overview
Redshift is a column-based relational database. Therefore, you can use the same techniques you would normally use to work with relational databases in Etlworks. It is, however, important to understand that inserting data into Redshift row by row can be painfully slow.
It is recommended that you use a Redshift-optimized Flow to ETL data into Redshift.
Flows optimized for Redshift
| Flow type | When to use |
| --- | --- |
| This Flow (you are here) | When you need to extract data from any source, transform it, and load it into Redshift. |
| Bulk load files in S3 into Redshift | When you need to bulk-load files that already exist in S3 without applying any transformations. The Flow automatically generates the COPY command and MERGEs data into the destination. |
| Stream CDC events into Redshift | When you need to stream updates from a database that supports Change Data Capture (CDC) into Redshift in real time. |
| Stream messages from a queue into Redshift | When you need to stream messages from a message queue that supports streaming into Redshift in real time. |
| COPY files into Redshift | When you need to bulk-load data from file-based or cloud storage, an API, or a NoSQL database into Redshift without applying any transformations. This Flow requires a user-defined COPY command. Unlike Bulk load files in S3 into Redshift, this Flow does not support automatic MERGE. |
How it works
Using Redshift-optimized Flows, you can extract data from any supported sources and load it directly into Redshift.
A typical Redshift Flow performs the following operations:
- Extracts data from the source.
- Creates CSV files.
- Compresses files using the gzip algorithm.
- Copies files into Amazon S3 bucket.
- Checks to see if the destination Redshift table exists, and if it does not — creates the table using metadata from the source.
- Dynamically generates and executes the Redshift COPY command.
- Cleans up the remaining files, if needed.
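For illustration, a dynamically generated COPY command typically looks like the sketch below. All names (the public.orders table, the staging bucket, and the IAM role) are placeholders; the actual command is built by the Flow from the parameters described later in this article.

-- Illustrative sketch only: table, bucket, and role names are placeholders.
COPY public.orders
FROM 's3://my-staging-bucket/etlworks/orders.csv.gz'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftCopyRole'
FORMAT AS CSV
NULL AS '\N'
GZIP;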
Prerequisites
- Redshift is up and running and accessible from the Internet.
- The Redshift user has the INSERT privilege for the table(s).
- The Amazon S3 bucket is created, and Redshift is able to access the bucket.
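As a minimal sketch, the required privilege can be granted as shown below; the table name public.orders and the user etl_user are hypothetical. Note that the MERGE action described later also needs DELETE (and UPDATE for native MERGE), and automatic table creation needs CREATE on the schema.

-- Illustrative sketch only: table and user names are placeholders.
GRANT INSERT ON public.orders TO etl_user;
-- Only needed if the MERGE action is used:
GRANT DELETE, UPDATE ON public.orders TO etl_user;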
Step-by-step instruction
Here's how you can extract, transform, and load data in Amazon Redshift:
Step 1. Create all required Connections
You will need a source Connection, an Amazon S3 Connection used as a stage for the files to load, and a Redshift Connection.
When configuring a Connection for Amazon S3, which will be used as a stage for the Redshift Flows, it is recommended that you enable GZip archiving.
Step 2. Create a data exchange Format
Redshift can load data from CSV, JSON, Avro, and other data exchange Formats, but Etlworks only supports loading from CSV, so you will need to create a CSV Format.
When configuring the CSV Format, it is recommended to set the Value for null field to \N, so the Redshift COPY command can differentiate between an empty string and a NULL value.
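For example, with the Value for null set to \N, a staged row in which a hypothetical middle_name column is NULL and the last_name column is an empty string would look like this, letting COPY load the two fields differently:

id,first_name,middle_name,last_name
1,John,\N,""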
If you are using a CSV Format for loading large datasets into Redshift, consider configuring the Format to split the document into smaller files: Redshift can load files in parallel, and transferring smaller files over the network can be faster.
Step 3. Create a Flow to load data in Redshift
Start creating Redshift Flows by opening the Flows window, clicking +, and typing redshift into the Search field.

Continue by selecting the Flow type, adding source-to-destination transformations, and entering the transformation parameters.
Step 4. Set Redshift Connection
For all Redshift Flows, the final destination is Redshift.

To configure the final destination, click the Connections tab and select the available Redshift Connection.
Step 5. Set the source and the destination
Depending upon the Flow type, you can select one of the following sources (FROM) for the Redshift Flow:

- Database: use the database table or view name as the source (FROM) name. Wildcard names, for example, public.*, are supported.
- Well-known API: use the endpoint name or the file name as the source (FROM) name.
- Web Service: use the endpoint name as the source (FROM) name.
- File: use the source file name or a wildcard file name as the source (FROM) name.
- Queue: use the queue name as the source (FROM) name.
For all Redshift Flows, the destination Connection is going to be an Amazon S3 Connection. Etlworks uses the destination Connection as a Redshift stage.
Select or enter the fully-qualified Redshift table name as the destination (TO). If the source (FROM) is entered using a wildcard template, set the destination (TO) to a wildcard as well.
Step 6. Set the data exchange Format configured in Step 2
Step 7. Set the optional parameters
Click MAPPING in the transformation row and select the Parameters tab.
Optional parameters and transformations
- Redshift Table Name: the name of the Redshift table to load data into. This field is optional and overrides the Redshift table name set at the transformation level.
- Column List: you can specify a comma-separated list of column names to load source data fields into specific target columns. The columns can be in any order in the COPY statement, but when loading from flat files, such as in an Amazon S3 bucket, their order must match the order of the source data.
- Truncate Columns: if this option is enabled, the string literals will be truncated during the load to the maximum allowed size for the designated column.
- String used to convert to SQL NULL: Redshift loads fields that match null_string as NULL, where null_string can be any string. If your data includes a null terminator, also referred to as NUL (UTF-8 0000) or binary zero (0x000), COPY treats it as any other character. For example, a record containing '1' || NUL || '2' is copied as a string of length 3 bytes. If a field contains only NUL, you can use NULL AS to replace the null terminator with NULL by specifying '\0' or '\000', for example, NULL AS '\0' or NULL AS '\000'. If a field contains a string that ends with NUL and NULL AS is specified, the string is inserted with NUL at the end. Do not use '\n' (newline) for the null_string value. Amazon Redshift reserves '\n' for use as a line delimiter. The default null_string is '\N'.
- Load empty chars as null: Indicates that Amazon Redshift should load empty CHAR and VARCHAR fields as NULL. Empty fields for other data types, such as INT, are always loaded with NULL. Empty fields occur when data contains two delimiters in succession with no characters between the delimiters.
- Load blank chars as null: Loads blank fields, which consist of only white space characters, as NULL. This option applies only to CHAR and VARCHAR columns. Blank fields for other data types, such as INT, are always loaded with NULL. For example, a string that contains three space characters in succession (and no other characters) is loaded as a NULL. The default behavior, without this option, is to load the space characters as is.
- Accept any date: Allows any date Format, including invalid Formats such as 00/00/00 00:00:00, to be loaded without generating an error. This parameter applies only to TIMESTAMP and DATE columns.
- Accept invalid UTF8 chars: Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters. When ACCEPTINVCHARS is specified, COPY replaces each invalid UTF-8 character with a string of equal length consisting of the character specified by replacement_char. The default replacement_char is a question mark (?).
- Escape: When this parameter is specified, the backslash character in input data is treated as an escape character. The character that immediately follows the backslash character is loaded into the table as part of the current column value, even if it is a character that normally serves a special purpose. For example, you can use this parameter to escape the delimiter character, a quotation mark, an embedded newline character, or the escape character itself when any of these characters is a legitimate part of a column value.
- Remove quotes: Removes surrounding quotation marks from strings in the incoming data. All characters within the quotation marks, including delimiters, are retained.
- Explicit IDs: Use EXPLICIT_IDS with tables that have IDENTITY columns if you want to override the autogenerated values with explicit values from the source data files for the tables. If the command includes a column list, that list must include the IDENTITY columns to use this parameter. The data format for EXPLICIT_IDS values must match the IDENTITY format specified by the CREATE TABLE definition.
- File Format: the CSV Format definition. Read about Redshift file Formats. The Format definition is optional. If not provided, the system will automatically create an ad-hoc definition based on the destination Format configured for the transformation.
- IAM ROLE: the optional AWS IAM role used to access the S3 bucket. If the role is not provided, the Flow will use the Access Key and Secret configured for the S3 Connection used in this transformation.
- Max # of Errors to Ignore: the default value is 0 and the limit is 100000. Use this parameter to allow loads to continue when certain rows fail to load into the table because of formatting errors or other inconsistencies in the data. Set this value to 0 or 1 if you want the load to fail as soon as the first error occurs.
- Refresh of Optimizer Statistics: governs automatic computation and refresh of optimizer statistics at the end of a successful COPY command. By default, if the STATUPDATE parameter is not used, statistics are updated automatically if the table is initially empty. With STATUPDATE ON, statistics are updated automatically regardless of whether the table is initially empty. With STATUPDATE OFF, statistics are never updated.
- No Load: checks the validity of the data file without actually loading the data. Use the NOLOAD parameter to ensure that your data file will load without any errors before running the actual data load. Running COPY with the NOLOAD parameter is much faster than loading the data because it only parses the files.
- Action: can be either COPY INTO, which inserts records from the file(s) into the Redshift table, or MERGE, which merges records from the file with the records in the Redshift table. MERGE requires configuring Lookup Fields.
- How to MERGE: the type of SQL to execute when merging data. The following options are available: Native MERGE (preview), which executes the native Redshift MERGE SQL (note that the MERGE command is in preview), and DELETE/INSERT (default), which DELETEs all records in the actual table that also exist in the temp table and then INSERTs all records from the temp table into the actual table (see the sketch at the end of this section).
- Lookup Fields: the comma-separated list of fields that uniquely identify the record in the Redshift table. This field is required if the action is set to MERGE.
- Copy SQL: a user-defined Copy SQL. By default, Etlworks creates the Copy SQL automatically, based on the input and output parameters. You can override it by using this field. Read more about the Redshift COPY command.
- MERGE SQL: a user-defined SQL that will be used instead of the default when the action is set to MERGE. If nothing is entered in this field, the default MERGE SQL will be used. The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
  - {TABLE}: the table to MERGE data into.
  - {TEMP_TABLE}: the table to MERGE data from.
  - {KEY_FIELDS}: the fields uniquely identifying the record in both tables.
  - {FIELDS}: the fields to INSERT/UPDATE in the table to MERGE data into.
- CDC MERGE SQL: a user-defined SQL that will be used instead of the default when the action is set to CDC MERGE. If nothing is entered in this field, the default CDC MERGE SQL will be used. The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
  - {TABLE}: the table to MERGE data into.
  - {TEMP_TABLE}: the table to MERGE data from.
  - {KEY_FIELDS}: the fields uniquely identifying the record in both tables.
  - {FIELDS}: the fields to INSERT.
  - {UPDATE_FIELDS}: the fields and values to update, in the format field=value,field=value.
  - {UPDATE_CONDITIONS}: the WHERE clause to update existing records.
- Purge File if Success: if this option is enabled (by default) and the user-defined SQL is not used, Etlworks will run the COPY INTO command with the purge option enabled. With purge enabled, the staging file will be automatically deleted after the COPY INTO command is successfully executed.
- Purge File(s) if Error: if this option is enabled (by default), the staging files will be automatically deleted if a COPY INTO command was executed with an error.
- Name(s) of the File(s) to Purge: if Purge File(s) if Error is enabled, it is possible to configure which files you want the system to delete if an error occurs while executing the COPY command. Wildcard file names are allowed. If not specified, the system will use the same wildcard file name as it used for the COPY command.
ELT
- Before COPY SQL: this SQL will be executed on the Redshift Connection before running the COPY SQL. You can configure the Flow to ignore errors when executing this SQL and to execute the SQL as a script.
- Ignore errors when executing Before COPY SQL: if this option is enabled and there is an error when the Before COPY SQL is executed, the error will be ignored.
- Before COPY SQL is a script: if this option is enabled, the Before COPY SQL will be executed as a Redshift script.
- After COPY SQL: this SQL will be executed on the Redshift Connection after running the COPY SQL. You can configure the Flow to ignore errors when executing this SQL and to execute the SQL as a script.
- Ignore errors when executing After COPY SQL: if this option is enabled and there is an error when the After COPY SQL is executed, the error will be ignored.
- After COPY SQL is a script: if this option is enabled, the After COPY SQL will be executed as a Redshift script.
Debugging
- Log each executed SQL statement: enable this flag if you want to log each automatically generated and executed SQL statement, including the before/after COPY SQL.
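For illustration, the default DELETE/INSERT merge described above is conceptually equivalent to the sketch below, assuming a hypothetical public.orders target table, a public.orders_temp staging table, and a single lookup field order_id. The actual SQL is generated by the Flow, and the temporary table name will differ.

-- Illustrative sketch only: all object names are placeholders.
BEGIN;
DELETE FROM public.orders
USING public.orders_temp
WHERE orders.order_id = orders_temp.order_id;
INSERT INTO public.orders
SELECT * FROM public.orders_temp;
END;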
Parameters specific for different source types
Depending on the Flow type, other Flow parameters can be added, as explained below:
- The source is a database
- The source is a file or web service
  - if this parameter is enabled, the system will copy files directly into the Redshift stage before executing the COPY command. This greatly improves the performance of the load but automatically disables any transformations.
  - if this parameter is enabled, the system will automatically delete loaded files from the source.
  - if this parameter is enabled and there is an error during the load, the system will automatically delete loaded files from the source.
- The source is a well-known API
Step 8. Optionally configure MERGE (UPSERT)
To merge (upsert) existing data in the Redshift table with new data:
- Set the Action to MERGE.
- Define the Lookup Fields: the comma-separated list of fields that uniquely identify the record in the Redshift table.
Alternatively, you can enable Predict Lookup Fields, which forces the Flow to use various algorithms to automatically predict the fields that uniquely identify the record. Note that it is not always possible to detect the unique fields correctly.

When enabling Predict Lookup Fields (which is not always accurate) is not an option, you can specify the list of table=fields pairs in the Lookup Fields. Use fully-qualified table names and ; as a separator between table=fields pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
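For comparison, with How to MERGE set to Native MERGE, the generated statement is conceptually similar to the sketch below for the test1.inventory example above. The staging table name (test1.inventory_temp) and the quantity column are assumptions for illustration only; the actual SQL is generated by the Flow.

-- Illustrative sketch only: object and column names are placeholders.
MERGE INTO test1.inventory
USING test1.inventory_temp AS t
ON inventory.inventory_id = t.inventory_id
AND inventory.database_name = t.database_name
WHEN MATCHED THEN UPDATE SET quantity = t.quantity
WHEN NOT MATCHED THEN INSERT (inventory_id, database_name, quantity)
VALUES (t.inventory_id, t.database_name, t.quantity);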
Step 9. Configure how to handle source and destination schema changes
It is quite typical for the source (for example, a table in the OLTP database) and the destination (the Redshift table) to have a different number of columns. It is also entirely possible that the order of columns in the source is different from the order of columns in the destination. In either of these cases, by default, the Flow will fail, since the Redshift COPY command cannot load files that have more or fewer columns than the target table or whose column order is different.
When setting the source-to-destination transformation, it is possible to configure it to handle schema changes automatically. The following configuration options are available:
- Reorder CSV Columns During Extract: if this option is enabled (it is disabled by default), the Flow reorders columns in the data file to match the order of columns in the target Redshift table. This option is ignored if the target table does not exist.
- Ignore fields in the source that do not exist in the target table: if this option is enabled (it is disabled by default) and the source has more fields than the target (Redshift) table, the system will automatically exclude the extra fields. Options 2, 3, and 4 are mutually exclusive.
- Alter target table if the source has columns that the target table doesn't have: if this option is enabled (it is disabled by default) and the source has different fields than the target (Redshift) table, the system will add the extra fields to the target table. Options 2, 3, and 4 are mutually exclusive.
- Recreate target table if the source has columns that the target table doesn't have: if this option is enabled (it is disabled by default) and the source has different fields than the target (Redshift) table, the system will automatically drop and recreate the target table. Options 2, 3, and 4 are mutually exclusive.
- Insert null into the fields in the target table that do not exist in the source: if this option is enabled (it is disabled by default) and the target (Redshift) table has different fields than the source, the system will automatically insert NULL values into these fields in the target.
- Create table SQL: by default, the system automatically generates the SQL for creating a new table if it does not exist. You can override the default SQL by providing a template or JavaScript code. Read more about how to override the create table SQL.
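As an illustration, when Alter target table if the source has columns that the target table doesn't have is enabled and the source introduces a new column, the system issues DDL along the lines of the following sketch (the table name public.orders and the discount column are hypothetical; the actual statement is generated automatically):

ALTER TABLE public.orders ADD COLUMN discount NUMERIC(18,4);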
Configure notifications if source and destination have different columns
It is possible to send an email notification if the source and destination have different columns. To configure the Flow to send a notification when either the source has more columns than the destination or the destination has more columns than the source, use the technique explained in this article.
Step 10. Optionally configure Mapping
If necessary, you can create a Mapping between the source and destination (Redshift) fields.

You create a Mapping between the source and destination just like you usually do for any other Flow type.

Mapping is not required, but please remember that if a source field name is not supported by Redshift, Redshift will return an error, and the data will not be loaded into the database. For example, if you are loading data from Google Analytics, the output (source) is going to include fields with the ga: prefix (ga:user, ga:browser, etc.). Unfortunately, Redshift does not support fields with a : in the name, so the data will be rejected. If that happens, you can use Mapping to rename the destination fields.
Set up Change Replication using high watermark
As with any other Flow type, it is possible to configure change replication using a high watermark.
When change replication is enabled, only the changed records will be loaded into Redshift.
Basically, all you need to do is set the High Watermark Field and Enable Change Replication for the transformation. Additionally, for better reliability, you can set the calculated High Watermark Field Value.
Load multiple tables by a wildcard name
To load multiple database objects (tables and views) by a wildcard name (without creating individual source-to-destination transformations), follow the same steps as above, except:
FROM

Create a single source-to-destination transformation and set the FROM to a fully-qualified source database object name with wildcard characters (* and ?), for example, public.*.
By default, all tables and all views that match the wildcard name in FROM will be included. If you want to exclude specific database objects, enter a comma-separated list of the database objects to exclude in the Exclude objects field. If you want to include only specific database objects, enter a comma-separated list of the database objects to include in the Include objects field.
TO

Set the TO to redshift_schema.*, for example, public.*.
Read how to programmatically change the destination (TO) name.
EXCLUDE

Optionally, configure the list of database objects to exclude.

To exclude all views, enter all views in the Exclude objects field.

To exclude all tables, enter all tables in the Exclude objects field.
INCLUDE

Optionally, configure the list of database objects to include.
SOURCE QUERY

Optionally, configure the Source query. You can use the token {TABLE} in the Source query.
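For example (illustrative only; the status column is hypothetical), a Source query that loads only a subset of records from each matched table could look like this:

select * from {TABLE} where status = 'active'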
BEFORE/AFTER SQL

If you want to execute any code before and/or after COPY, use the token {TABLE} in the Before COPY SQL and After COPY SQL.
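For example (an illustrative assumption, not a requirement), the After COPY SQL could refresh optimizer statistics for each loaded table:

ANALYZE {TABLE};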
MERGE

If you are configuring the MERGE action, do not enter anything in the Lookup Fields. Enable Predict Lookup Fields instead.

An alternative to enabling Predict Lookup Fields (which is not always accurate) is specifying the list of table=fields pairs in the Lookup Fields. Use fully-qualified table names and ; as a separator between table=fields pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
HWM change replication

If you are setting up high watermark change replication with a calculated HWM field, use the token {TABLE} in the High Watermark Field Value.
Create all string fields as original length multiplied by N
In Redshift, there is no NVARCHAR datatype, and VARCHAR(1) is 1 byte, not 1 character, so Unicode strings can get truncated when the Flow automatically creates the Redshift table from the source.
You can configure the destination Redshift Connection to multiply the original column length. Even with the multiplier, the max column length will not exceed 65535.
Varchar Field Size Multiplier: the value of this field, if greater than 1, will be used to multiply the default size of VARCHAR-like columns (except CHAR) when automatically creating the destination table. For example, with the multiplier set to 4, a source VARCHAR(100) column is created as VARCHAR(400) in Redshift.
Flow variables
To add user-defined Flow variables, open the Parameters tab and add variables as key-value pairs. Use uppercase characters and no spaces for variable names.
Maximum number of parallel threads
It is possible to execute multiple source-to-destination transformations (extract-load) in parallel threads.
By default, Etlworks limits the number of threads to 5. To change the limit, open the Parameters tab and set the maximum number of threads in Maximum Number of Parallel Threads. This value cannot be greater than 99.
Common issues when loading data in Redshift
Read how to troubleshoot and fix common issues when loading data in Amazon Redshift.