Overview
Amazon Redshift is a columnar relational database, so you can use the same techniques you would normally use to work with relational databases in Etlworks Integrator. It is, however, important to understand that inserting data into Redshift row by row can be painfully slow.
It is recommended that you use a Redshift-optimized flow to load data into Redshift.
Using Redshift-optimized flows, you can extract data from any of the supported sources and load it directly into Redshift.
A typical Redshift flow performs the following operations:
- Extracts data from the source.
- Creates CSV files.
- Compresses files using the gzip algorithm.
- Copies the files into an Amazon S3 bucket.
- Checks whether the destination Redshift table exists and, if it does not, creates the table using metadata from the source.
- Dynamically generates and executes the Redshift COPY command.
- Cleans up the remaining files, if needed.
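For reference, a dynamically generated COPY command typically looks similar to the sketch below. The table, bucket, file, and IAM role names are placeholders, not values produced by Integrator, and the actual command generated by the flow may include different options.

-- Illustrative only: all names below are placeholders
COPY public.orders
FROM 's3://my-staging-bucket/orders.csv.gz'
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy-role'
FORMAT AS CSV
GZIP
NULL AS '\N';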
Prerequisites
- The Redshift cluster is up and running and accessible from the Internet.
- The Redshift user has the INSERT privilege for the table(s).
- The Amazon S3 bucket is created and Redshift is able to access the bucket.
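If you need to grant the required privilege manually, a minimal sketch is shown below; the schema, table, and user names are placeholders. The second GRANT is only an assumption for the case where the flow is expected to create the target table automatically.

-- Allow the ETL user to insert into an existing target table (names are placeholders)
GRANT INSERT ON public.orders TO etl_user;
-- Only needed if the flow should be able to create the table automatically
GRANT CREATE ON SCHEMA public TO etl_user;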
Step-by-step instruction
Step 1. Create all required connections.
You will need a source connection, an Amazon S3 connection used as a stage for the file to load, and a Redshift connection.
When configuring a connection for Amazon S3 that will be used as a stage for the Redshift flows, it is recommended that you select GZip as the value for the Archive file before copying field.
Step 2. Create a data exchange format
Redshift can load data from CSV, JSON, Avro, and other data exchange formats, but Etlworks only supports loading from CSV, so you will need to create a CSV format.
When configuring the CSV format, it is recommended to set the Value for null field to \N so the Redshift COPY command can differentiate between an empty string and a NULL value.
If you are using a CSV format to load large datasets into Redshift, consider configuring the format to split the document into smaller files: Redshift can load files in parallel, and transferring smaller files over the network can be faster.
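To illustrate why \N matters, here is a hedged example of a staged CSV row and the matching COPY clause; the column names and values are made up for this illustration.

id,name,comment
1,"",\N

With NULL AS '\N' in the generated COPY command, the third value is loaded as NULL, while the second one remains an empty string.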
Step 3. Create a flow to load data in Redshift
Start creating Redshift flows by opening the Flows window, clicking the + button, and typing redshift into the search field.
Continue by selecting the flow type, adding source-to-destination transformations, and entering the transformation parameters.
Step 4. Set Redshift Connection
For all Redshift flows, the final destination is redshift.
To configure the final destination, click the Connections tab and select the available Redshift connection.
Step 5. Set the source and the destination
Depending upon the flow type, you can select one of the following sources (FROM) for the Redshift flow:
- Database - use the table name as the source (FROM) name
- CDC - use the fully qualified table name as the source (FROM) name
- API - use any appropriate string as the source (FROM) name
- Web Service - use any appropriate string as the source (FROM) name
- File - use the source file name or a wildcard filename as the source (FROM) name
- Queue - use the queue topic name as the source (FROM) name
For all Redshift flows, the destination connection is going to be an Amazon S3 connection. Integrator uses the destination connection as a Redshift stage.
Select or enter the fully-qualified Redshift table name as a destination (TO).
Step 6. Set the data exchange format configured in Step 2
Step 7. Set the optional parameters
Click the MAPPING button in the transformation row and select the Parameters tab.
Optional parameters and transformations
- Redshift Table Name - the name of the Redshift table to load data into. This field is optional and overrides the Redshift table name set at the transformation level.
- Column List - you can specify a comma-separated list of column names to load source data fields into specific target columns. The columns can be in any order in the COPY statement, but when loading from flat files, such as in an Amazon S3 bucket, their order must match the order of the source data.
- Truncate Columns - if this option is enabled, the string literals will be truncated during the load to the maximum allowed size for the designated column.
- Load empty chars as null - Indicates that Amazon Redshift should load empty CHAR and VARCHAR fields as NULL. Empty fields for other data types, such as INT, are always loaded with NULL. Empty fields occur when data contains two delimiters in succession with no characters between the delimiters.
- Load blank chars as null - Loads blank fields, which consist of only white space characters, as NULL. This option applies only to CHAR and VARCHAR columns. Blank fields for other data types, such as INT, are always loaded with NULL. For example, a string that contains three space characters in succession (and no other characters) is loaded as a NULL. The default behavior, without this option, is to load the space characters as is.
- Accept any date - Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be loaded without generating an error. This parameter applies only to TIMESTAMP and DATE columns.
- Accept invalid UTF8 chars - Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters. When ACCEPTINVCHARS is specified, COPY replaces each invalid UTF-8 character with a string of equal length consisting of the character specified by replacement_char. The default replacement_char is a question mark ( ? ).
- Escape - When this parameter is specified, the backslash character in input data is treated as an escape character. The character that immediately follows the backslash character is loaded into the table as part of the current column value, even if it is a character that normally serves a special purpose. For example, you can use this parameter to escape the delimiter character, a quotation mark, an embedded newline character, or the escape character itself when any of these characters is a legitimate part of a column value.
- Remove quotes - Removes surrounding quotation marks from strings in the incoming data. All characters within the quotation marks, including delimiters, are retained.
- Explicit IDs - Use EXPLICIT_IDS with tables that have IDENTITY columns if you want to override the autogenerated values with explicit values from the source data files for the tables. If the command includes a column list, that list must include the IDENTITY columns to use this parameter. The data format for EXPLICIT_IDS values must match the IDENTITY format specified by the CREATE TABLE definition.
- File Format - the CSV format definition. Read about Redshift file formats. The format definition is optional. If not provided - the system will automatically create an ad-hoc definition based on the destination format configured for the transformation.
- IAM Role - the optional AWS IAM role used to access the S3 bucket. If the role is not provided the flow will use the Access Key and Secret configured for the S3 connection used in this transformation.
- Max # of Errors to Ignore - The default value is 0 and the limit is 100000. Use this parameter to allow loads to continue when certain rows fail to load into the table because of formatting errors or other inconsistencies in the data. Set this value to 0 or 1 if you want the load to fail as soon as the first error occurs.
- Refresh of Optimizer Statistics - Governs automatic computation and refresh of optimizer statistics at the end of a successful COPY command. By default, if the STATUPDATE parameter is not used, statistics are updated automatically if the table is initially empty. With STATUPDATE ON, statistics are updated automatically regardless of whether the table is initially empty. With STATUPDATE OFF, statistics are never updated.
- No Load - Checks the validity of the data file without actually loading the data. Use the NOLOAD parameter to make sure that your data file will load without any errors before running the actual data load. Running COPY with the NOLOAD parameter is much faster than loading the data because it only parses the files.
- Action - can be either COPY INTO, which inserts records from the file(s) into the Redshift table, or MERGE, which merges records from the file with the records in the Redshift table. MERGE requires configuring Lookup Fields.
- Lookup Fields - the comma-separated list of fields that uniquely identify the record in the Redshift table. This field is required if the action is set to MERGE.
- Copy SQL - a user-defined COPY SQL. By default, Integrator creates the COPY SQL automatically, based on the input and output parameters; you can override it by using this field. Read more about the Redshift COPY command. A sketch of a COPY command using several of the parameters above is shown after this list.
- MERGE SQL - a user-defined SQL that will be used instead of the default when the action is set to MERGE. If nothing is entered in this field, the default MERGE SQL will be used. The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
- {TABLE} - the table to MERGE data into,
- {TEMP_TABLE} - the table to merge data from,
- {KEY_FIELDS} - the fields uniquely identifying the record in both tables,
- {FIELDS} - the fields to INSERT/UPDATE in the table to MERGE data into.
- CDC MERGE SQL - a user-defined SQL that will be used instead of the default when the action is set to CDC MERGE. If nothing is entered in this field, the default CDC MERGE SQL will be used. The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
- {TABLE} - the table to MERGE data into,
- {TEMP_TABLE} - the table to merge data from,
- {KEY_FIELDS} - the fields uniquely identifying the record in both tables,
- {FIELDS} - the fields to INSERT,
- {UPDATE_FIELDS} - the fields and values to UPDATE, in the format field=value,field=value,
- {UPDATE_CONDITIONS} - the WHERE clause used to update existing records.
- Purge File if Success - if this option is enabled (by default), and the user-defined SQL is not used, Integrator will run the COPY INTO command with the purge option enabled. With purge enabled, the staging file will be automatically deleted after the COPY INTO command is successfully executed.
- Purge File(s) if Error - if this option is enabled (by default) the staging files will be automatically deleted if a COPY INTO command was executed with an error.
- Name(s) of the File(s) to Purge - if "Purge File(s) if Error" is enabled, it is possible to configure which files you want the system to delete if an error occurs while executing the COPY command. Wildcard file names are allowed. If not specified, the system will use the same wildcard file name as it used for the COPY command.
- Before COPY SQL - this SQL will be executed on the Redshift connection before running COPY SQL.
- Ignore errors when executing Before COPY SQL - if this option is enabled, and there is an error when "Before COPY SQL" is executed - the error will be ignored.
- After COPY SQL - this SQL will be executed on the Redshift connection after running COPY SQL.
- Ignore errors when executing After COPY SQL - if this option is enabled, and there is an error when "After COPY SQL" is executed - the error will be ignored.
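To illustrate how several of the parameters above map to Redshift COPY options, here is a hedged sketch of a COPY command. The table, column, bucket, and IAM role names are placeholders, and the command Integrator actually generates for your transformation may differ.

-- Illustrative only: all names and values below are placeholders
COPY public.orders (order_id, customer_id, amount, created_at)  -- Column List
FROM 's3://my-staging-bucket/orders.csv.gz'
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy-role'    -- IAM Role
FORMAT AS CSV
GZIP
NULL AS '\N'
TRUNCATECOLUMNS      -- Truncate Columns
EMPTYASNULL          -- Load empty chars as null
BLANKSASNULL         -- Load blank chars as null
ACCEPTANYDATE        -- Accept any date
ACCEPTINVCHARS '?'   -- Accept invalid UTF8 chars
MAXERROR 10          -- Max # of Errors to Ignore
STATUPDATE ON;       -- Refresh of Optimizer Statistics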
Parameters specific for different source types
Depending on the flow type, other flow parameters can be added, as explained below:
- The source is a database
- The source is a file or web service
- - if this parameter is enabled the system will copy files directly into the Redshift stage before executing the COPY command. This greatly improves the performance of the load but automatically disables any transformations.
- - if this parameter is enabled the system will automatically delete loaded files from the source.
- - if this parameter is enabled and there is an error during the load the system will automatically delete loaded files from the source.
- The source is a well-known API
Step 8. Optionally configure MERGE (UPSERT)
To merge (upsert) existing data in the Redshift table with new data:
- Set the Action to MERGE.
- Define the Lookup Fields - the comma-separated list of fields which uniquely identify the record in the Redshift table.
Alternatively, you can enable the Predict Lookup Fields option, which forces the flow to use various algorithms to automatically predict the fields that uniquely identify the record. Note that it is not always possible to correctly detect the unique fields.
If enabling Predict Lookup Fields (which is not always accurate) is not an option, you can specify a list of table=fields pairs in the Lookup Fields. Use fully-qualified table names and ';' as a separator between the pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
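For reference, a hedged sketch of the kind of SQL an upsert into Redshift typically boils down to is shown below. Here public.orders stands in for {TABLE}, public.orders_stage for {TEMP_TABLE}, and order_id for the Lookup Field; the default MERGE SQL generated by the flow may differ.

-- Classic Redshift upsert pattern (all names are placeholders)
BEGIN;
-- Remove rows that will be replaced by the staged data
DELETE FROM public.orders
USING public.orders_stage
WHERE public.orders.order_id = public.orders_stage.order_id;
-- Insert the staged data
INSERT INTO public.orders
SELECT * FROM public.orders_stage;
COMMIT;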
Step 9. Configure how to handle source and destination schema changes
It is quite typical for the source (for example, a table in an OLTP database) and the destination (a Redshift table) to have a different number of columns. It is also entirely possible that the order of columns in the source differs from the order of columns in the destination. In either of these cases, by default, the flow will fail, since the Redshift COPY command cannot load files that have more or fewer columns than the target table, or whose column order is different.
When setting the source-to-destination transformation, it is possible to configure it to automatically handle schema changes. The following configuration options are available:
- Reorder columns to load to match order of columns in the target table. Also update column types to match the target - if this option is enabled (it is disabled by default), the system will reorder columns in the data file to match the order of columns in the target Redshift table. When this option is enabled, the system also updates the column data types to match the existing columns in the target table. Enable it to fix the "Column type 'VALUE' is not recognized" error. This option is ignored if the target table does not exist.
- Ignore fields in the source that do not exist in the target table - if this option is enabled (it is disabled by default) and the source has more fields than the target (Redshift) table, the system will automatically exclude the extra fields. The Ignore, Alter, and Recreate options are mutually exclusive.
- Alter target table if the source has columns that the target table doesn't have - if this option is enabled (it is disabled by default) and the source has fields that the target (Redshift) table doesn't have, the system will add the extra fields to the target table. The Ignore, Alter, and Recreate options are mutually exclusive.
- Recreate target table if the source has columns that the target table doesn't have - if this option is enabled (it is disabled by default) and the source has fields that the target (Redshift) table doesn't have, the system will automatically drop and recreate the target table. The Ignore, Alter, and Recreate options are mutually exclusive.
- Insert null into the fields in the target table that do not exist in the source - if this option is enabled (it is disabled by default) and the target (Redshift) table has fields that the source doesn't have, the system will automatically insert NULL values into these fields in the target.
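As an illustration of what the Alter target table option amounts to, the statement below adds a column that appeared in the source but is missing in the target. The table and column names are placeholders, and the DDL actually issued by the system may differ.

-- Bring the target table in line with a new source column (names are placeholders)
ALTER TABLE public.orders ADD COLUMN coupon_code VARCHAR(256);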
Configuring notifications if source and destination have different columns
It is possible to send an email notification if the source and destination have different columns. To configure the flow to send a notification when either the source has more columns than the destination or the destination has more columns than the source, use the technique explained in this article.
Step 10. Optionally configure Mapping
If necessary, you can create a mapping between the source and destination (Redshift) fields.
You create the mapping between the source and the destination just as you would for any other flow type.
Mapping is not required, but keep in mind that if a source field name is not supported by Redshift, Redshift will return an error and the data will not be loaded into the database. For example, if you are loading data from Google Analytics, the output (source) is going to include fields with the ga: prefix (ga:user, ga:browser, etc.). Unfortunately, Redshift does not support fields containing a colon (:), so the data will be rejected. If that happens, you can use mapping to rename the destination fields, for example ga:user to ga_user and ga:browser to ga_browser.
Set up change replication using high watermark
As in any other flow type, it is possible to configure change replication using a high watermark.
When change replication is enabled, only the changed records will be loaded into Redshift.
Basically, all you need to do is set the high watermark field and enable change replication for the transformation. Additionally, for better reliability, you can set the calculated high watermark field value.
Set up change replication using change data capture (CDC)
Change data capture (CDC) is an approach to data integration that is based on the identification, capture, and delivery of the changes made to the source database and stored in the database redo log (also called transaction log).
Etlworks supports replicating data using CDC from MySQL, SQL Server, Oracle, PostgreSQL, and MongoDB.
Read about configuring CDC for the source databases.
Once CDC is configured for the source database, you can create a CDC pipeline where the source is one of these databases and the destination is Amazon Redshift.
When creating a point-to-point CDC pipeline for extracting data from a CDC-enabled database and loading it into Redshift, you have two options:
- Stream CDC events, create CSV files + load data from CSV files into Redshift.
- Stream CDC events into a message queue + load data from a message queue into Redshift.
Loading multiple tables by a wildcard name
To load multiple database objects (tables and views) by a wildcard name (without creating individual source-to-destination transformations) follow the same steps as above, except:
FROM
Create a single source-to-destination transformation and set the FROM to a fully qualified source database object name with wildcard characters (* and ?), for example public.*
By default, all tables and all views that match the wildcard name in FROM will be included. If you want to exclude specific database objects, enter a comma-separated list of the database objects to exclude in the Exclude objects field. If you want to include only specific database objects, enter a comma-separated list of the database objects to include in the Include objects field.
TO
Set the TO to redshift_schema.*, for example public.*.
Read how to programmatically change the destination (TO) name.
EXCLUDE
Optionally configure the list of database objects to exclude.
To exclude all views, enter all views in the Exclude objects field.
To exclude all tables, enter all tables in the Exclude objects field.
INCLUDE
Optionally configure the list of database objects to include.
SOURCE QUERY
Optionally configure the Source query. You can use the token {TABLE} in the source query.
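A hedged example of a Source query using the {TABLE} token is shown below; the filter column is a placeholder that only illustrates the idea of limiting the extracted rows.

-- {TABLE} is replaced with the actual table name at run time; status is a placeholder column
SELECT * FROM {TABLE} WHERE status = 'ACTIVE'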
BEFORE/AFTER SQL
If you want to execute any code before and/or after the COPY command, use the token {TABLE} in the Before COPY SQL and After COPY SQL.
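For example (illustrative only), you could refresh table statistics after each load by using the {TABLE} token in the After COPY SQL:

-- After COPY SQL: update optimizer statistics for the just-loaded table
ANALYZE {TABLE};

A Before COPY SQL such as TRUNCATE {TABLE}; is another common pattern when each run is a full reload, but whether that fits depends on your pipeline.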
MERGE
If you are configuring the MERGE action, do not enter anything in the Lookup Fields. Enable Predict Lookup Fields instead.
As an alternative to enabling the Predict Lookup Fields option (which is not always accurate), you can specify a list of table=fields pairs in the Lookup Fields. Use fully-qualified table names and ';' as a separator between the pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
HWM CHANGE REPLICATION
If you are setting up high watermark change replication with a calculated HWM field, use the token {TABLE} in the High Watermark Field Value.
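As a purely hypothetical illustration (the exact form expected by your setup may differ), a calculated value that references the {TABLE} token could look like this:

-- Hypothetical calculated high watermark value using the {TABLE} token
SELECT MAX(updated_at) FROM {TABLE}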
Create all string fields as original length multiplied by N
In Redshift, there is no NVARCHAR data type, and VARCHAR(1) is 1 byte, not 1 character, so Unicode strings can get truncated when the flow automatically creates the Redshift table from the source.
You can configure the destination Redshift connection to multiply the original column length. Even with the multiplier, the maximum column length will not exceed 65535.
- Varchar Field Size Multiplier - the value of this field, if greater than 1, will be used to multiply the default size of VARCHAR-like columns (except CHAR) when automatically creating the destination table.
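A hedged illustration of the effect: with a multiplier of 4, a source column defined as VARCHAR(255) would be created in Redshift as VARCHAR(1020), and a very wide source column would be capped at VARCHAR(65535). The table and column names below are placeholders.

-- Source column: name VARCHAR(255); Varchar Field Size Multiplier = 4
CREATE TABLE public.customers (
    name VARCHAR(1020)  -- 255 * 4, capped at 65535
);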
Common issues when loading data in Redshift
Read how to troubleshoot and fix common issues when loading data in Amazon Redshift.