Overview
A Bulk Load is a process or method provided by a database management system to load multiple rows of data from a source file into a database table. Etlworks includes several flow types optimized for bulk load.
When to use this flow type
Usually, bulk operations are not logged, and transactional integrity is not enforced. Bulk operations also often bypass triggers and integrity checks. This significantly improves performance when loading large amounts of data.
In most cases, using bulk load is overkill, and setting it up is more complex than the alternative methods available in Etlworks. Read how to load data into databases without bulk operations.
Prerequisites
The destination database must support bulk load operations.
Below are examples for some of the most commonly used databases (a minimal PostgreSQL sketch follows the list):
- SQL Server BULK INSERT statement
- RDS SQL Server BULK INSERT statement using Amazon S3 integration
- Postgres COPY command
- RDS Postgres COPY command using Amazon S3 integration
- MySQL LOAD DATA INFILE statement
- Amazon Aurora MySQL LOAD DATA INFILE statement using Amazon S3 integration
- Oracle Inline External Tables
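For reference, here is a minimal sketch of a PostgreSQL COPY command; the table name, file path, and options are placeholders rather than values taken from this article:

COPY public.orders
FROM '/data/orders.dat'
WITH (FORMAT csv, HEADER true);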
How it works
A typical bulk load flow performs the following operations:
- It extracts data from the source.
- It then creates CSV, JSON, XML, Avro, or Parquet files in the staging area, which can be one of the following: local file system, remote FTP or SFTP server, Azure Blob, Google Cloud Storage, or Amazon S3. The actual location of the staging area depends on the specific implementation of the bulk load command for the destination database.
- If needed, it compresses the files using the gzip algorithm.
- It checks whether the destination table exists and, if it does not, creates the table using metadata from the source.
- It executes the user-defined SQL statements for the bulk load.
- It then optionally executes a user-defined or automatically generated MERGE statement.
- Finally, it cleans up the remaining files in the staging area, if needed.
Flows optimized for bulk load support wildcard processing and change replication using high watermark (HWM).
Step-by-step instructions
Step 1. Create all required connections
You will need a source connection, a connection for the staging area (Server Storage, Amazon S3, Azure Blob, Google Cloud Storage, SFTP, FTP), and a destination database connection. When creating the destination database connection, disable auto-commit.
When to use the specific type of the destination connection
- Server storage - use this connection type if the database is located on the same machine or on the same network as Etlworks Integrator. The database server must be able to directly access files created by the flow.
- Amazon S3 - use this connection type if the destination database is an RDS or Amazon Aurora and supports loading files from the mapped S3 bucket.
- Azure blob - use this connection type if the destination database is hosted on Azure and supports loading files from the mapped Azure blob.
- Google Cloud Storage - use this connection type if the destination database is hosted on Google Cloud and supports loading files from the mapped Google Cloud Storage bucket.
- FTP or SFTP - use this connection type if the destination database cannot directly access the files created by the flow but can access a folder on the network that is also exposed as a directory on the FTP or SFTP server.
When configuring an Amazon S3 connection that will be used as a staging area for the bulk load flow, it is recommended that you select GZip as the value for the Archive file before copying field.
Step 2. Create a data exchange format
Most databases can load files in the CSV format. Some can also load JSON, XML, Avro, or Parquet files. Create one of these formats and set it as the destination format.
Step 3. Create a flow to bulk load data into the destination database
Start creating the flow by opening the Flows window, clicking the + button, and typing bulk load into the search field.
Continue by selecting the flow type, adding source-to-destination transformations, and entering the transformation parameters.
Step 4. Set destination connection
To configure the actual destination, click the Connections tab, and select the connection created in Step 1.
Step 5. Set the source (FROM) and the destination (TO)
Depending upon the flow type, you can select one of the following sources (FROM) for the bulk load flow:
- Database - use the database table or view name as the source (FROM) name
- CDC - use the fully qualified table name as the source (FROM) name
- API - use any appropriate string as the source (FROM) name
- Web Service - use any appropriate string as the source (FROM) name
- File - use the source file name or a wildcard file name as the source (FROM) name
- Queue - use the queue topic name as the source (FROM) name
For all bulk load flows, the destination connection is going to be either Server Storage, Amazon S3, Azure Blob, Google Cloud Storage, SFTP, or FTP. The Integrator uses the destination connection as a staging area.
In TO select or enter the fully-qualified destination table name or a wildcard table name.
Step 6. Configure SQL for bulk load
Click the MAPPING button in the transformation row and select the Parameters tab. Select the Copy SQL field.
Enter the SQL statement that will be used to bulk load data into the destination table. The following {tokens} can be used as a part of the SQL statement:
- {TABLE} - the actual destination table name
- {TEMP_TABLE} - the temporary table name which is used as a temporary destination when the MERGE action is selected.
- {FILE_TO_LOAD} - the source file name without a directory and an extension. You will need to specify both (see example below).
Example for SQL Server installed on-premise:
BULK INSERT {TABLE}
FROM '/data/{FILE_TO_LOAD}.dat'
WITH
(
FIELDTERMINATOR = ',',
FIRSTROW = 2,
ROWTERMINATOR = '\n'
)
Notice:
- {TABLE} - the fully qualified destination table name
- /data - the location of the staging area
- {FILE_TO_LOAD} - the file to load, without directory and extension
- .dat - the file extension. By default, CSV files are created with the extension .dat.
This SQL is different for each database that supports bulk loading from the file. Take a look at examples for other databases.
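For comparison, here is a sketch of an equivalent statement for MySQL, assuming the staging directory is /data and the MySQL server is permitted to read files from that location:

LOAD DATA INFILE '/data/{FILE_TO_LOAD}.dat'
INTO TABLE {TABLE}
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
IGNORE 1 LINES;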
Additionally, in the field Exception when table not found, enter the part of the error message that is generated when the destination table does not exist, so that the flow can create the table automatically.
This error is different for each database that supports bulk loading from a file.
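For example, for SQL Server the relevant fragment is typically Invalid object name, while for PostgreSQL it is does not exist (from the relation "..." does not exist error); verify the exact wording produced by your destination database.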
Step 7. Set optional parameters
Click the MAPPING button in the transformation row and select the Parameters tab.
COPY SQL parameters and Action
- Action - can be either COPY, which inserts records from the file(s) into the destination table, or MERGE, which merges records from the file with the records in the destination table. MERGE requires configuring Lookup Fields.
- Lookup Fields - the comma-separated list of fields that uniquely identify the record in the destination table. This field is required if the action is set to MERGE.
- Predict Lookup Fields - if this option is enabled and Lookup Fields is empty, the system will try to predict the columns that uniquely identify the record in the destination table.
- COPY SQL - the user-defined SQL for bulk loading data into the destination table.
- MERGE SQL - the user-defined SQL that will be used instead of the default when the action is set to MERGE. If nothing is entered in this field, the default MERGE SQL will be used (see the illustration after this list). The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
  - {TABLE} - the table to MERGE data into
  - {TEMP_TABLE} - the table to merge data from
  - {KEY_FIELDS} - the fields uniquely identifying the record in both tables
  - {FIELDS} - the fields to INSERT/UPDATE in the table to MERGE data into
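As an illustration only, for a destination database that supports the ANSI MERGE statement, the generated SQL has roughly the following shape; the table and column names below are hypothetical, and the real statement is assembled from the tokens listed above:

MERGE INTO dbo.orders AS t
USING dbo.orders_temp AS s
ON t.order_id = s.order_id
WHEN MATCHED THEN
UPDATE SET t.status = s.status, t.amount = s.amount
WHEN NOT MATCHED THEN
INSERT (order_id, status, amount)
VALUES (s.order_id, s.status, s.amount);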
Other parameters
- Purge File if Success - with purge enabled, the staging file will be automatically deleted after the COPY command is successfully executed.
- Purge File(s) if Error - if this option is enabled (it is enabled by default), the staging files will be automatically deleted if the COPY command finished with an error.
- Before COPY SQL - this SQL will be executed on the destination database before running COPY SQL (see the example after this list).
- Ignore errors when executing Before COPY SQL - if this option is enabled, and there is an error when "Before COPY SQL" is executed - the error will be ignored.
- After COPY SQL - this SQL will be executed on the destination database after running COPY SQL.
- Ignore errors when executing After COPY SQL - if this option is enabled, and there is an error when "After COPY SQL" is executed - the error will be ignored.
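For example, a possible Before COPY SQL for a truncate-and-load pattern (this assumes you want to fully reload the destination table on every run; it is not required by the flow):

TRUNCATE TABLE {TABLE};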
Parameters specific for different source types
Depending on the flow type, other flow parameters can be added, as explained below:
- The source is a database
- The source is a file or web service. The following additional parameters are available:
  - If this parameter is enabled, the system will copy the files directly into the stage before executing the COPY command. This greatly improves the performance of the load but automatically disables any transformations.
  - If this parameter is enabled, the system will automatically delete the loaded files from the source.
  - If this parameter is enabled and there is an error during the load, the system will automatically delete the loaded files from the source.
- The source is a well-known API
Step 8. Optionally Configure MERGE (UPSERT)
To merge (upsert) existing data in the destination table with new data:
- Set the Action to MERGE.
- Define the Lookup Fields - the comma-separated list of fields that uniquely identify the record in the destination table.
Alternatively, you can enable Predict Lookup Fields, which forces the flow to use various algorithms to automatically predict the fields that uniquely identify the record. Note that it is not always possible to correctly detect the unique fields.
If enabling Predict Lookup Fields (which is not always accurate) is not an option, you can specify a list of table=fields pairs in the Lookup Fields. Use fully-qualified table names and ';' as a separator between the table=fields pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
Step 9. Configure how to handle source and destination schema changes
It is quite typical for the source and the destination to have a different number of columns. It is also entirely possible that the order of columns in the source differs from the order of columns in the destination. In either of these cases, by default, the flow will fail, since the COPY command typically cannot load files that have more or fewer columns than the target table or whose column order is different.
When setting the source-to-destination transformation, it is possible to configure it to automatically handle schema changes. The following configuration options are available:
- Reorder columns to load to match order of columns in the target table. Also update column types to match the target - if this option is enabled (it is disabled by default) the system will reorder columns in the data file to match the order of columns in the destination table. When this option is enabled the system also updates the column data types to match the existing columns in the destination table.
- Ignore fields in the source that do not exist in the target table - if this option is enabled (it is disabled by default) and the source has more fields than the destination table, the system will automatically exclude extra fields. Options 2, 3, and 4 are mutually exclusive.
- Alter target table if the source has columns that the target table doesn't have - if this option is enabled (it is disabled by default) and the source has different fields than the destination table, the system will add the extra fields to the target table (see the illustration after this list). Options 2, 3, and 4 are mutually exclusive.
- Recreate target table if the source has columns that the target table doesn't have - if this option is enabled (it is disabled by default) and the source has different fields than the destination table, the system will automatically drop and recreate the target table. Options 2, 3, and 4 are mutually exclusive.
- Insert null into the fields in the target table that do not exist in the source - if this option is enabled (it is disabled by default) and the destination table has different fields than the source, the system will automatically insert NULL values into these fields in the target.
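As an illustration, when option 3 (alter the target table) is triggered, the result is conceptually equivalent to a statement like the following; the table name, column name, and data type are hypothetical, and the actual DDL is generated by the flow for your destination database:

ALTER TABLE dbo.orders ADD discount DECIMAL(10,2) NULL;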
Configuring notifications if source and destination have different columns
It is possible to send an email notification if the source and destination have different columns. To configure the flow to send a notification when either the source has more columns than the destination or the destination has more columns than the source, use the technique explained in this article.
The only difference is that you don't need to configure the exception handler. If the system detects that the source and the destination have a different number of columns, it will add an "exception" to the log, which can then be retrieved and sent in the email body as explained above.
Step 10. Optionally configure Mapping
If necessary, you can create a mapping between the source and destination fields.
Setup incremental Change Replication using high watermark
As in any other flow type, it is possible to configure a change replication using a high watermark.
When change replication is enabled, only the changed records will be loaded into the destination table.
Basically, all you need to do is set the high watermark field and enable change replication for the transformation. Additionally, for better readability, you can set the calculated high watermark field value.
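Conceptually, with a high watermark column such as updated_at (a hypothetical column name), each run extracts only the records changed since the previous run, as if the extract were filtered like this:

SELECT * FROM dbo.orders
WHERE updated_at > '2024-05-01 00:00:00'; -- high watermark value recorded by the previous run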
Loading multiple tables by a wildcard name
To load multiple database objects (tables and views) by a wildcard name (without creating individual source-to-destination transformations), follow the same steps as above, except:
FROM
Create a single source-to-destination transformation and set the FROM to a fully qualified source database object name with wildcard characters (* and ?), for example public.*
By default, all tables and all views that match the wildcard name in FROM will be included. If you want to exclude specific database objects enter a comma-separated list of the database objects to exclude in the Exclude objects field. If you want to include only specific database objects enter a comma-separated list of the database objects to include in the Include objects field.
TO
Set the TO to DATABASE.SCHEMA.*, for example, UTIL_DB.PUBLIC.*.
Read how to programmatically change the destination (TO) name.
EXCLUDE
Optionally configure the list of the database objects to exclude.
To exclude all views, enter all views in the Exclude objects field.
To exclude all tables, enter all tables in the Exclude objects field.
INCLUDE
Optionally configure the list of the database objects to include.
SOURCE QUERY
Optionally configure the Source query. You can use the token {TABLE} in the source query.
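For example, a hypothetical source query that uses the {TABLE} token to load only a subset of rows from each matched table:

SELECT * FROM {TABLE} WHERE status = 'active'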
BEFORE/AFTER SQL
If you want to execute any code before and/or after the COPY command, use the token {TABLE} in the Before COPY SQL and After COPY SQL fields.
MERGE
If you are configuring the MERGE action, do not enter anything in the Lookup Fields. Enable Predict Lookup Fields instead.
As an alternative to enabling the Predict Lookup Fields option (which is not always accurate), you can specify a list of table=fields pairs in the Lookup Fields. Use fully-qualified table names and ';' as a separator between the table=fields pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
HWM CHANGE REPLICATION
If you are setting up high watermark change replication with a calculated HWM field, use the token {TABLE} in the High Watermark Field Value.