Overview
A bulk load is a method provided by a database management system to load multiple rows of data from a source file into a database table. Etlworks includes several Flow types optimized for bulk load.
When to use this Flow type
Usually, bulk operations are not logged, and transactional integrity is not enforced. Often, bulk operations bypass triggers and integrity checks. This improves performance for loading large amounts of data quite significantly.
Use ETL with a bulk load when you need to extract data from any source, transform it, and load it into a database that supports bulk load.
This Flow is intended to transform the data before loading. Read how to bulk load files stored in local and cloud storage into databases.
In many cases, using a bulk load is overkill, and setting it up is more complex than the alternative methods available in Etlworks. Read how to load data into databases without bulk operations.
Prerequisites
The destination database must support bulk load operations.
Below are examples for the most commonly used databases (a minimal PostgreSQL COPY statement is sketched after this list):
- SQL Server BULK INSERT statement
- RDS SQL Server BULK INSERT statement using Amazon S3 integration
- Azure SQL BULK INSERT from Azure blob
- Postgres COPY command
- RDS Postgres using aws_s3.table_import_from_s3 with Amazon S3 integration
- MySQL LOAD DATA INFILE statement
- Amazon Aurora MySQL LOAD DATA INFILE statement using Amazon S3 integration
- Oracle Inline External Tables
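For illustration only, here is a minimal PostgreSQL COPY statement; the table name public.orders and the file path are placeholders, not values produced by the Flow:
-- Minimal PostgreSQL COPY; table name and path are illustrative.
COPY public.orders
FROM '/var/lib/postgresql/staging/orders.csv'
WITH (FORMAT csv, HEADER true, DELIMITER ',');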
How it works
A typical bulk load Flow performs the following operations:
- It extracts data from the source.
- It then creates CSV, JSON, XML, Avro, or Parquet files in the staging area, which can be one of the following: local file system, remote FTP or SFTP server, Azure Blob, Google Cloud Storage, or Amazon S3. The actual location of the staging area depends on the specific implementation of the bulk load command for the destination database.
- If needed, it compresses files using the gzip algorithm.
- It checks to see if the destination table exists, and if it does not, creates the table using metadata from the source.
- It executes the user-defined SQL statements for the bulk load.
- It then optionally executes user-defined or automatically generated MERGE statements.
- In the end, it cleans up the remaining files in the staging area, if needed.
Flows optimized for bulk load support wildcard processing and change replication using high watermark (HWM).
Step-by-step instruction
Here are the detailed steps on how to ETL data into a database using a bulk load Flow:
Step 1. Create all required Connections
You will need a source Connection, a Connection for the staging area (Server Storage, Amazon S3, Azure Blob, Google Cloud Storage, SFTP, FTP), and a destination database Connection. When creating the destination database Connection, it is recommended to enable the Auto Commit option.
When to use the specific type of the destination Connection
Connection type | Use this Connection type if |
---|---|
Server Storage | the database is located on the same machine or on the same network as Etlworks. The database server must be able to directly access files created by the Flow. |
Amazon S3 | the destination database is an RDS or Amazon Aurora database and supports loading files from the mapped S3 bucket. |
Azure Blob | the destination database is hosted on Azure and supports loading files from the mapped Azure Blob container. |
Google Cloud Storage | the destination database is hosted on Google Cloud and supports loading files from the mapped Google Cloud Storage bucket. |
FTP or SFTP | the destination database cannot directly access the files created by the Flow but can access a folder on the network that is also exposed as a directory on the FTP or SFTP server. |
When configuring a Connection for cloud storage such as Amazon S3 that will be used as the staging area for the bulk load Flow, it is recommended to select GZip as the value for the Archive file before copying field. The destination database must support loading gzipped files.
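As a hedged illustration, PostgreSQL can load a gzipped file by decompressing it on the fly with COPY ... FROM PROGRAM (this requires the appropriate server-side privileges); the table name and path below are placeholders:
-- Load a gzipped CSV by piping it through gzip; table name and path are illustrative.
COPY public.orders
FROM PROGRAM 'gzip -dc /var/lib/postgresql/staging/orders.csv.gz'
WITH (FORMAT csv, HEADER true);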
Step 2. Create a data exchange Format
Most databases can load files in the CSV Format. Some can also load JSON, XML, Avro, or Parquet files, so you will need to create one of these and set it as a destination Format.
For the CSV Format, it is recommended to set the Maximum number of rows in file property so the Flow can split large datasets extracted from the source into chunks. Loading smaller files will be much faster, specifically if you enable Use Parallel Threads when processing sources by a wildcard. Read more about performance optimizations when loading large datasets.
Step 3. Create a Flow to bulk load data into the destination database
Start creating the Flow by opening the Flows window, clicking +, and typing bulk into the search field:
Continue by selecting the Flow type, adding source-to-destination transformations, and entering the transformation parameters.
Step 4. Set destination Connection
To configure the actual destination, click the Connections
tab, and select the Connection created in Step 1.
Step 5. Set the source (FROM) and the destination (TO)
Depending upon the Flow type, you can select one of the following sources (FROM) for the bulk load Flow:
- Database: use the database table or view name as the source (FROM) name. Wildcard names, for example, public.*, are supported.
- Well-known API: use the endpoint name or the file name as the source (FROM) name.
- Web Service: use the endpoint name as the source (FROM) name.
- File: use the source file name or a wildcard file name as the source (FROM) name.
- Queue: use the queue name as the source (FROM) name.
For all bulk load Flows, the destination Connection is going to be either Server Storage, Amazon S3, Azure Blob, Google Cloud Storage, SFTP, or FTP. Etlworks uses the destination Connection as a staging area.
In TO, select or enter the fully-qualified destination table name or a wildcard table name. If the source (FROM) is entered using a wildcard template, set the destination (TO) to a wildcard as well.
Step 6. Configure SQL for bulk load
Click MAPPING in the transformation row, select the Parameters tab, and select the Copy SQL field.
Enter the SQL statement that will be used to bulk load data into the destination table. The following {tokens} can be used as part of the SQL statement:
- {TABLE}: the table to load data into.
- {TEMP_TABLE}: the staging table name used for MERGE.
- {FILE_TO_LOAD}: the name of the file to load, without path and extension.
- {PATH}: the path of the file to load, without the file name, for example, {app.data}/test/.
- {EXT}: the file extension of the file to load, without the ., for example, csv.
Example for SQL Server installed on-premises:
BULK INSERT {TABLE}
FROM '/data/{FILE_TO_LOAD}.dat'
WITH
(
FIELDTERMINATOR = ',',
FIRSTROW = 2,
ROWTERMINATOR = '\n'
)
Notice:
- {TABLE}: the fully qualified destination table name.
- /data: the location of the staging area.
- {FILE_TO_LOAD}: the file to load, without directory and extension.
- .dat: the file extension. By default, CSV files are created with the .dat extension.
This SQL is different for each database that supports bulk loading from the file. Take a look at examples for other databases.
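For comparison, a Copy SQL for PostgreSQL could look like the sketch below; the /data staging directory is an assumption, and the same {tokens} apply:
-- PostgreSQL COPY using the same tokens; /data is the assumed staging location.
COPY {TABLE}
FROM '/data/{FILE_TO_LOAD}.dat'
WITH (FORMAT csv, DELIMITER ',');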
Additionally, in the Exception when table not found field, enter the part of the error message that is generated when the destination table does not exist, so the Flow can create the table automatically.
This error message is different for each database that supports bulk loading from a file.
Step 7. Set optional parameters
Click MAPPING in the transformation row and select the Parameters tab.
7.1 COPY SQL parameters and Action
- Action: can be either COPY, which inserts records from the file(s) into the destination table, or MERGE, which merges records from the file with the records in the destination table. MERGE requires configuring Lookup Fields.
- Lookup Fields: the comma-separated list of fields that uniquely identify the record in the destination table. This field is required if the action is set to MERGE.
- Predict Lookup Fields: if this option is enabled and Lookup Fields is empty, the system will try to predict the columns that uniquely identify the record in the destination table.
- COPY SQL: a user-defined SQL statement for bulk loading data into the destination table.
- MERGE SQL: a user-defined SQL statement that will be used instead of the default when the action is set to MERGE (see the sketch after this list). If nothing is entered in this field, the default MERGE SQL will be used. The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
  - {TABLE}: the table to MERGE data into.
  - {TEMP_TABLE}: the table to merge data from.
  - {KEY_FIELDS}: the fields that uniquely identify the record in both tables.
  - {FIELDS}: the fields to INSERT into the table to MERGE data into.
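To illustrate the shape of a custom MERGE SQL, below is a sketch for SQL Server; the columns id, name, and amount are hypothetical, and in practice the {KEY_FIELDS} and {FIELDS} tokens supply the actual column lists:
-- Sketch of a custom MERGE SQL for SQL Server; column names are illustrative.
MERGE {TABLE} AS target
USING {TEMP_TABLE} AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET target.name = source.name, target.amount = source.amount
WHEN NOT MATCHED THEN
  INSERT (id, name, amount)
  VALUES (source.id, source.name, source.amount);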
7.2 Other parameters
- Purge File if Success: with purge enabled, the staging file will be automatically deleted after the COPY command is successfully executed.
- Purge File if Error: if this option is enabled (it is by default), the staging files will be automatically deleted if the COPY command was executed with an error.
- Before COPY SQL: this SQL will be executed on the destination database before running the COPY SQL (see the example after this list).
- Ignore errors when executing Before COPY SQL: if this option is enabled and there is an error when the Before COPY SQL is executed, the error will be ignored.
- After COPY SQL: this SQL will be executed on the destination database after running the COPY SQL.
- Ignore errors when executing After COPY SQL: if this option is enabled and there is an error when the After COPY SQL is executed, the error will be ignored.
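For example, a Before COPY SQL that empties the destination table before a full reload could be as simple as the statement below (a sketch, assuming the {TABLE} token is available in this field, as it is for wildcard Flows):
-- Clear the destination table before a full reload; {TABLE} is assumed to resolve to the destination table.
TRUNCATE TABLE {TABLE};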
7.3 Parameters specific for different source types
Depending on the Flow type, other Flow parameters can be added, as explained below:
- The source is a database.
- The source is a file or web service:
  - Ignore Transformations: if this parameter is enabled, the system will copy files directly into the staging area before executing the COPY command. This greatly improves the performance of the load but automatically disables any transformations.
  - Delete loaded source files: if this parameter is enabled, the system will automatically delete loaded files from the source.
  - Delete source files on error: if this parameter is enabled and there is an error during the load, the system will automatically delete loaded files from the source.
- The source is a well-known API.
Step 8. Optionally Configure MERGE (UPSERT)
To merge (upsert) existing data in the destination table with new data:
- Set the Action to MERGE.
- Define the Lookup Fields: the comma-separated list of fields that uniquely identify the record in the destination table.
Alternatively, you can enable the Predict Lookup Fields option, which forces the Flow to use various algorithms to automatically predict the fields that uniquely identify the record. Note that it is not always possible to correctly detect the unique fields.
When enabling the Predict Lookup Fields option (which is not always accurate) is not an option, you can specify the list of table=field pairs in the Lookup Fields. Use fully-qualified table names and ; as a separator between table=field pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
Step 9. Configure how to handle source and destination schema changes
It is quite typical for the source and the destination to have a different number of columns. It is also entirely possible that the order of columns in the source is different from the order of columns in the destination. In either of these cases, by default, the Flow will fail, since the COPY command typically cannot load files that have more or fewer columns than the target table, or whose columns are in a different order.
When setting the source-to-destination transformation, it is possible to configure it to automatically handle schema changes. The following configuration options are available:
- Reorder columns to load to match order of columns in the target table. Also update column types to match the target: if this option is enabled (it is disabled by default), the system will reorder columns in the data file to match the order of columns in the destination table. When this option is enabled, the system also updates the column data types to match the existing columns in the destination table.
- Ignore fields in the source that do not exist in the target table: if this option is enabled (it is disabled by default) and the source has more fields than the destination table, the system will automatically exclude the extra fields. Options 2, 3, and 4 are mutually exclusive.
- Alter target table if the source has columns that the target table doesn't have: if this option is enabled (it is disabled by default) and the source has different fields than the destination table, the system will add the extra fields to the target table. Options 2, 3, and 4 are mutually exclusive.
- Recreate target table if the source has columns that the target table doesn't have: if this option is enabled (it is disabled by default) and the source has different fields than the destination table, the system will automatically drop and recreate the target table. Options 2, 3, and 4 are mutually exclusive.
- Insert null into the fields in the target table that do not exist in the source: if this option is enabled (it is disabled by default) and the destination table has fields that the source does not, the system will automatically insert NULL values into these fields in the target.
- Create table SQL: by default, the system automatically generates the SQL for creating a new table if it does not exist. You can override the default SQL by providing a template or JavaScript code. Read more about how to override the create table SQL.
9.1 Configuring notifications if source and destination have different columns
It is possible to send an email notification if the source and destination have different columns. To configure Flow to send a notification when either source has more columns than the destination or the destination has more columns than the source, use the technique explained in this article.
The only difference is that you don't need to configure the exception handler. If the system detects that the source and the destination have a different number of columns, it will add an exception to the log, which then can be retrieved and sent in the email body as explained above.
Step 10. Optionally configure Mapping
If necessary, you can create a Mapping between the source and destination fields.
Setup incremental Change Replication using high watermark
As in any other Flow type, it is possible to configure a change replication using a high watermark.
When change replication is enabled, only the changed records will be loaded into the destination table.
Basically, all you need to do is set the High Watermark Field and Enable Change Replication for the transformation. Additionally, for better readability, you can set the calculated High Watermark Field Value.
Loading multiple tables by a wildcard name
To load multiple database objects (tables and views) by a wildcard name (without creating individual source-to-destination transformations), follow the same steps as above, except:
FROM
Create a single source-to-destination transformation and set the FROM to a fully qualified source database object name with wildcard characters (* and ?), for example, public.*.
By default, all tables and all views that match the wildcard name in FROM will be included. If you want to exclude specific database objects, enter a comma-separated list of the database objects to exclude in the Exclude Objects field. If you want to include only specific database objects, enter a comma-separated list of the database objects to include in the Include Objects field.
TO
Set the TO to a fully qualified destination name with a wildcard, in the format database.schema.*, for example, UTIL_DB.PUBLIC.*.
Read how to programmatically change the destination (TO) name.
EXCLUDE
Optionally configure the list of database objects to exclude.
To exclude all views, enter all views in the Exclude Objects field.
To exclude all tables, enter all tables in the Exclude Objects field.
INCLUDE
Optionally configure the list of database objects to include.
SOURCE QUERY
Optionally configure the Source query. You can use the token {TABLE} in the Source query.
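For example, a Source query that filters the extracted rows might look like this; the status column is hypothetical:
-- Illustrative Source query; the status column is a placeholder.
SELECT * FROM {TABLE} WHERE status = 'active'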
BEFORE/AFTER SQL
If you want to execute any code before and/or after the COPY command, use the token {TABLE} in the Before COPY SQL and After COPY SQL fields.
MERGE
If you are configuring the MERGE action, do not enter anything in the Lookup Fields. Enable Predict Lookup Fields instead.
Alternatively, when enabling the Predict Lookup Fields option (which is not always accurate) is not an option, you can specify the list of table=field pairs in the Lookup Fields. Use fully-qualified table names and ; as a separator between table=field pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
HWM CHANGE REPLICATION
If you are setting up high watermark change replication with a calculated HWM field, use the token {TABLE} in the High Watermark Field Value.
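As a sketch, assuming the calculated value is an SQL expression and updated_at is a hypothetical high watermark column in the source tables:
-- Hypothetical calculated high watermark value: the latest modification timestamp in each source table.
SELECT MAX(updated_at) FROM {TABLE}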
Examples
Azure SQL BULK INSERT from Azure blob
The following example shows how to load data from a CSV file in an Azure Blob storage location on which you have created a SAS key. The Azure Blob storage location is configured as an external data source. This requires a database scoped credential using a shared access signature that is encrypted using a master key in the user database.
Step 1. Create an optional master encryption key. A master key is not required if a database scoped credential is not required because the blob is configured for public (anonymous) access.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'YourStrongPassword1';
Step 2. Create an optional database scoped credential. A database scoped credential is not required if the blob is configured for public (anonymous) access.
Make sure that the SAS token does not have a leading ?, that you have at least read permission on the object that should be loaded (srt=o&sp=r), and that the expiration period is valid (all dates are in UTC time).
CREATE DATABASE SCOPED CREDENTIAL MyAzureBlobStorageCredential
WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
SECRET = '***srt=sco&sp=rwac&se=2017-02-01T00:55:34Z&st=2016-12-29T16:55:34Z************';
Step 3. Create an external data source.
CREDENTIAL is not required if a blob is configured for public (anonymous) access.
CREATE EXTERNAL DATA SOURCE MyAzureBlobStorage
WITH ( TYPE = BLOB_STORAGE,
LOCATION = 'https://****************.blob.core.windows.net/invoices',
CREDENTIAL = MyAzureBlobStorageCredential);
Step 4. Create a bulk load Flow where the destination is an Azure Blob Connection pointing to the container configured for the external data source in Step 3.
Step 5. Enter the COPY SQL. Use the data source created in Step 3.
Example:
BULK INSERT {TABLE}
FROM '{FILE_TO_LOAD}.csv'
WITH (
DATA_SOURCE = 'MyAzureBlobStorage',
FIELDTERMINATOR = ',',
CODEPAGE = '65001',
FORMAT = 'CSV',
ROWTERMINATOR = '0x0a',
FIRSTROW = 2,
FIELDQUOTE = '"',
MAXERRORS = 10)
Load large datasets
Flows optimized for the bulk load can move extremely large (billions of rows) datasets from the source to the destination. Below are some of the techniques recommended for achieving better performance.
Configure flow to split CSV files in chunks
If the Flow is configured to load CSV files, set the Maximum number of rows in file so the Flow can split the large datasets extracted from the source into chunks. Loading smaller files will be much faster. In most cases, we recommend setting this parameter to 100000.
Enable parallel load
When the destination CSV Format is configured to split the large dataset into chunks (see above), the source-to-destination transformation in the bulk load Flow can be configured to load the chunked files in parallel threads.
To enable the parallel load, enable the parameter Use Parallel Threads when processing sources by a wildcard under MAPPING > Parameters > Common Parameters.
Enable high watermark change replication (HWM)
When high watermark change replication is enabled, only the changed records will be extracted and loaded into the destination table.