Overview
Etlworks includes several Flows optimized for extracting data from various sources, transforming, and loading it into relational databases.
The Change Data Capture (CDC) pipeline extracts changes in real time from databases that support CDC and loads them into any supported relational database.
We use a heavily modified embedded Debezium engine for CDC.
Supported source databases
- MySQL
- SQL Server
- PostgreSQL
- Oracle
- DB2
- MongoDB
- AS400 (IBM i platforms)
What destination databases are supported
We support any relational database as long as there is a JDBC driver for that database. Here is the current list of supported databases. If you wish to CDC data into a database that is not on the list, let us know by emailing support@etlworks.com and we will add it as soon as possible.
When to use this pipeline
Use the pipeline described in this article to extract data from the CDC-enabled database and load it into relational databases in real time.
How it works
The end-to-end CDC pipeline extracts data from a CDC-enabled database and loads it into a supported relational database.
There are three options when creating a pipeline:
1. A single flow that streams CDC events directly into a relational database.
This flow streams CDC events directly into a relational database in real time. The advantage of the pipeline with a single flow is simplicity. There is just one flow to configure, schedule, and monitor.
2. A pipeline with independent Extract and Load flows
- Extract Flow: this Flow extracts data from a CDC-enabled database.
- Load Flow: this Flow loads data into a relational database.
The extract and load Flows run in parallel, which guarantees very high processing speed and low latency. In fact, the actual pipeline can include multiple independent extract and load Flows, which allows it to scale horizontally across multiple processing nodes.
3. A CDC pipeline that uses a bulk load for loading data into the relational database.
The destination database must support bulk load.
Prerequisites
The destination database is on the list of supported databases.
A pipeline with a single flow
The CDC Flow streams CDC events directly into a relational database in real time.
There is no need to create a separate Flow for the initial load. The first time it connects to a CDC-enabled source database, it reads a consistent snapshot of all of the included databases and tables. When that snapshot is complete, the Flow continuously reads the changes that were committed to the transaction log and generates the corresponding insert, update, and delete events.
Read more about CDC in Etlworks.
Step 1. Create a CDC Connection for the source database.
Read how to create a CDC connection.
Step 2. Create a Database Connection for the destination database.
Read how to create a database connection.
Step 3. Create a connection for history and offset files.
Read how to create CDC Offset and History connection.
Etlworks CDC connectors store the history of DDL changes for the monitored database in the history file and the current position in the transaction log in the offset file.
A typical CDC extract flow starts by snapshotting the monitored tables (A) or starts from the oldest known position in the transaction (redo) log (B), then proceeds to stream changes in the source database (C). If the Flow is stopped and restarted, it resumes from the last recorded position in the transaction log. The connection created in this step can be used to reset the CDC pipeline and restart the process from scratch.
The connection, by default, points to the directory `{app.data}/debezium_data`.
Step 4. Create CDC flow.
In Flows, click `Add flow`. Type in `cdc` in Select Flow type. Select `Stream CDC events into relational database`.
Left to right: select the CDC connection created in step 1, select tables to monitor in FROM, select the database connection created in step 2, and select or enter the destination table name in TO. When streaming data from multiple source tables, set the destination table using a wildcard template in the following format: `database.schema.prefix_*_suffix`, where `database.schema` is the database and schema in the relational database to load data into.
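For example, with a hypothetical template such as `dwh.staging.stg_*`, CDC events from a source table named `orders` would be loaded into the destination table `dwh.staging.stg_orders`.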
Step 5. Optionally configure mapping.
Click the `MAPPING` button.
Use the mapping editor to configure the per-field mapping.
Here is what you can do in mapping:
- Disable the column.
- Change the column name.
- Change the column data type when creating a new table.
- Add a function in JavaScript to calculate the column value (see the sketch below).
You cannot add columns in mapping which do not exist in the CDC stream.
Read more about mapping.
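To illustrate the last point in the list above, here is a minimal sketch of a calculated-column function. It assumes the mapping function can reference the current column value through a `value` variable and that the result of the last expression becomes the column value; verify both assumptions against the mapping documentation.

```javascript
// Minimal sketch of a JavaScript mapping function (assumes the incoming
// column value is exposed as 'value' and the last expression is returned).
// Trim whitespace and upper-case the value, preserving nulls.
value == null ? null : value.toString().trim().toUpperCase();
```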
Step 6. Configure load parameters.
Select the `Parameters` tab.
If needed, modify the following Load parameters:
- `Action`: the action can be either `INSERT` or `MERGE` (default). If the action is set to MERGE, the flow will INSERT records that do not exist in the destination table, UPDATE existing records, and DELETE records that were deleted from the source table.
- `Lookup Fields`: a comma-separated list of columns that uniquely identify the record in the destination table. The `MERGE` action requires a list of columns that uniquely identify the record. By default, the flow attempts to predict the Lookup Fields by checking unique indexes in the source and destination tables, but if there is no unique index in either table, the prediction is not guaranteed to be 100% accurate. Use this parameter to define the Lookup Fields in the following format: `fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2` (see the example after this list).
- `Custom SQL`: the value of this parameter is used to generate a custom SQL statement for updating data in the destination table. Some databases support an UPSERT SQL clause, which looks like an INSERT: UPSERT INTO table (columns) VALUES (values). However, the actual syntax of the UPSERT differs significantly depending on the driver. The developer can provide a template for the UPSERT, which will be used at runtime to generate the actual SQL statement. If the value of this parameter is empty, the flow generates the default INSERT, MERGE, and DELETE SQL statements using the SQL dialect of the target database.
- `Batch Window`: this parameter is used to group SQL statements into batches and execute each batch in one database call, which improves load performance. The total number of statements in a batch cannot exceed the Batch Window. Setting this parameter to 1 disables batch processing.
- `Force batch execution every (ms)`: if batch processing is enabled (Batch Window > 1), the flow forces batch execution after the configured number of milliseconds, even if the current number of statements in the batch is less than the Batch Window. This parameter prevents the situation where the number of updates for a specific table is so low that it constantly stays below the Batch Window, which would keep the updates from being pushed to the destination database.
- `Force auto-commit`: when this option is enabled, the transaction is committed for each executed batch. This is the recommended setting. Note that if this option is enabled, the flow forces auto-commit even if it is disabled for the destination connection.
- `Commit transaction when the transaction is committed in the source`: if this option is enabled, the flow automatically commits a transaction in the destination database when the transaction is committed in the source database. This option is ignored if the flow or the destination connection is configured with auto-commit.
- `Commit interval during snapshot (ms)`: the commit interval during the snapshot. Setting the commit interval prevents the situation where the flow performs a snapshot of a large table and the data is not committed to the destination database for an extended period of time. This option is ignored if the flow or the destination connection is configured with auto-commit.
- `Alter target table if the source has columns that the target table doesn't have`: if this option is enabled, the flow automatically creates and executes an `ALTER TABLE ADD COLUMN` SQL statement for each column that exists in the source and does not exist in the destination.
- `Add Primary Key`: if this option is enabled, the flow adds a Primary Key to the destination table using the column(s) that uniquely identify the record (Lookup Fields). This option is ignored if a Primary Key already exists in the destination table. We recommend enabling this option when the destination database is PostgreSQL or MySQL and the Action is MERGE (UPSERT). A Primary Key or unique index is required when executing a MERGE in MySQL or PostgreSQL.
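For example, assuming hypothetical tables `sales.public.orders` and `sales.public.customers` uniquely identified by `order_id` and `customer_id`, the Lookup Fields value would be `sales.public.orders=order_id;sales.public.customers=customer_id`.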
Step 7. Optionally add a script to be executed after processing each row.
It is possible to add JavaScript code that will be executed after processing each row.
The row is considered processed when an SQL statement (INSERT, UPDATE, MERGE, or DELETE) is executed on the destination table or (if Batch Window > 1) when the SQL statement is added to the batch.
Typical use cases
- Trigger: execute an SQL statement on another table or tables when a row is added/updated/deleted in the destination table. Read how to execute any SQL from JavaScript.
- Per-row logging. Read about logging.
Add script
To add a script, go to `transformation -> Configure -> Additional transformations`. Add the script in the `For Each Row` field. See the example script after the table of available objects below.
Objects available in the script
The following objects can be referenced by name from JavaScript code:
Object name | Class name / Description | Package |
---|---|---|
dataSet | com.toolsverse.etl.common.DataSet | com.toolsverse.etl.common |
currentRow | com.toolsverse.etl.common.DataSetRecord | com.toolsverse.etl.common |
etlConfig | com.toolsverse.etl.core.config.EtlConfig | com.toolsverse.etl.core.config |
scenario | com.toolsverse.etl.core.engine.Scenario | com.toolsverse.etl.core.engine |
tableName | The name of the destination table | |
tableKey | The primary key columns in the destination table | |
connection | The destination JDBC connection | java.sql |
isSnapshotMode | True if the flow performs a snapshot of the source database | |
batchSize | The size of the Batch Window set for the transformation. There is no batching if batchSize <= 1 | |
autoCommit | True if the transformation is configured to auto-commit. The default value is true | |
op | The type of the action for the specific row: 'c' for INSERT, 'u' for UPDATE, and 'd' for DELETE | |
destAlias | The destination com.toolsverse.etl.common.Alias | com.toolsverse.etl.common |
driver | The destination com.toolsverse.etl.driver.Driver | com.toolsverse.etl.driver |
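As an illustration of the trigger use case above, here is a minimal sketch of a `For Each Row` script that writes an audit record whenever a DELETE is applied to the destination table. The audit table `cdc_delete_audit`, its columns, and the `id` key column are hypothetical, and the `dataSet.getFieldValue(currentRow, name)` accessor is an assumption; check the DataSet JavaDoc and adapt the SQL to your schema. Transaction handling is left out of the sketch; whether and when these statements are committed depends on the flow's commit settings.

```javascript
// Minimal sketch: audit DELETE events applied to the destination table.
// Assumes a hypothetical table cdc_delete_audit(table_name, record_id, deleted_at)
// exists in the destination database and the source rows have an 'id' column.
if (op == 'd' && !isSnapshotMode) {
    // 'connection' is the destination JDBC connection provided by the flow
    var stmt = connection.prepareStatement(
        "INSERT INTO cdc_delete_audit (table_name, record_id, deleted_at) VALUES (?, ?, CURRENT_TIMESTAMP)");
    try {
        stmt.setString(1, tableName);
        // getFieldValue(record, name) is assumed; coerce the key value to a string
        stmt.setString(2, '' + dataSet.getFieldValue(currentRow, 'id'));
        stmt.executeUpdate();
    } finally {
        stmt.close();
    }
}
```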
Step 8. Schedule CDC flow.
We recommend using a continuous run Schedule type. The idea is that the Flow runs until it is stopped manually, there is an error, or (if configured) there are no more new CDC events for an extended period of time. It restarts automatically after a configurable number of seconds.
Monitor running CDC flow
Read how to monitor running CDC flow.
A pipeline with independent extract and load flows
Create and schedule Extract flow
The CDC extract flow extracts data from a CDC-enabled database and creates CSV files with CDC events in the configured location. These files are loaded into the target database by the Load Flow.
There is no need to create a separate Flow for the initial load. The first time it connects to a CDC-enabled source database, it reads a consistent snapshot of all of the included databases and tables. When that snapshot is complete, the Flow continuously reads the changes that were committed to the transaction log and generates the corresponding insert, update, and delete events.
Read more about CDC in Etlworks.
Step 1. Create a CDC Connection for the source database.
Read how to create a CDC connection.
Step 2. Create a file storage connection for CDC events.
This connection will be used for staging files with CDC events.
Read how to create a connection for CDC events.
The connection, by default, points to the directory `{app.data}/debezium_data/events`.
Step 3. Create a connection for history and offset files.
Read how to create CDC Offset and History connection.
Etlworks CDC connectors store the history of DDL changes for the monitored database in the history file and the current position in the transaction log in the offset file.
A typical CDC extract flow starts by snapshotting the monitored tables (A) or starts from the oldest known position in the transaction (redo) log (B), then proceeds to stream changes in the source database (C). If the Flow is stopped and restarted, it resumes from the last recorded position in the transaction log. The connection created in this step can be used to reset the CDC pipeline and restart the process from scratch.
The connection, by default, points to the directory `{app.data}/debezium_data`.
Step 4. Create CSV format.
This format is used to create CSV files with CDC events.
Read how to create CSV format.
Step 5. Create CDC extract flow.
In Flows, click `Add flow`. Type in `cdc` in Select Flow type. Select `Stream CDC events, create files`.
Left to right: select the CDC connection created in step 1, select tables to monitor in FROM, select the connection created in step 2 to stage files with CDC events, and select the format created in step 4.
You can now execute the flow manually or schedule it to run continuously.
To stop the CDC Flow manually, click `Stop` / `Cancel`.
Note that, as configured, the CDC flow never stops automatically. This is the recommended configuration. You can configure the CDC Connection to stop when there are no more new CDC events for an extended period of time. Read more.
Step 6. Schedule CDC extract flow.
We recommend using a continuous run Schedule type. The idea is that the extract Flow runs until it is stopped manually, there is an error, or (if configured) there are no more new CDC events for an extended period of time. It restarts automatically after a configurable number of seconds.
Monitor running CDC extract flow
Read how to monitor running CDC extract flow.
Create and schedule Load flow
This Flow is used to load files created by the CDC extract flow into the target relational database.
Read more about CDC load flow.
Step 1. Create a new connection for the target database.
Read how to create a database connection.
Step 2. Create a Flow to load data into the target database
Start creating a Flow by opening the `Flows` window, clicking `+`, and typing `file to database` into the search field:
Step 3. Configure the ETL transformation.
Select or enter the following attributes of the transformation (left to right):
1. Server Storage Connection for CDC events created in this step.
2. CSV Format created in this step.
3. A wildcard filename that matches the names of the files created by the CDC extract flow: `*_cdc_stream_*.csv`.
Note: If the source connection supports default wildcard templates (the parameter `Contains CDC events` is enabled or the source connection is created using the CDC Events connector), the wildcard filename can be selected in FROM.
4. The destination database connection created in step 1.
5. The wildcard destination table name in the fully qualified format: `db.schema.*`.
Step 4. Configure MERGE into the target table.
By default, the load flow INSERTs data into the destination table. To configure MERGE:
1. Click the `MAPPING` button.
2. Set `Action` to `Record with MERGE` and enable `Predict lookup fields`.
If you want to optimize the performance of the MERGE and are willing to skip DELETE operations, enable `Do not execute DELETE when Action is Record with MERGE`. When this option is enabled, the flow combines INSERT and UPDATE operations into a single SQL call and bundles multiple MERGE calls together using the JDBC batch interface. It ignores DELETE events. When this option is disabled, the flow executes each INSERT, UPDATE, and DELETE statement individually, which has a significant impact on performance.
Some databases, specifically PostgreSQL and MySQL, require tables to have primary keys or unique indexes to perform MERGE operations.
If you want to MERGE data into target tables in PostgreSQL or MySQL and the tables do not exist yet, you can set `Action` to `Record` instead of `Record with MERGE`. Note that `Record` is slower than `Record with MERGE`.
Step 5. Configure flow to automatically handle source schema changes.
The load flow always creates a table in the target database if it does not exist.
Enable `Alter target table if the source has columns that the target table doesn't have` to automatically add missing columns to the target table.
You can now execute the Load flow manually.
Step 6. Schedule load flow.
Schedule flow to run as often as needed. These are the options:
- Run flow periodically (as often as once a minute).
- Run flow continuously (as often as once a second).