Overview
Etlworks includes several Flows optimized for extracting data from various sources, transforming, and loading it into relational databases.
A Change Data Capture (CDC) pipeline extracts changes in real time from databases that support CDC and loads them into any supported relational database.
Etlworks includes a built-in, deeply integrated CDC engine based on a customized version of Debezium.
There is nothing to install or manage separately—the CDC engine runs as part of the flow execution.
Supported source databases
- MySQL
- SQL Server
- PostgreSQL
- Oracle
- DB2
- MongoDB
- AS400 (IBM i platforms)
What destination databases are supported
We support any relational database that has a JDBC driver. Here is the current list of supported databases. If you want to stream CDC data into a database that is not on the list, email support@etlworks.com and we will add it as soon as possible.
When to use this pipeline
Use the pipeline described in this article to extract data from a CDC-enabled database and load it into relational databases in real time.
How it works
A CDC pipeline in Etlworks captures changes from a CDC-enabled source database and loads them into a supported relational database. There are three options for building this pipeline, depending on your performance needs, scale, and destination capabilities:
Option 1. Single Flow – Stream CDC Events Directly
Use this option if you prefer simplicity and are processing moderate volumes of change events.
A single flow extracts CDC events and streams them directly into the destination database in real time.
Best for: straightforward use cases, minimal setup, lower volumes.
Advantages: easy to configure, schedule, and monitor.
Option 2. Separate Extract and Load Flows
Use this option if you need higher performance and horizontal scalability.
The pipeline includes two independent flows:
- Extract Flow: captures CDC events from the source.
- Load Flow: streams those events into the destination.
Flows run in parallel, allowing faster throughput and lower latency.
You can scale further by adding more extract/load flow pairs across nodes.
Best for: high-volume use cases and distributed environments.
Option 3. CDC with Bulk Load
Use this option if the destination database supports bulk loading and you want maximum performance.
CDC events are staged and then loaded in batches using the destination’s bulk load mechanism.
Offers the fastest load performance, especially for large datasets.
Best for: databases that support bulk loading and scenarios where performance is critical.
Prerequisites
Enable CDC in the source database:
- Enable CDC for Microsoft SQL Server
- Enable CDC for MySQL
- Enable CDC for Oracle
- Enable CDC for PostgreSQL
- Enable CDC for DB2
- Enable CDC for MongoDB
- Enable CDC for AS400 (IBM i platforms)
A pipeline with a single flow
The CDC flow streams CDC events directly into a relational database in real time.
Note: There is no need to create a separate Flow for the initial load. The first time the Flow connects to a CDC-enabled source database, it reads a consistent snapshot of all included databases and tables. When the snapshot is complete, the Flow continuously reads the changes committed to the transaction log and generates the corresponding insert, update, and delete events.
Read more about CDC in Etlworks.
Step 1. Create a CDC Connection for the source database.
Read how to create a CDC connection.
Step 2. Create a Database Connection for the destination database.
Read how to create a database connection.
Step 3. Create a connection for history and offset files.
Read how to create CDC Offset and History connection.
Etlworks CDC connectors store the history of DDL changes for the monitored database in the history file and the current position in the transaction log in the offset file.
A typical CDC extract flow either starts by snapshotting the monitored tables (A) or starts from the oldest known position in the transaction (redo) log (B), and then streams changes from the source database (C). If the Flow is stopped and restarted, it resumes from the last recorded position in the transaction log. The connection created in this step can be used to reset the CDC pipeline and restart the process from scratch.
By default, the connection points to the directory {app.data}/debezium_data.
Step 5. Create CDC flow.
In Flows, click Add flow. Type cdc in Select Flow type. Select Stream CDC events into relational database.
Step 6. Add source-to-destination transformation
Left to right:
- select the CDC connection created in step 1
- select tables to monitor in FROM
- select the database connection created in step 2
- select or enter the destination table name in TO. When streaming data from multiple source tables, set the destination table using a wildcard template in the following format: database.schema.prefix_*_suffix, where database.schema is the database and schema in the relational database to load data into.
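Here is a minimal standalone JavaScript (Node.js) sketch that makes the wildcard template concrete by showing how it could map source tables to destination tables. It assumes that `*` is replaced with the unqualified source table name (the part after the last dot). The function name `resolveDestination` and the table names are hypothetical, so verify the actual names your flow produces before relying on this rule.

```javascript
// Hypothetical helper. Assumption: "*" in the template is replaced with the
// unqualified source table name (the text after the last ".").
function resolveDestination(template, sourceTable) {
  if (!template.includes("*")) {
    return template; // fixed destination table, no wildcard to expand
  }
  const parts = sourceTable.split(".");
  const tableName = parts[parts.length - 1];
  // A replacer function keeps "$" in table names from being treated as a
  // special replacement pattern by String.prototype.replace.
  return template.replace("*", () => tableName);
}

// Hypothetical example: a MySQL table streamed into an analytics schema.
console.log(resolveDestination("analytics.public.src_*_cdc", "inventory.customers"));
// analytics.public.src_customers_cdc
```

With a prefix and suffix in the template, destination tables from different source tables stay recognizable while still landing in one schema.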
Step 7. Optionally configure mapping.
Click the MAPPING button.
Use the mapping editor to configure the per-field mapping.
Here is what you can do in mapping:
- Disable the column.
- Change the column name.
- Change the column data type when creating a new table.
- Add a function in JavaScript to calculate the column value.
Note: In mapping, you cannot add columns that do not exist in the CDC stream.
Read more about mapping.
Step 8. Optionally configure load parameters
Select the Parameters tab.
If needed, modify the following Load parameters:
- Action: either INSERT or MERGE (default). If the action is set to MERGE, the flow INSERTs records that do not exist in the destination table, UPDATEs existing records, and DELETEs records that were deleted in the source table.
- Lookup Fields: a comma-separated list of columns that uniquely identify the record in the destination table. The MERGE action requires a list of columns that uniquely identify the record. By default, the flow attempts to predict the Lookup Fields by checking unique indexes in the source and destination tables. If neither table has a unique index, the prediction may not be accurate. Use this parameter to define the Lookup Fields in the following format: fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2.
- Custom SQL: the value of this parameter is used to generate the custom SQL statement for updating data in the destination table. The developer can provide a template for SQL, which will be used at runtime to generate the actual SQL statement. If the value of this parameter is empty, the flow generates the default INSERT, MERGE and DELETE SQL statements using the dialect of SQL for the target database.
- Batch Window: this parameter is used to group SQL statements in batches, then execute each batch in one database call, which improves the performance of the load. The total number of statements in a batch cannot exceed the Batch Window. Setting this parameter to 1 disables batch processing.
- Force batch execution every (ms): if batch processing is enabled (Batch Window > 1), the flow forces batch execution after the configured number of milliseconds, even if the batch contains fewer statements than the Batch Window. Without this setting, a table with very few updates could keep its batch below the Batch Window indefinitely, so those updates would never be pushed to the destination database.
- Force auto-commit: when this option is enabled, the transaction is committed for each executed batch. This is a recommended setting. Note that if this option is enabled, the flow forces the auto-commit even if it is disabled for the destination connection.
- Commit transaction when the transaction is committed in the source: if this option is enabled, the flow automatically commits a transaction in the destination database when the transaction is committed in the source database. This option is ignored if the flow or the destination connection is configured with auto-commit.
- Commit interval during snapshot (ms): how often the flow commits to the destination database during the snapshot. Setting this interval prevents data from the snapshot of a large table from remaining uncommitted in the destination database for an extended period of time. This option is ignored if the flow or the destination connection is configured with auto-commit.
- Alter target table if the source has columns that the target table doesn't have: if this option is enabled, the flow will automatically create and execute ALTER TABLE ADD COLUMN SQL statement for each column that exists in the source and does not exist in the destination.
- Add Primary Key: If this option is enabled, the flow will add a Primary Key for the destination table using column(s) that uniquely identify the record (Lookup Fields). This option is ignored if the Primary Key already exists in the destination table. We recommend enabling this option when the destination database is PostgreSQL or MySQL, and the Action is MERGE (UPSERT). The Primary Key or the unique index is required when executing a MERGE in MySQL or PostgreSQL.
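MERGE uses the Lookup Fields to find the destination row to update or delete. The standalone JavaScript sketch below parses the documented `fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2` format and builds the kind of key predicate an UPDATE or DELETE would use. The function names, table names, whitespace trimming, and skipping of empty entries are assumptions. The predicate is illustrative and is not the exact SQL Etlworks generates.

```javascript
// Hypothetical parser for the Lookup Fields format:
//   table1=field1,field2;table2=field1,field2
// Returns a Map of fully qualified table name -> array of key columns.
function parseLookupFields(value) {
  const result = new Map();
  for (const entry of value.split(";")) {
    if (entry.trim() === "") continue; // tolerate a trailing ";"
    const eq = entry.indexOf("=");
    if (eq < 0) throw new Error(`Missing "=" in lookup entry: ${entry}`);
    const table = entry.slice(0, eq).trim();
    const fields = entry
      .slice(eq + 1)
      .split(",")
      .map((f) => f.trim())
      .filter((f) => f !== "");
    result.set(table, fields);
  }
  return result;
}

// Builds a WHERE-clause predicate with bind placeholders for the key columns,
// e.g. ["order_id", "line_no"] -> "order_id = ? AND line_no = ?".
function keyPredicate(keys) {
  return keys.map((k) => `${k} = ?`).join(" AND ");
}

const lookup = parseLookupFields("sales.dbo.orders=order_id;sales.dbo.order_lines=order_id,line_no");
console.log(keyPredicate(lookup.get("sales.dbo.order_lines")));
// order_id = ? AND line_no = ?
```

A wrong or incomplete key list makes the predicate match too many rows or none. That is why the columns should uniquely identify each record.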
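The interaction between Batch Window and Force batch execution every (ms) can be modeled as follows. This is a simplified standalone JavaScript sketch, not Etlworks code. It assumes the timer is measured from the first statement added to the current batch, and `StatementBatcher`, `tick`, and the `execute` callback are hypothetical names. Here, `execute` stands in for the single database call that runs a whole batch.

```javascript
// Simplified model of Batch Window + "Force batch execution every (ms)".
// Assumption: the force timer starts when the first statement enters an empty batch.
class StatementBatcher {
  constructor(batchWindow, forceEveryMs, execute) {
    this.batchWindow = batchWindow;
    this.forceEveryMs = forceEveryMs;
    this.execute = execute; // runs all pending statements in one database call
    this.pending = [];
    this.firstAddedAt = null;
  }

  add(sql, nowMs) {
    if (this.pending.length === 0) this.firstAddedAt = nowMs;
    this.pending.push(sql);
    // Batch Window of 1 means no batching: each statement is executed at once.
    if (this.pending.length >= Math.max(1, this.batchWindow)) this.flush();
  }

  // Called periodically; forces execution of a partially filled batch.
  tick(nowMs) {
    if (this.batchWindow > 1 && this.pending.length > 0 &&
        nowMs - this.firstAddedAt >= this.forceEveryMs) {
      this.flush();
    }
  }

  flush() {
    if (this.pending.length === 0) return;
    this.execute(this.pending);
    this.pending = [];
    this.firstAddedAt = null;
  }
}

const calls = [];
const b = new StatementBatcher(3, 1000, (stmts) => calls.push(stmts.length));
b.add("s1", 0); b.add("s2", 10); b.add("s3", 20); // window reached: 1 call, 3 statements
b.add("s4", 30);  // a quiet table: only one statement arrives
b.tick(500);      // too early, s4 stays pending
b.tick(1030);     // 1000 ms since s4 was added: forced call with 1 statement
console.log(calls); // [ 3, 1 ]
```

The last statement (`s4`) shows the problem the force interval solves. Without `tick`, it would wait until two more statements for that table arrived.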
Step 9. Optionally configure CUSTOM SQL
When the destination of a CDC Flow is a relational database, it is possible to implement complex change processing logic using Custom SQL.
Typical Use Cases for Custom SQL
Custom SQL in CDC flows is commonly used for:
- handling parent-child relationships
- applying complex update logic
- converting CDC operations into relational UPSERT logic
- writing data into multiple destination tables
This approach allows implementing sophisticated relational transformations while still benefiting from Etlworks CDC streaming and batching capabilities.
Configure Custom SQL
In this setup the SQL Action is configured as INSERT, while the actual processing logic is defined in the Custom SQL field of the transformation.
The SQL template can reference source columns using the same syntax as regular Custom SQL:
{column_name}
At runtime, these tokens are converted to bind variables and populated with values from the CDC event.
For CDC flows, additional capabilities are available for handling events from multiple source tables and controlling the execution order.
SQL Blocks
Custom SQL can be divided into blocks that are executed only for specific source tables.
A block is defined using the following syntax:
-- @sql key=source_table
...
-- @end
The key identifies the source table whose CDC events should trigger execution of this SQL block.
This allows a single CDC Flow to process changes from multiple tables while executing different SQL logic for each table.
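Here is a sketch of how block markers of this kind can be split out and matched to a source table. It is standalone JavaScript with hypothetical function names (`parseSqlBlocks`, `blockForTable`) and hypothetical tables. It assumes that markers occupy their own lines, that attributes are space-separated `name=value` pairs, that text outside any block is ignored, and that the key must equal the source table name exactly. Etlworks' own parser may behave differently.

```javascript
// Hypothetical parser for "-- @sql key=... [dependsOn=...]" ... "-- @end" blocks.
function parseSqlBlocks(customSql) {
  const blocks = [];
  let current = null;
  for (const line of customSql.split(/\r?\n/)) {
    const trimmed = line.trim();
    const start = trimmed.match(/^--\s*@sql\s+(.*)$/);
    if (start) {
      if (current) throw new Error(`Block "${current.key}" is missing -- @end`);
      const attrs = {};
      for (const pair of start[1].trim().split(/\s+/)) {
        const [name, value] = pair.split("=");
        attrs[name] = value;
      }
      if (!attrs.key) throw new Error(`Missing key in: ${trimmed}`);
      current = { key: attrs.key, dependsOn: attrs.dependsOn || null, lines: [] };
    } else if (/^--\s*@end$/.test(trimmed)) {
      if (!current) throw new Error("-- @end without a matching -- @sql");
      blocks.push({ key: current.key, dependsOn: current.dependsOn, sql: current.lines.join("\n") });
      current = null;
    } else if (current) {
      current.lines.push(line); // SQL text belonging to the open block
    }
  }
  if (current) throw new Error(`Block "${current.key}" is missing -- @end`);
  return blocks;
}

// Only the block whose key matches the event's source table is executed.
function blockForTable(blocks, sourceTable) {
  return blocks.find((b) => b.key === sourceTable) || null;
}

const sample = [
  "-- @sql key=orders",
  "UPDATE dbo.orders_copy SET status = {status} WHERE id = {id}",
  "-- @end",
  "-- @sql key=order_lines dependsOn=orders",
  "INSERT INTO dbo.order_lines_copy (id, order_id) VALUES ({id}, {order_id})",
  "-- @end",
].join("\n");
const parsed = parseSqlBlocks(sample);
console.log(parsed.map((b) => b.key)); // [ 'orders', 'order_lines' ]
```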
Block Dependencies
When CDC events involve multiple related tables, execution order can be controlled using dependencies between SQL blocks.
Dependencies are defined using the dependsOn parameter:
-- @sql key=child_table dependsOn=parent_table
...
-- @end
This ensures that the SQL block for child_table is executed only after the block for parent_table.
This is particularly useful when processing relational structures such as:
- parent / child tables
- master / detail relationships
- entities that must exist before dependent records are written
Custom SQL Example
The following example demonstrates a CDC pipeline that processes changes from multiple MongoDB collections and writes them into relational tables.
- The files block processes file metadata.
- The chunks block processes file chunks and depends on files.
- The potentialUsecases block also depends on files.
-- @sql key=files
DECLARE @usecase_id INT
DECLARE @file_id BIGINT
DECLARE @metadata NVARCHAR(MAX)
SELECT @usecase_id = u.id
FROM poc.Usecase u
WHERE u.name = {metadata_usecase}
IF {debezium_cdc_op} = 'd'
BEGIN
UPDATE poc.BinaryFile
SET migratedDeleted = 1
WHERE migratedId = {mongo_id}
END
-- @end
-- @sql key=chunks dependsOn=files
DECLARE @usecase_id INT
DECLARE @file_id BIGINT
...
-- @end
-- @sql key=potentialUsecases dependsOn=files
DECLARE @usecase_id INT
DECLARE @file_id BIGINT
...
-- @end
In this example:
- {debezium_cdc_op} contains the CDC operation type (c, u, d)
- {mongo_id} and other tokens reference fields from the CDC event
- each SQL block is executed only for the corresponding source table
- dependencies ensure that parent records are processed before child records
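The dependsOn rule works like a topological sort: each block is placed after the block it depends on. The standalone JavaScript sketch below applies that rule to the block keys from the example. It assumes each block has at most one dependsOn parent and ignores parents that are not defined as blocks. The function name `executionOrder` is hypothetical, and the sketch only illustrates the ordering rule. It is not meant to reproduce how Etlworks schedules blocks internally.

```javascript
// Depth-first topological sort over dependsOn links.
// Assumption: at most one parent per block; unknown parents are ignored.
function executionOrder(blocks) {
  const byKey = new Map(blocks.map((b) => [b.key, b]));
  const state = new Map(); // key -> "visiting" | "done"
  const order = [];

  function visit(block) {
    const s = state.get(block.key);
    if (s === "done") return;
    if (s === "visiting") throw new Error(`Circular dependsOn involving "${block.key}"`);
    state.set(block.key, "visiting");
    const parent = block.dependsOn ? byKey.get(block.dependsOn) : undefined;
    if (parent) visit(parent); // the parent block must run first
    state.set(block.key, "done");
    order.push(block.key);
  }

  blocks.forEach((b) => visit(b));
  return order;
}

// Blocks from the example above, deliberately listed child-first.
console.log(executionOrder([
  { key: "chunks", dependsOn: "files" },
  { key: "potentialUsecases", dependsOn: "files" },
  { key: "files", dependsOn: null },
]));
// [ 'files', 'chunks', 'potentialUsecases' ]
```

Cycles are rejected because no valid order exists when two blocks depend on each other.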
Step 10. Optionally add a script to execute after each row is processed.
You can add JavaScript code that is executed after each row is processed.
A row is considered processed when the SQL statement (INSERT, UPDATE, MERGE, or DELETE) is executed on the destination table or, if Batch Window > 1, when the SQL statement is added to the batch.
Typical use cases for for-each-row script
- Trigger: execute an SQL statement on one or more other tables when a row is added, updated, or deleted in the destination table. Read how to execute any SQL from JavaScript.
- Per-row logging. Read about logging.
Add script
To add a script, go to transformation > Configure > Additional transformations and add the script in the For Each Row field.
Objects available in the script
The following objects can be referenced by name from JavaScript code:
| Object name | Class name / JavaDoc | Package |
|---|---|---|
| dataSet | com.toolsverse.etl.common.DataSet | com.toolsverse.etl.common |
| currentRow | com.toolsverse.etl.common.DataSetRecord | com.toolsverse.etl.common |
| etlConfig | com.toolsverse.etl.core.config.EtlConfig | com.toolsverse.etl.core.config |
| scenario | com.toolsverse.etl.core.engine.Scenario | com.toolsverse.etl.core.engine |
| tableName | The name of the destination table | |
| tableKey | The primary key columns in the destination table | |
| connection | The destination JDBC connection | java.sql |
| isSnapshotMode | True if the flow performs a snapshot of the source database | |
| batchSize | The size of the Batch Window set for the transformation. There is no batching if batchSize <= 1 | |
| autoCommit | True if the transformation is configured to auto commit. The default value is true | |
| op | The type of the action for that specific row: 'c' for INSERT, 'u' for UPDATE and 'd' for DELETE | |
| destAlias | The destination com.toolsverse.etl.common.Alias | com.toolsverse.etl.common |
| driver | The destination com.toolsverse.etl.driver.Driver | com.toolsverse.etl.driver |
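Here is a sketch of the Trigger use case: it writes an audit row to the destination database for each streamed (non-snapshot) row. It uses only objects from the table above (`connection`, `tableName`, `op`, `isSnapshotMode`) plus standard JDBC `PreparedStatement` methods. The `audit_log` table, its columns, and the function name `recordChange` are hypothetical. The code is written as a plain function in ES5-style JavaScript so it can be tested outside Etlworks. In the For Each Row field, you would call it as `recordChange(connection, tableName, op, isSnapshotMode);`.

```javascript
// Hypothetical for-each-row logic: audit every streamed change.
// audit_log(table_name, action) is an assumed table in the destination database.
function recordChange(connection, tableName, op, isSnapshotMode) {
  // Skip rows loaded during the initial snapshot; audit only streamed changes.
  if (isSnapshotMode) {
    return false;
  }
  var actions = { c: "INSERT", u: "UPDATE", d: "DELETE" };
  if (!Object.prototype.hasOwnProperty.call(actions, op)) {
    return false; // unknown operation code: do nothing
  }
  // Standard JDBC calls on the destination java.sql.Connection.
  var stmt = connection.prepareStatement(
    "INSERT INTO audit_log (table_name, action) VALUES (?, ?)");
  try {
    stmt.setString(1, tableName);
    stmt.setString(2, actions[op]);
    stmt.executeUpdate();
  } finally {
    stmt.close(); // always release the statement, even if the insert fails
  }
  return true;
}
```

With Batch Window > 1, the script runs when the statement is added to the batch. The audit row may therefore be written before the batched statement reaches the destination table.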
Step 11. Schedule CDC flow.
We recommend using the continuous run schedule type. The flow runs until it is stopped manually, an error occurs, or (if configured) no new CDC events arrive for an extended period of time. It then restarts automatically after a configurable number of seconds.
Monitor running CDC flow
Read how to monitor running CDC flow.
A pipeline with independent extract and load flows
The CDC extract flow extracts data from a CDC-enabled database and creates CSV files with CDC events in the configured location. The Load Flow then loads these files into the target database.
Step-by-Step Guide
Step 1. Create and schedule CDC Extract flow
This flow extracts CDC events from the source database in real time and creates files with the events.
Read how to create CDC extract flow.
Step 2. Create a Flow to load data in target database
Start creating the Flow by opening the Flows window, clicking +, and typing file to database into the search field.
Step 3. Add ETL transformation(s).
Select or enter the following attributes of the transformation (left to right):
1. Storage Connection and format for CDC events created in step 1.
2. A wildcard filename that matches the names of the files created by the CDC extract flow: *_cdc_stream_*.csv.
Note: If the source connection supports default wildcard templates (the parameter Contains CDC events is enabled, or the source connection is created using the CDC Events connector), the wildcard filename can be selected in FROM.
3. The destination database connection.
4. The wildcard destination table name in the fully qualified format: db.schema.*.
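A wildcard such as *_cdc_stream_*.csv selects files by pattern. The standalone JavaScript sketch below shows the matching rule. It assumes that only `*` is a wildcard, matching any run of characters (including none), and that matching is case-sensitive. The file names and the `wildcardToRegExp` helper are hypothetical.

```javascript
// Converts a "*" wildcard into an anchored regular expression.
// Every other character, including ".", is matched literally.
function wildcardToRegExp(pattern) {
  const escaped = pattern
    .split("*")
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, "\\$&"))
    .join(".*");
  return new RegExp(`^${escaped}$`);
}

const cdcFiles = wildcardToRegExp("*_cdc_stream_*.csv");
const names = [
  "inventory_customers_cdc_stream_0001.csv",     // matches
  "inventory_customers_cdc_stream_0001.csv.tmp", // extra suffix: no match
  "customers_snapshot.csv",                      // no _cdc_stream_: no match
];
console.log(names.filter((n) => cdcFiles.test(n)));
// [ 'inventory_customers_cdc_stream_0001.csv' ]
```

Because the pattern is anchored at both ends, partially written files with an extra extension are not picked up.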
Step 4. Optionally configure MERGE into the target table.
By default, the load flow INSERTs data into the destination table. To configure MERGE:
1. Click the MAPPING button.
2. Set Action to Record with MERGE and enable Predict lookup fields.
To optimize MERGE performance, and if you can skip DELETE operations, enable Do not execute DELETE when Action is Record with MERGE. When this option is enabled, the flow combines INSERT and UPDATE operations into a single SQL call, bundles multiple MERGE calls together using the JDBC batch interface, and ignores DELETE events. When this option is disabled, the flow executes each INSERT, UPDATE, and DELETE statement individually, which significantly reduces performance.
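The performance difference between the two modes comes down to the number of database round trips. This simplified standalone JavaScript model counts them and is not Etlworks internals. `planLoad` is a hypothetical name, and `batchSize` is a hypothetical JDBC batch size, since the article does not state how many MERGE calls are bundled per batch.

```javascript
// Simplified model of the trade-off described above.
// skipDeletes = true: deletes are dropped; inserts/updates become MERGE
//   statements bundled into JDBC batches of batchSize (hypothetical).
// skipDeletes = false: one database call per INSERT, UPDATE, or DELETE.
function planLoad(ops, skipDeletes, batchSize) {
  if (skipDeletes) {
    const merges = ops.filter((op) => op === "c" || op === "u").length;
    return {
      databaseCalls: Math.ceil(merges / batchSize),
      ignoredDeletes: ops.filter((op) => op === "d").length,
    };
  }
  return { databaseCalls: ops.length, ignoredDeletes: 0 };
}

const events = ["c", "u", "u", "d", "c", "u"];
console.log(planLoad(events, true, 100));  // { databaseCalls: 1, ignoredDeletes: 1 }
console.log(planLoad(events, false, 100)); // { databaseCalls: 6, ignoredDeletes: 0 }
```

The gain grows with volume. The cost is that rows deleted in the source remain in the target table.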
Note: Some databases, specifically PostgreSQL and MySQL, require tables to have primary keys or unique indexes to perform MERGE operations.
If you want to MERGE data into target tables in PostgreSQL or MySQL and the tables do not exist yet, you can set Action to Record instead of Record with MERGE. Note that Record is slower than Record with MERGE.
Step 5. Configure flow to automatically handle source schema changes.
The load flow always creates a table in the target database if it does not exist.
Enable Alter target table if the source has columns that the target table doesn't have to automatically add missing columns to the target table.
You can now execute the Load flow manually.
Step 6. Schedule load flow.
Schedule the flow to run as often as needed. The options are:
- Run flow periodically (as often as once a minute).
- Run flow continuously (as often as once a second).