Overview
In Etlworks, you can create a Flow that tracks changes in a data source (for example, a database or an API) and loads the changed records into a destination (for example, another database or API).
Etlworks supports unidirectional change replication: from the source to the destination. To replicate changes in the opposite direction (from the destination to the source), a separate Flow must be created, where the source and the destination swap places.
This technique uses a High Watermark Field to identify recently changed records.
Read about other change replication techniques available in Etlworks.
What you need to know
How HWM change replication works | When a high watermark field is set, Etlworks calculates a maximum value for the field (high watermark) for the last successful load. |
How to configure HWM change replication | Follow these steps to configure a source-to-destination transformation with HWM change replication. |
Calculating high watermark field value | You can calculate the high watermark value using SQL and JavaScript. |
Ignore exception when executing HWM query | There is an edge case when the system is trying to execute the HWM query against a table that does not exist yet. |
Using previous high watermark value in source query and high watermark query | The high watermark value for the previous run is always stored in the metadata storage. It can be used in the source query and high watermark query to calculate the high watermark value. |
Manually setting the value of the high watermark field | If the destination table is not empty, or it is possible that the records in the destination are getting updated by a different process (in addition to the change replication flow), it makes sense to manually set the value of the high watermark field. |
HWM replication when the source is configured with a wildcard | Etlworks supports HWM replication when the source is configured with a wildcard. |
Incremental synchronization between any source and any destination | If your source is not a database, you can still set up a change replication flow that will load only the records that have changed since the last successful load. |
Use Cases
Change replication can occur:
- Between two databases: a Flow pulls changed records from the source database to update the destination database.
- Between a database and any destination: a Flow pulls changed records from the source database and loads changes into any destination, for example, creates update files in Amazon S3.
- Between any source and any destination: a Flow pulls changed records from any source, for example, a web service, and loads them into any supported destination, for example, a data warehouse.
Resolve change conflicts
When the same record is modified in both the source and destination, Etlworks will simply apply the source changes to the destination.
High Watermark Field
A high watermark is the highest peak in value that a field has reached.
Which fields could become a High Watermark Field?
Basically, any field can be a High Watermark Field as long as it has the data type TIMESTAMP, DATE, or NUMERIC and can be used to uniquely identify recent changes.
For a database, it is recommended that there is an index for the High Watermark Field.
Is it possible to set two or more columns as a high watermark for a single transformation?
There can only be one column in the High Watermark Field. If the source is an SQL database, you can override the Source query to combine multiple columns into a single HWM field. For example, if there are columns created_at and updated_at, you can use SQL like the following:
select *, nvl(updated_at, created_at) as hwm_field
from table
Then set High Watermark Field to hwm_field.
Example of the High Watermark Field
Suppose there is a table audit_trail in a source PostgreSQL database. This table will be updated each time someone logs into the system or executes a specific function. We would like to track changes in this table and load them into an online data warehouse (which is also a database).
Suppose that the table audit_trail has a field Last_modified TIMESTAMP NOT NULL which is updated each time a record is inserted or updated. We would use Last_modified as the High Watermark Field so we can track changes in the audit_trail table.
Use a fully qualified field name for the High Watermark Field
If a database is a source for the change replication, it is possible to use a fully qualified field name as the value for the High Watermark Field, as in the example below:
public.audit_trail.partition_date
It makes sense to enter a fully qualified field name if the Source query, which drives the change replication, includes JOINs, sub-selects, and table aliases so that field name collisions can be avoided.
How HWM change replication works
- Etlworks always tracks what records have been extracted and what records have been loaded during a typical ETL.
- It is possible to set up a High Watermark Field for that particular transformation.
- When a High Watermark Field is set, Etlworks calculates a maximum value for the field (high watermark) for the last successful load.
- Etlworks stores this value in the metadata storage.
- The next time the ETL process is executed, Etlworks uses the previously stored high watermark value to filter out records that are not newer than the previous high watermark.
- This technique works equally well for databases, files, and APIs.
- If the change replication is from a database, Etlworks modifies the WHERE clause on the fly to select records that are newer than the high watermark:
  select * from audit_trail where last_modified > ?
  where ? is the high watermark value.
- If the change replication is from an API or file, Etlworks automatically adds a JavaScript filter that skips all records that are older than the high watermark.
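The filtering described in the list above can be illustrated with a minimal sketch. It is not the Etlworks implementation: the function name replicateChanges, the record shape, and the numeric last_modified values are assumptions made for this example only.

```javascript
// Hypothetical helper that mimics the documented HWM behavior for a
// non-database source. It is not Etlworks source code.
function replicateChanges(records, hwmField, previousHwm) {
  // Keep only records newer than the stored high watermark.
  // On the first run (previousHwm === null) every record is kept.
  const changed = records.filter(
    (r) => previousHwm === null || r[hwmField] > previousHwm
  );
  // The new high watermark is the maximum field value among the loaded
  // records; if nothing changed, the previous value is kept.
  const newHwm = changed.reduce(
    (max, r) => (max === null || r[hwmField] > max ? r[hwmField] : max),
    previousHwm
  );
  return { changed, newHwm };
}

// Illustrative data: last_modified is shown as a number for brevity.
const rows = [
  { id: 1, last_modified: 100 },
  { id: 2, last_modified: 200 },
  { id: 3, last_modified: 300 },
];
const firstRun = replicateChanges(rows, "last_modified", null);
const secondRun = replicateChanges(rows, "last_modified", 200);
console.log(firstRun.newHwm, secondRun.changed.map((r) => r.id));
```

The first call loads every record because no high watermark is stored yet; the second call loads only the record whose last_modified is greater than the stored value.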
How to configure HWM change replication
Step 1. Create ETL flow and add source to destination transformation.
Step 2. Click CONFIGURE, select the Change Replication tab, and enter the High Watermark Field.
Step 3. Optionally (recommended) add a query to calculate the value of the high watermark field using SQL on the fly.
Step 4. If an HWM query is set (step 3) and you are loading data into a table that does not exist yet, configure the transformation to ignore the error when executing the HWM query.
Calculate high watermark field value
Calculate high watermark field value using SQL
This technique works only if the destination is a database.
Etlworks automatically tracks the changes in the source and replicates them in the destination. While doing this, it automatically calculates the highest peak in value of the High Watermark Field and uses it to pull only the changed records from the source. It all works perfectly as long as the destination table is updated only by running that specific change replication Flow. Unfortunately, that is not always the case.
To make a change replication Flow bulletproof, you can specify an SQL query that will be used to calculate the High Watermark Field Value on the fly based on the current state of the destination table. Basically, the Flow will then pull only the records in the source that do not exist in the destination or have changed in the source since the last run of the Flow.
HWM query
Enter an SQL query that returns the current maximum field value in the High Watermark Field Value field.
In this example, the High Watermark Field Value is calculated using the SQL query below:
SELECT max(audit_trail_id) FROM audit_trail_updates
Instead of the actual table name in the FROM clause, you can use the token {TABLE}. It is especially useful when extracting and loading data from multiple database objects using a wildcard source name.
SELECT max(id) FROM {TABLE}
The query is executed on the destination Connection.
Ignore exceptions when executing HWM query
The default behavior when executing an HWM query is to throw an exception if the query fails.
There is an edge case when the system is trying to execute the HWM query against a table that does not exist yet. For example, this happens if you configured a transformation to automatically create a destination table and the table doesn't exist yet at the time when the HWM query is executed.
To handle the edge case, use the field High Watermark Exception to Ignore. You can enter a string, for example, object doesn't exist. If the string is found in the exception, the HWM value is set to null instead of throwing an exception.
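The behavior described above can be sketched as follows. This is only an illustration under stated assumptions: resolveHighWatermark and missingTableQuery are hypothetical names, the match is assumed to be a plain case-sensitive substring test, and the error text is made up for the example.

```javascript
// Hypothetical sketch of the documented behavior; not Etlworks source code.
function resolveHighWatermark(runHwmQuery, exceptionToIgnore) {
  try {
    return runHwmQuery();
  } catch (err) {
    const message = String(err && err.message ? err.message : err);
    // Assumption: a plain substring match against the error text.
    if (exceptionToIgnore && message.includes(exceptionToIgnore)) {
      return null; // treated as "no high watermark yet"
    }
    throw err; // default behavior: the error propagates
  }
}

// Stand-in for an HWM query against a destination table that was not created yet.
function missingTableQuery() {
  throw new Error("ERROR: object doesn't exist: audit_trail_updates");
}

console.log(resolveHighWatermark(missingTableQuery, "object doesn't exist"));
```

With an empty ignore string, the same call rethrows the error, which corresponds to the default behavior.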
Calculate high watermark field value using JavaScript
Using this technique, you can change the HWM value on the fly by providing a JavaScript program that returns a new HWM value.
You can enter the program in the Change High Watermark Field Value field located under MAPPING > Change Replication tab.
Example
highWatermark == null ? 0 : java.lang.Math.round(highWatermark)
The code above returns 0 as the initial HWM value when no value has been stored yet; otherwise, it returns the long value of the highWatermark (internally, it is stored as a decimal number).
The last evaluated line in the code returns the new HWM value.
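To experiment with the expression above outside Etlworks, a standalone equivalent can be written in plain JavaScript. This is a sketch under assumptions: nextHighWatermark is a hypothetical wrapper, highWatermark is passed in as a parameter instead of being provided by the Flow, and Math.round stands in for java.lang.Math.round.

```javascript
// Standalone stand-in for the Etlworks expression. Inside Etlworks the
// highWatermark variable is referenced by name; here it is a parameter.
function nextHighWatermark(highWatermark) {
  // No stored value yet: start from 0. Otherwise round the stored decimal
  // value to the nearest integer.
  return highWatermark == null ? 0 : Math.round(highWatermark);
}

console.log(nextHighWatermark(null), nextHighWatermark(1057.0));
```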
Available variables
The following variables can be referenced by name from JavaScript code:
Name | Class name / JavaDoc | Packages |
---|---|---|
dataSet | com.toolsverse.etl.common.DataSet | com.toolsverse.etl.common |
etlConfig | com.toolsverse.etl.core.config.EtlConfig | com.toolsverse.etl.core.config |
scenario | com.toolsverse.etl.core.engine.Scenario | com.toolsverse.etl.core.engine |
source | com.toolsverse.etl.core.engine.Source | com.toolsverse.etl.core.engine |
destination | com.toolsverse.etl.core.engine.Destination | com.toolsverse.etl.core.engine |
Use previous high watermark value in source query and high watermark query
The High Watermark Value for the previous run is always stored in the metadata storage. It can be used in the Source query and in the query that calculates the high watermark value (HWM query).
Reference HWM value in the query
Use the following notation to reference the HWM value in the query:
{TRANSFORMATION_NAME_HIGH_WATERMARK}
Read how to set the transformation name.
Work with NULL HWM value
To protect the query against the NULL HWM value, use the following trick:
{token} is null or field > {token}
If the value of the HWM field represented by {token} is indeed NULL, the query will be automatically transformed to:
NULL is null or field > NULL
Use the previous HWM value in the Source query
You can add the following string anywhere in the Source query:
{TRANSFORMATION_NAME_HIGH_WATERMARK} is null OR
hwm_field > {TRANSFORMATION_NAME_HIGH_WATERMARK}
For example, when:
- The Source query is select * from audit
- The Transformation Name is AUDIT
- The HWM field is audit_id
Then, the Source query will be:
select * from audit
WHERE {AUDIT_HIGH_WATERMARK} is null OR audit_id > {AUDIT_HIGH_WATERMARK}
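The token substitution described in this section can be illustrated with a small sketch. It is not how Etlworks performs the substitution internally: expandHwmToken is a hypothetical helper, and it assumes the stored value is inserted verbatim (a numeric HWM is used here) and that a missing value becomes NULL.

```javascript
// Hypothetical illustration of token substitution; not Etlworks source code.
function expandHwmToken(query, transformationName, hwmValue) {
  const token = "{" + transformationName + "_HIGH_WATERMARK}";
  // Assumption: the value is inserted verbatim, and a missing value becomes NULL.
  const literal =
    hwmValue === null || hwmValue === undefined ? "NULL" : String(hwmValue);
  // Replace every occurrence of the token in the query text.
  return query.split(token).join(literal);
}

const sourceQuery =
  "select * from audit " +
  "WHERE {AUDIT_HIGH_WATERMARK} is null OR audit_id > {AUDIT_HIGH_WATERMARK}";

console.log(expandHwmToken(sourceQuery, "AUDIT", null)); // first run, no stored HWM
console.log(expandHwmToken(sourceQuery, "AUDIT", 1057)); // later run, stored HWM
```

On the first run the condition NULL is null is true, so all records are selected; on later runs only records with a greater audit_id pass the filter.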
Use the previous HWM value in the HWM query
Anywhere in the HWM query, you can add the following string:
{TRANSFORMATION_NAME_HIGH_WATERMARK} is null OR
hwm_field > {TRANSFORMATION_NAME_HIGH_WATERMARK}
For example, when:
- The HWM query is select max(audit_id) from audit_updates
- The Transformation Name is AUDIT
- The HWM field is audit_id
Then, the HWM query will be:
select max(audit_id) from audit_updates
WHERE {AUDIT_HIGH_WATERMARK} is null OR audit_id > {AUDIT_HIGH_WATERMARK}
Manually set the value of the high watermark field
If the destination table is not empty, or it is possible that the records in the destination are getting updated by different processes (in addition to the change replication Flow), it makes sense to manually set the value of the High Watermark Field.
There are two fields that can be used for that purpose:
- High Watermark Field Value on First Run: the optional value of the High Watermark Field when running the change replication scenario for the first time. If the High Watermark Field is TIMESTAMP, use the YYYY-MM-DD HH:mm:SS.mil format, for example, 2018-08-01 19:38:31.377. The value set for this field will be used only when the change replication Flow is running the first time and will be ignored on the next run.
- High Watermark Field Value: the optional value of the High Watermark Field when running the change replication scenario. If the High Watermark Field is TIMESTAMP, use the YYYY-MM-DD HH:mm:SS.mil format, for example, 2018-08-01 19:38:31.377. This value overrides the value set automatically when running the scenario and the value used for the first run. You can also enter an SQL query that returns the current maximum value of the High Watermark Field in the destination table. For example, select max(last_updated) from order.
Read more about calculating high watermark field value.
HWM replication when the source is configured with a wildcard
Etlworks supports HWM replication when the source is configured with a wildcard. First, the Flow lists all the objects whose names match the wildcard. It then creates a separate source-to-destination transformation for each matching source object (database table, file, etc.) and applies the same HWM rules to each source-to-destination pair. Here's how you can do this:
Step 1. Add a source-to-destination transformation when the source is configured with a wildcard.
Step 2. Click MAPPING and select the Parameters tab. Enter {source} in the Transformation Name field. The token {source} enables a mechanism that sets the Transformation Name to the fully qualified source table name. Without it, the Transformation Name is generated dynamically, which can prevent the Flow from locating the stored HWM value. The other available token is {destination}. You can use both tokens together: {source} to {destination}. We recommend {source}.
Step 3. Set the value of the High Watermark Field under the Change Replication tab.
It is expected that all source tables have the same High Watermark Field, for example, last_update.
Step 4. Optionally, set the SQL to calculate the High Watermark Field value on each run. Use token {table} for the destination table name.
Setting high watermark value on success only
By default, a Flow saves metrics to a file whenever it is executed. This metrics file is crucial for high watermark (HWM) replication, as it stores the latest HWM value.
However, there is an edge case to consider: when HWM replication is part of a nested flow with many steps, and the developer wants to save the HWM value only if all steps are executed successfully.
Enable the option Save metrics only on success under the Parameters tab in the nested flow to ensure that metrics are saved to the file only if all flows in the nested flow execute successfully.
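The difference between the two modes can be sketched as follows. This is a conceptual illustration only: runNestedFlow, the store object, and the step functions are hypothetical, and the sketch assumes that the nested flow stops at the first failed step.

```javascript
// Hypothetical sketch; the store object and step functions are illustrative only.
function runNestedFlow(steps, store, saveOnlyOnSuccess) {
  const pending = {};
  let allSucceeded = true;
  for (const step of steps) {
    try {
      // Each step returns the HWM values it captured, keyed by transformation name.
      Object.assign(pending, step());
      // Default mode: values are persisted as soon as a step finishes.
      if (!saveOnlyOnSuccess) Object.assign(store, pending);
    } catch (err) {
      allSucceeded = false;
      break; // assumption: the nested flow stops at the first failure
    }
  }
  // "Save only on success" mode: persist once, and only if nothing failed.
  if (saveOnlyOnSuccess && allSucceeded) Object.assign(store, pending);
  return allSucceeded;
}

const steps = [
  () => ({ ORDERS: 10 }),
  () => {
    throw new Error("load failed");
  },
];
const defaultStore = {};
const strictStore = {};
runNestedFlow(steps, defaultStore, false);
runNestedFlow(steps, strictStore, true);
console.log(defaultStore, strictStore);
```

In the default mode the HWM captured by the first step is stored even though the second step failed, so a rerun would skip those records; with the option enabled nothing is stored and the rerun starts from the previous HWM.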
Incremental synchronization between any source and any destination
If your source is not a database, you can still set up a change replication Flow, which will load only the records that have changed since the last successful load.
Basically, all you need to do is to set the High Watermark Field and enable the change replication for the transformation.
Make sure that the High Watermark Field is either TIMESTAMP / DATE (or can be converted to TIMESTAMP / DATE), or NUMERIC.
Troubleshooting
I didn't do anything but HWM is reset
If any of the following occurs, the Flow will reset the HWM value:
- The name of the source or destination has changed.
- The order of source-to-destination transformations in the Flow has changed.
- One or multiple source-to-destination transformations in the Flow were disabled or enabled.
Reason: Etlworks uses the name of the source-to-destination transformation as a key to store the HWM value for the last successful run. The name is calculated as SOURCE to DESTINATION ind, where ind is the index of the transformation in the Flow. Example: ACC to ACCOUNT 2.
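The following sketch shows why such changes lose the stored value. It is an illustration, not Etlworks code: defaultTransformationNames is a hypothetical helper, and it assumes a 1-based index that counts only enabled transformations.

```javascript
// Hypothetical sketch: builds keys in the "SOURCE to DESTINATION ind" shape
// described above. The index base and exact formatting are assumptions.
function defaultTransformationNames(transformations) {
  return transformations
    .filter((t) => t.enabled)
    .map((t, i) => `${t.source} to ${t.destination} ${i + 1}`);
}

// HWM stored after an earlier successful run, keyed by the generated name.
const storedHwm = { "ACC to ACCOUNT 2": 1057 };

const before = defaultTransformationNames([
  { source: "ORD", destination: "ORDERS", enabled: true },
  { source: "ACC", destination: "ACCOUNT", enabled: true },
]);
// Disabling the first transformation shifts the index of the second one.
const after = defaultTransformationNames([
  { source: "ORD", destination: "ORDERS", enabled: false },
  { source: "ACC", destination: "ACCOUNT", enabled: true },
]);

console.log(before[1], storedHwm[before[1]]);
console.log(after[0], storedHwm[after[0]]);
```

After the change, the generated name no longer matches the stored key, so the lookup finds nothing and the replication starts over as if it were the first run.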
Solution for non-wildcard transformations: explicitly set the unique name of the transformation.
Solution for wildcard transformations: set the name of the transformation to {source} or {destination}.
Note that applying either of the solutions above after HWM values have already been captured and stored will reset the HWM for all affected transformations.
The best solution (if available): calculate the HWM on the fly.
I configured a query to calculate HWM on the fly, flow fails in the first run
Reason: if you calculate the HWM value on the fly using a query executed in the destination database, it is assumed that the destination table already exists. If the table does not exist, the flow will fail.
Solution: configure the transformation to ignore the error when executing the HWM query.