About trigger-based change replication
Trigger-based change replication can be implemented in many ways, but the basic idea is that each table that participates in change replication as a source has triggers for INSERT, UPDATE, and, optionally, DELETE.
The triggers write to a shadow table (or tables). A shadow table may store the entire row, to keep track of every single column change, or only the primary key and the operation type (INSERT, UPDATE, or DELETE).
Etlworks supports trigger-based change replication using a feature called conditional SQL action. Read about other change replication techniques available in Etlworks.
How it works
Let's assume that we want to replicate changes in the table audit_trail in a local Postgres database to a MySQL database in the cloud.
Let's also assume that the audit_trail table has triggers for INSERT, UPDATE, and DELETE.
When any of these triggers fires, it inserts a record into the cdc_event table in the source Postgres database. The table might have the following structure:
create table cdc_event (
  id bigserial primary key, /* a unique, automatically incremented number */
  table_name varchar(1000) not null,
  pk varchar(1000) not null, /* the value of the unique key used to identify the record */
  action char(1) not null, /* 'i' for insert, 'u' for update, and 'd' for delete */
  ts timestamp not null /* the timestamp of the event */
);
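To make the capture side concrete, here is a minimal in-memory sketch in JavaScript of what the triggers do. It is not a Postgres trigger: the AuditTrail class and its methods are hypothetical stand-ins for the table plus its INSERT, UPDATE, and DELETE triggers, and the cdcEvent array plays the role of the cdc_event table with the structure above.

```javascript
// Hypothetical in-memory stand-in for the cdc_event shadow table.
const cdcEvent = [];
let nextEventId = 1; // emulates the auto-incremented id column

// Emulates what each trigger does: append one shadow record per change.
function recordEvent(tableName, pk, action) {
  cdcEvent.push({
    id: nextEventId++,
    table_name: tableName,
    pk: String(pk), // pk is stored as text, like the varchar column
    action,         // 'i', 'u', or 'd'
    ts: new Date(),
  });
}

// Hypothetical wrapper around the audit_trail table: each method changes
// the row and then "fires" the matching trigger.
class AuditTrail {
  constructor() {
    this.rows = new Map(); // audit_trail_id -> row
  }
  insert(row) {
    this.rows.set(row.audit_trail_id, row);
    recordEvent('audit_trail', row.audit_trail_id, 'i');
  }
  update(row) {
    this.rows.set(row.audit_trail_id, row);
    recordEvent('audit_trail', row.audit_trail_id, 'u');
  }
  delete(id) {
    this.rows.delete(id);
    recordEvent('audit_trail', id, 'd');
  }
}

const auditTrail = new AuditTrail();
auditTrail.insert({ audit_trail_id: 1, message: 'created' });
auditTrail.update({ audit_trail_id: 1, message: 'edited' });
auditTrail.insert({ audit_trail_id: 2, message: 'created' });
auditTrail.delete(2);

console.log(cdcEvent.map((e) => `${e.id}:${e.pk}:${e.action}`).join(' '));
// prints "1:1:i 2:1:u 3:2:i 4:2:d"
```

Because the shadow table in this design records only the key and the operation, the current column values have to be read from audit_trail at replication time, which is what the driving queries in this article do.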
To implement change replication, we will create a Flow where both the source and the destination are relational databases.
For each pair of source and destination tables, we will use two transformations:
- one for INSERT and UPDATE events
- one for DELETE events
The key is to define a Source query that polls records from the source table (audit_trail in our case) based on the records in the shadow table (cdc_event in our case).
INSERT and UPDATE events
select audit_trail.*, cdc_event.action as cdc_event_action
from audit_trail
join cdc_event on cdc_event.pk = cast(audit_trail.audit_trail_id as varchar)
and cdc_event.table_name = 'audit_trail'
and cdc_event.action in ('i', 'u')
order by cdc_event.ts
This query drives the transformation for INSERT and UPDATE events.
To execute INSERT or UPDATE in the destination database, the SQL action in the transformation must be set to Conditional, and the Action Condition must choose the SQL action (INSERT, UPDATE, or MERGE) based on the value of the cdc_event_action field.
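The following JavaScript sketch emulates, in memory, what the INSERT/UPDATE driving query and the Conditional action do together. The arrays and the drivingQuery and upsertChanges functions are hypothetical illustrations, not Etlworks APIs: the query part joins cdc_event to audit_trail on the key, keeps only 'i' and 'u' events for audit_trail, and orders them by ts; the action part inserts or updates the destination depending on cdc_event_action.

```javascript
// Hypothetical source data: current audit_trail rows and shadow events.
const auditTrail = [
  { audit_trail_id: 1, message: 'edited' },
  { audit_trail_id: 3, message: 'new' },
];
const cdcEvent = [
  { id: 1, table_name: 'audit_trail', pk: '1', action: 'u', ts: 10 },
  { id: 2, table_name: 'audit_trail', pk: '3', action: 'i', ts: 20 },
  { id: 3, table_name: 'other_table', pk: '1', action: 'i', ts: 30 },
  { id: 4, table_name: 'audit_trail', pk: '5', action: 'd', ts: 40 },
];

// Emulates: select audit_trail.*, cdc_event.action as cdc_event_action
//           from audit_trail join cdc_event on ... order by cdc_event.ts
function drivingQuery(rows, events) {
  return events
    .filter((e) => e.table_name === 'audit_trail' && ['i', 'u'].includes(e.action))
    .sort((a, b) => a.ts - b.ts)
    .flatMap((e) => {
      const row = rows.find((r) => String(r.audit_trail_id) === e.pk);
      // inner join: an event whose source row no longer exists yields nothing
      return row ? [{ ...row, cdc_event_action: e.action }] : [];
    });
}

// Emulates the Conditional action: 'i' -> insert, anything else -> update.
function upsertChanges(destination, changes) {
  for (const change of changes) {
    const { cdc_event_action, ...row } = change; // the helper column is not written
    if (cdc_event_action === 'i') {
      if (destination.has(row.audit_trail_id)) throw new Error('duplicate key');
      destination.set(row.audit_trail_id, row);
    } else if (destination.has(row.audit_trail_id)) {
      // UPDATE ... WHERE audit_trail_id = ? affects zero rows if the row is missing
      destination.set(row.audit_trail_id, row);
    }
  }
}

const destination = new Map([[1, { audit_trail_id: 1, message: 'created' }]]);
upsertChanges(destination, drivingQuery(auditTrail, cdcEvent));
console.log([...destination.values()].map((r) => r.message).join(','));
// prints "edited,new"
```

Note that the join always reads the current source row, so if a row was inserted and then updated before the Flow runs, both events carry the latest values and the insert followed by the update still ends in the correct state.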
DELETE events
select pk from cdc_event where cdc_event.table_name = 'audit_trail'
and cdc_event.action = 'd'
This query drives the transformation for DELETE events.
For DELETE events, the SQL Action must be set to DELETE.
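A similar in-memory JavaScript sketch for the DELETE path: the driving query returns only the keys of 'd' events for audit_trail, and the DELETE action removes the destination rows with those keys. As before, the data and the deletedKeys and applyDeletes functions are hypothetical.

```javascript
// Hypothetical shadow events; only pk and action matter for deletes.
const cdcEvent = [
  { id: 1, table_name: 'audit_trail', pk: '7', action: 'd' },
  { id: 2, table_name: 'audit_trail', pk: '8', action: 'u' },
  { id: 3, table_name: 'other_table', pk: '9', action: 'd' },
];

// Emulates: select pk from cdc_event
//           where table_name = 'audit_trail' and action = 'd'
function deletedKeys(events) {
  return events
    .filter((e) => e.table_name === 'audit_trail' && e.action === 'd')
    .map((e) => Number(e.pk)); // pk is stored as text; the destination key is numeric here
}

// Emulates DELETE FROM audit_trail WHERE audit_trail_id = ? for each key.
function applyDeletes(destination, keys) {
  let deleted = 0;
  for (const key of keys) {
    if (destination.delete(key)) deleted++; // deleting a missing key is harmless
  }
  return deleted;
}

const destination = new Map([
  [7, { audit_trail_id: 7 }],
  [8, { audit_trail_id: 8 }],
  [9, { audit_trail_id: 9 }],
]);
console.log(applyDeletes(destination, deletedKeys(cdcEvent))); // prints 1
```

Unlike the INSERT/UPDATE path, this query does not join to audit_trail: the source row is already gone, so the shadow table is the only place the key still exists.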
Create change replication Flow
Step 1. Add triggers to populate shadow table(s).
Step 2. Create a source database Connection.
Step 3. Create a destination database Connection. Disable Auto Commit for the destination Connection.
Step 4. Start creating a Flow by selecting Database to database in the Gallery.
Step 5. Add a new source-to-destination transformation.
Step 6. Click MAPPING and define the Source query as described in INSERT and UPDATE events above. Please note that this is just an example: the actual driving query for your change replication will likely be different. We need to make one extra change to the Source query:
select audit_trail.*, cdc_event.action as cdc_event_action
from audit_trail
join cdc_event on cdc_event.pk = cast(audit_trail.audit_trail_id as varchar)
and cdc_event.table_name = 'audit_trail'
and cdc_event.action in ('i', 'u') and cdc_event.id <= {CDC_EVENT_ID_AUDIT_TRAIL}
order by cdc_event.ts
Notice the extra condition and cdc_event.id <= {CDC_EVENT_ID_AUDIT_TRAIL}. Since we are tracking changes in the source using a dedicated table, we need to delete processed records from this table after the Flow has been successfully executed. To do so, we need to capture the current maximum id in the shadow table for this specific table before the Flow runs and use it in both the SELECT and DELETE queries. That way, events recorded while the Flow is running are neither processed nor deleted; the next run picks them up. How to calculate this value is explained in Step 10.
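The effect of the upper bound on the SELECT side can be shown with a small JavaScript sketch over a hypothetical in-memory cdc_event array. The maxEventId helper is an illustration of the MAX(id) lookup, not an Etlworks function; it returns 0 when there are no events, which is an assumption made here so the bound is always a number.

```javascript
// Hypothetical shadow table contents at the moment the Flow starts.
const cdcEvent = [
  { id: 1, table_name: 'audit_trail', pk: '1', action: 'i' },
  { id: 2, table_name: 'other_table', pk: '5', action: 'i' },
  { id: 3, table_name: 'audit_trail', pk: '2', action: 'u' },
];

// Emulates SELECT MAX(id) FROM cdc_event WHERE table_name = 'audit_trail'.
function maxEventId(events, tableName) {
  const ids = events.filter((e) => e.table_name === tableName).map((e) => e.id);
  return ids.length ? Math.max(...ids) : 0; // 0 when there are no events yet
}

// 1. Snapshot the high-water mark before the Flow runs.
const maxId = maxEventId(cdcEvent, 'audit_trail');

// 2. A trigger fires while the Flow is running and adds a new event.
cdcEvent.push({ id: 4, table_name: 'audit_trail', pk: '3', action: 'i' });

// 3. The driving query only sees audit_trail events up to the snapshot,
//    so this run works on a fixed set of events.
const batch = cdcEvent.filter((e) => e.table_name === 'audit_trail' && e.id <= maxId);

console.log(maxId, batch.map((e) => e.id).join(','));
// prints "3 1,3"
```

Event 4 is not part of this batch, and because the cleanup uses the same bound, it stays in the shadow table until the next run processes it.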
Step 7. Select the Parameters tab and set:
- Action to Conditional
- Lookup fields to audit_trail_id
- Action Conditions to (example):
dataSet.getFieldValue(currentRow, 'cdc_event_action').equals('i') ? 'insert' : 'update';
Alternatively, if the target database supports MERGE, you can just set Action to merge:
'merge';
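The choice between the insert/update condition and merge matters when the same events are processed twice, for example if the replication wrote to the destination but the shadow table was not cleaned up afterwards. The JavaScript sketch below contrasts the two on a hypothetical in-memory destination: conditionalAction mirrors the Action Conditions expression, and applyAction is an illustrative writer, not an Etlworks API. It assumes the destination has a primary or unique key on audit_trail_id; without one, a real database would not report the duplicate.

```javascript
// Mirrors the Action Conditions expression: 'i' -> 'insert', otherwise 'update'.
function conditionalAction(row) {
  return row.cdc_event_action === 'i' ? 'insert' : 'update';
}

// Hypothetical destination writer supporting the three actions.
function applyAction(destination, action, row) {
  const key = row.audit_trail_id;
  if (action === 'insert') {
    if (destination.has(key)) throw new Error(`duplicate key ${key}`);
    destination.set(key, row);
  } else if (action === 'update') {
    if (destination.has(key)) destination.set(key, row);
  } else if (action === 'merge') {
    destination.set(key, row); // insert if missing, update if present
  }
}

const batch = [{ audit_trail_id: 1, message: 'created', cdc_event_action: 'i' }];

// First run succeeds with either strategy.
const conditionalDest = new Map();
batch.forEach((r) => applyAction(conditionalDest, conditionalAction(r), r));
const mergeDest = new Map();
batch.forEach((r) => applyAction(mergeDest, 'merge', r));

// Replay the same batch, as if the processed events had not been deleted.
let replayError = null;
try {
  batch.forEach((r) => applyAction(conditionalDest, conditionalAction(r), r));
} catch (e) {
  replayError = e.message;
}
batch.forEach((r) => applyAction(mergeDest, 'merge', r));

console.log(`conditional replay: ${replayError}; merge rows: ${mergeDest.size}`);
// prints "conditional replay: duplicate key 1; merge rows: 1"
```

In this sketch merge is idempotent, so replaying a batch converges to the same destination state, while the plain insert fails on the second attempt.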
Step 8. This step is optional and only needed if you wish to track DELETE events.
8.1 Add another transformation for the same pair of source and destination tables. Specify the Source query as described in DELETE events above. Please note that this is just an example: the actual driving query for your change replication will likely be different.
Once again, we need to make one extra change to the Source query (for the same reason as before):
select pk from cdc_event where cdc_event.table_name = 'audit_trail'
and cdc_event.action = 'd' and cdc_event.id <= {CDC_EVENT_ID_AUDIT_TRAIL}
8.2 Select the Parameters tab and set:
- Action to Delete
- Lookup fields to audit_trail_id
Step 9. Now that we have a Flow that can track changes in the source table and update the destination table, we will need another Flow to remove processed records from the shadow table.
9.1 Create a new SQL Flow.
9.2 Select the source Connection created in Step 2 as the Connection for this Flow.
9.3 Enter the following SQL in the Parameters tab:
delete from cdc_event where cdc_event.table_name = 'audit_trail'
and cdc_event.id <= {CDC_EVENT_ID_AUDIT_TRAIL}
Notice the same extra condition, and cdc_event.id <= {CDC_EVENT_ID_AUDIT_TRAIL}, in the query.
9.4 Make sure that Auto Commit in the Parameters tab is unselected.
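The cleanup statement can be emulated the same way. In this hypothetical JavaScript sketch, cleanup keeps every event whose id is above the captured maximum, as well as events for other tables, while cleanupUnbounded shows what the same delete would remove without the id condition. The maxId value and the event data are made up for illustration.

```javascript
// Hypothetical shadow table after the replication Flow has finished.
// maxId was captured before the run; event 4 arrived during the run.
const maxId = 3;
const cdcEvent = [
  { id: 1, table_name: 'audit_trail', pk: '1', action: 'i' },
  { id: 2, table_name: 'other_table', pk: '5', action: 'i' },
  { id: 3, table_name: 'audit_trail', pk: '2', action: 'u' },
  { id: 4, table_name: 'audit_trail', pk: '3', action: 'i' },
];

// Emulates: delete from cdc_event
//           where table_name = 'audit_trail' and id <= maxId
// (returns the rows that remain)
function cleanup(events, tableName, upTo) {
  return events.filter((e) => !(e.table_name === tableName && e.id <= upTo));
}

// Emulates the same delete without the id condition.
function cleanupUnbounded(events, tableName) {
  return events.filter((e) => e.table_name !== tableName);
}

const kept = cleanup(cdcEvent, 'audit_trail', maxId);
const keptUnbounded = cleanupUnbounded(cdcEvent, 'audit_trail');
console.log(kept.map((e) => e.id).join(','), '|', keptUnbounded.map((e) => e.id).join(','));
// prints "2,4 | 2"
```

With the bound, event 4 waits for the next run; without it, the change behind event 4 would be removed before it was ever replicated. Scoping by table_name also leaves the events of other tables for their own Flows.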
Step 10. Put everything together.
10.1 Create a new nested Flow.
10.2 Add the Flow created in steps 4 to 8 as step 1 of the workflow. Click the pen button and copy the following JavaScript into the Condition field:
var javaImports = new JavaImporter(com.toolsverse.etl.core.engine,
    com.toolsverse.etl.common, com.toolsverse.util);

with (javaImports) {
    // get the max id in the cdc_event table for the audit_trail table;
    // COALESCE returns 0 when there are no events yet, so the queries stay valid.
    // Replace "source connection name" with the name of the source Connection.
    var maxIdForAuditTrail = Extractor.lookup(etlConfig, scenario,
        "source connection name",
        "id", "SELECT COALESCE(MAX(id), 0) AS id FROM cdc_event WHERE table_name = 'audit_trail'");

    var variable = new Variable();
    variable.setName('CDC_EVENT_ID_AUDIT_TRAIL');
    // set the value of the variable by converting the returned long to a string
    variable.setValue(Utils.makeString(maxIdForAuditTrail));
    // add the variable to the scenario
    scenario.addVariable(variable);

    // needed to continue executing the Flow
    value = true;
}
This JavaScript gets the maximum value of the id field in the cdc_event table for audit_trail before the Flow is executed and stores it in the Flow variable CDC_EVENT_ID_AUDIT_TRAIL, which the queries reference as {CDC_EVENT_ID_AUDIT_TRAIL}.
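Outside Etlworks, the condition script boils down to two things: look up the maximum id and expose it as a string variable that the {CDC_EVENT_ID_AUDIT_TRAIL} token in the queries refers to. The plain JavaScript sketch below shows that idea; lookupMaxId, variables, and substituteVariables are hypothetical helpers, and it assumes the token is replaced textually by the variable value before the query runs.

```javascript
// Hypothetical shadow table rows (only the columns needed here).
const cdcEvent = [
  { id: 11, table_name: 'audit_trail' },
  { id: 12, table_name: 'other_table' },
  { id: 15, table_name: 'audit_trail' },
];

// Emulates SELECT COALESCE(MAX(id), 0) FROM cdc_event WHERE table_name = ?
function lookupMaxId(events, tableName) {
  return events
    .filter((e) => e.table_name === tableName)
    .reduce((max, e) => Math.max(max, e.id), 0);
}

// Hypothetical variable store; values are strings, like Utils.makeString(...).
const variables = new Map();
variables.set('CDC_EVENT_ID_AUDIT_TRAIL', String(lookupMaxId(cdcEvent, 'audit_trail')));

// Assumed behavior: {NAME} tokens are replaced with the variable's value.
function substituteVariables(sql, vars) {
  return sql.replace(/\{([A-Z0-9_]+)\}/g, (token, name) =>
    vars.has(name) ? vars.get(name) : token // leave unknown tokens untouched
  );
}

const template =
  "delete from cdc_event where cdc_event.table_name = 'audit_trail' " +
  'and cdc_event.id <= {CDC_EVENT_ID_AUDIT_TRAIL}';
console.log(substituteVariables(template, variables));
// prints "delete from cdc_event where cdc_event.table_name = 'audit_trail' and cdc_event.id <= 15"
```

Because the variable is set once, before either step runs, the replication queries and the cleanup statement all see the same bound.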
10.3 Add the Flow created in step 9 as step 2 of the workflow.
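Putting it together, the nested Flow runs its steps in order: capture the maximum id, replicate, then clean up. The JavaScript sketch below models that sequencing with hypothetical step functions (it is not the Etlworks engine) to illustrate the design: if the replication step fails, the cleanup step never runs, so the unprocessed events stay in cdc_event and are retried on the next run. It assumes destination writes take effect only when the step succeeds, which is the motivation for disabling Auto Commit; the sketch models that by staging writes and "committing" at the end.

```javascript
// Hypothetical state shared by the steps.
function makeState() {
  return {
    cdcEvent: [
      { id: 1, table_name: 'audit_trail', pk: '1', action: 'i' },
      { id: 2, table_name: 'audit_trail', pk: '2', action: 'i' },
    ],
    destination: new Map(),
    variables: new Map(),
  };
}

// Condition script: capture the max id; returning true lets the step run.
function captureMaxId(state) {
  const ids = state.cdcEvent.filter((e) => e.table_name === 'audit_trail').map((e) => e.id);
  state.variables.set('CDC_EVENT_ID_AUDIT_TRAIL', String(ids.length ? Math.max(...ids) : 0));
  return true;
}

// Step 1: replicate; writes are staged and committed only on success.
function replicate(state, failOnPk) {
  const maxId = Number(state.variables.get('CDC_EVENT_ID_AUDIT_TRAIL'));
  const staged = new Map(state.destination);
  for (const e of state.cdcEvent) {
    if (e.table_name !== 'audit_trail' || e.id > maxId) continue;
    if (e.pk === failOnPk) throw new Error(`failed on pk ${e.pk}`); // simulated error
    staged.set(e.pk, { audit_trail_id: e.pk });
  }
  state.destination = staged; // "commit"
}

// Step 2: delete processed events up to the captured bound.
function cleanup(state) {
  const maxId = Number(state.variables.get('CDC_EVENT_ID_AUDIT_TRAIL'));
  state.cdcEvent = state.cdcEvent.filter((e) => !(e.table_name === 'audit_trail' && e.id <= maxId));
}

// Nested Flow: steps run in order and stop at the first failure.
function runNestedFlow(state, failOnPk) {
  try {
    if (!captureMaxId(state)) return 'skipped'; // condition returned false
    replicate(state, failOnPk); // step 1
    cleanup(state);             // step 2 runs only if step 1 succeeded
    return 'ok';
  } catch (e) {
    return e.message;
  }
}

const failed = makeState();
console.log(`${runNestedFlow(failed, '2')}; events left: ${failed.cdcEvent.length}; rows: ${failed.destination.size}`);
// prints "failed on pk 2; events left: 2; rows: 0"
const succeeded = makeState();
console.log(`${runNestedFlow(succeeded)}; events left: ${succeeded.cdcEvent.length}; rows: ${succeeded.destination.size}`);
// prints "ok; events left: 0; rows: 2"
```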