About High Watermark Change Replication (HWM)
In Integrator, it is possible to create a flow that tracks changes in a data source, for example, a database or API, and loads the changed records into the destination, for example, another database or API. Another name for change replication is data synchronization.
Integrator supports unidirectional change replication: from the source to the destination. To replicate changes in the opposite direction--from the destination to the source--a separate flow must be created, where the source and the destination swap places.
This technique uses a high watermark field to identify recently changed records.
Read about other change replication techniques available in Etlworks Integrator.
Use Cases
Change replication can occur:
- Between two databases: a flow pulls changed records from the source database to update the destination database.
- Between a database and any destination: a flow pulls changed records from the source database and loads the changes into any destination, for example, creates "update" files in Amazon S3.
- Between any source and any destination: a flow pulls changed records from any source, for example, a web service, and loads them into any supported destination, for example, a data warehouse.
Resolving change conflicts
When the same record is modified in both the source and destination, the Integrator will simply apply the source changes to the destination.
High watermark field
A high watermark is the highest peak in value that a field has reached.
Which fields could become a high watermark field?
Any field can be a high watermark field as long as it has the data type TIMESTAMP, DATE, or NUMERIC and can be used to uniquely identify recent changes.
For a database, it is recommended to have an index on the high watermark field.
Is it possible to set two or more fields as a high watermark for a single transformation?
There can only be one high watermark field.
Example of the high watermark field
Suppose there is a table audit_trail in a source PostgreSQL database. This table will be updated each time the user logs into the system or executes a specific function. We would like to track changes in this table and load them into an online data warehouse (which is also a database).
Suppose that the table audit_trail has a field Last_modified TIMESTAMP NOT NULL which is updated each time a record is inserted or updated.
We would use Last_modified as the high watermark field, so we can track changes in the audit_trail table.
Using a fully-qualified field name for the high watermark field
If a database is a source for the change replication, it is possible to use a fully-qualified field name as the value for the high watermark field, as in the example below:
public.audit_trail.partition_date
It makes sense to enter a fully-qualified field name if the Source Query which drives the change replication includes JOINs, sub-selects, and table aliases so that field-name collisions can be avoided.
How HWM change replication works
- Etlworks Integrator always tracks which records have been extracted and which records have been loaded during a typical ETL.
- It is possible to set up a High watermark field for a particular transformation.
- When a high watermark field is set, Integrator calculates the maximum value of the field (the high watermark) for the last successful load.
- Integrator stores this value with the metadata.
- The next time the ETL process is executed, Integrator uses the previously calculated high watermark value to filter out records which are older than the previous high watermark.
- This technique works equally well for databases, files, and APIs.
- If the change replication is from a database, Integrator modifies the WHERE clause on the fly to select records which are newer than the high watermark: select * from audit_trail where last_modified > ? (where ? is the high watermark value).
- If the change replication is from an API or file, Integrator automatically adds a JavaScript filter which skips all records that are older than the high watermark.
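The filtering steps above can be sketched in plain JavaScript. This is only an illustration under stated assumptions, not Integrator's actual implementation: the helper names selectChangedRecords and nextHighWatermark are hypothetical, records are modeled as plain objects, and numeric values stand in for timestamps.

```javascript
// Hypothetical helper (not part of the Integrator API): keeps only the records
// whose high watermark field is newer than the previously stored value.
function selectChangedRecords(records, hwmField, previousHwm) {
  // On the first run there is no stored high watermark, so every record qualifies.
  if (previousHwm === null || previousHwm === undefined) {
    return records.slice();
  }
  return records.filter(function (record) {
    return record[hwmField] > previousHwm;
  });
}

// Hypothetical helper: computes the high watermark to store after a successful load.
function nextHighWatermark(loadedRecords, hwmField, previousHwm) {
  var start = previousHwm === undefined ? null : previousHwm;
  return loadedRecords.reduce(function (max, record) {
    var value = record[hwmField];
    return max === null || value > max ? value : max;
  }, start);
}

// Sample data: numbers stand in for last_modified timestamps.
var auditRecords = [
  { audit_id: 1, last_modified: 100 },
  { audit_id: 2, last_modified: 200 },
  { audit_id: 3, last_modified: 300 }
];

// First run: nothing is stored yet, so all records are loaded.
var firstBatch = selectChangedRecords(auditRecords, "last_modified", null);
var hwmAfterFirstRun = nextHighWatermark(firstBatch, "last_modified", null);

// A new record appears in the source before the second run.
auditRecords.push({ audit_id: 4, last_modified: 400 });
var secondBatch = selectChangedRecords(auditRecords, "last_modified", hwmAfterFirstRun);
var hwmAfterSecondRun = nextHighWatermark(secondBatch, "last_modified", hwmAfterFirstRun);
```

In this sketch the second run picks up only the record added after the first run, and the stored high watermark moves forward to that record's value.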
Calculating high watermark field value
Calculating high watermark field value using SQL
This technique works only if the destination is a database.
Etlworks Integrator automatically tracks the changes in the source and replicates them in the destination. While doing this, it automatically calculates the highest value of the high watermark field and uses it to pull only the changed records from the source. This works as long as the destination table is updated only by that specific change replication flow. Unfortunately, that is not always the case.
To make a change replication flow bulletproof, you can specify a SQL query that calculates the high watermark field value on the fly, based on the current state of the destination table. As a result, the flow pulls only the records in the source which do not exist in the destination or have changed in the source since the last run of the flow.
HWM query
Enter a SQL query that returns the current maximum value of the high watermark field into the High Watermark Field Value field.
In this example the high watermark field value is calculated using the SQL query below:
SELECT max(audit_trail_id) FROM audit_trail_updates
Instead of the actual table name in the FROM clause, you can use the token {TABLE}. It is especially useful when extracting and loading data from multiple database objects using a wildcard source name.
SELECT max(id) FROM {TABLE}
The query is executed on the destination connection.
Ignoring exceptions when executing HWM query
The default behavior when executing an HWM query is to throw an exception if the query fails.
There is an edge case when the system tries to execute an HWM query against a table that does not exist yet. For example, this happens if you configured a transformation to automatically create the destination table and the table doesn't exist yet at the time the HWM query is executed.
To handle this edge case, use the field High Watermark Exception to Ignore. You can enter a string (for example, object doesn't exist) which, if it is found in the exception, will set the HWM value to null instead of throwing an exception.
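The rule described above can be pictured with a small JavaScript sketch. It is an assumption-laden illustration rather than Integrator's code: the function resolveHighWatermark and the two stand-in query functions are hypothetical, and the sketch assumes a case-sensitive substring match against the exception message.

```javascript
// Hypothetical helper (not part of the Integrator API): runs an HWM query and
// applies the "High Watermark Exception to Ignore" rule to any failure.
function resolveHighWatermark(runHwmQuery, exceptionToIgnore) {
  try {
    return runHwmQuery();
  } catch (error) {
    var message = String(error && error.message ? error.message : error);
    // Assumption: a case-sensitive substring match decides whether to ignore.
    if (exceptionToIgnore && message.indexOf(exceptionToIgnore) !== -1) {
      return null; // the HWM value becomes null instead of failing the flow
    }
    // Any other failure keeps the default behavior: the exception propagates.
    throw error;
  }
}

// Stand-ins for query executions, used for illustration only.
function missingTableQuery() {
  throw new Error("SQL error: object doesn't exist");
}
function brokenConnectionQuery() {
  throw new Error("connection refused");
}

var ignoredResult = resolveHighWatermark(missingTableQuery, "object doesn't exist");

var otherErrorRethrown = false;
try {
  resolveHighWatermark(brokenConnectionQuery, "object doesn't exist");
} catch (error) {
  otherErrorRethrown = true;
}

var normalResult = resolveHighWatermark(function () { return 42; }, "object doesn't exist");
```

Only errors that contain the configured string are turned into a null HWM value; any other error still fails, which matches the default behavior.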
Calculating high watermark field value using JavaScript
Using this technique you can change the HWM value on the fly by providing a JavaScript program that returns a new HWM value. The technique is typically used together with the Change Data Tracking.
You can enter the program in the Change High Watermark Field Value field, located under MAPPING->Change Replication tab.
Example
highWatermark == null ? 0 : java.lang.Math.round(highWatermark)
The code above sets the initial HWM value to 0 and then returns the Long value of the highWatermark (internally it is stored as a decimal number).
The last evaluated line in the code returns the new HWM value.
Available objects
The following objects can be referenced by name from JavaScript code:
Object name | Class name / JavaDoc | Import |
---|---|---|
dataSet | com.toolsverse.etl.common.DataSet | importPackage(com.toolsverse.etl.common); |
etlConfig | com.toolsverse.etl.core.config.EtlConfig | importPackage(com.toolsverse.etl.core.config); |
scenario | com.toolsverse.etl.core.engine.Scenario | importPackage(com.toolsverse.etl.core.engine); |
source | com.toolsverse.etl.core.engine.Source | importPackage(com.toolsverse.etl.core.engine); |
destination | com.toolsverse.etl.core.engine.Destination | importPackage(com.toolsverse.etl.core.engine); |
Using previous high watermark value in source query and high watermark query
The high watermark value for the previous run is always stored in the metadata storage. It can be used in the source query and in the query that calculates the high watermark value (HWM query).
Referencing HWM value in the query
Use the following notation to reference the HWM value in the query:
{TRANSFORMATION_NAME_HIGH_WATERMARK}
Read how to set the transformation name.
Working with NULL HWM value
To protect the query against the NULL HWM value use the following trick:
{token} is null or field > {token}
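If the value of the HWM field represented by {token} is indeed NULL, the query will be automatically transformed to
NULL is null or field > NULL
Because NULL is null evaluates to true, the condition holds for every record, so all records are selected.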
Using previous HWM value in the Source query
Anywhere in the source query you can add the following string:
{TRANSFORMATION_NAME_HIGH_WATERMARK} is null OR
hwm_field > {TRANSFORMATION_NAME_HIGH_WATERMARK}
For example, when
- The source query is select * from audit
- The transformation name is AUDIT
- The hwm field is audit_id
then, the source query will be:
select * from audit
WHERE {AUDIT_HIGH_WATERMARK} is null OR audit_id > {AUDIT_HIGH_WATERMARK}
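As a rough illustration of how the token in the query above resolves, the substitution could be sketched as follows. The function substituteHwmToken is hypothetical and not part of Integrator. The sketch uses a numeric HWM value only, because how Integrator formats other types (for example, timestamps) is not covered here.

```javascript
// Hypothetical helper (not part of the Integrator API): replaces every
// occurrence of {TOKEN} in a query with the stored HWM value, or with NULL
// when no value has been stored yet.
function substituteHwmToken(query, token, hwmValue) {
  var replacement =
    hwmValue === null || hwmValue === undefined ? "NULL" : String(hwmValue);
  return query.split("{" + token + "}").join(replacement);
}

var sourceQueryTemplate =
  "select * from audit " +
  "WHERE {AUDIT_HIGH_WATERMARK} is null OR audit_id > {AUDIT_HIGH_WATERMARK}";

// First run: no HWM value is stored, so the filter lets every record through.
var firstRunQuery = substituteHwmToken(sourceQueryTemplate, "AUDIT_HIGH_WATERMARK", null);

// Later run: only records with audit_id greater than the stored value match.
var laterRunQuery = substituteHwmToken(sourceQueryTemplate, "AUDIT_HIGH_WATERMARK", 1500);
```

With a NULL value the first half of the condition is true for every record. With a stored value the first half is false and the comparison on audit_id does the filtering.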
Using previous HWM value in the HWM query
Anywhere in the hwm query you can add the following string:
{TRANSFORMATION_NAME_HIGH_WATERMARK} is null OR
hwm_field > {TRANSFORMATION_NAME_HIGH_WATERMARK}
For example, when
- The hwm query is select max(audit_id) from audit_updates
- The transformation name is AUDIT
- The hwm field is audit_id
then, the hwm query will be:
select max(audit_id) from audit_updates
WHERE {AUDIT_HIGH_WATERMARK} is null OR audit_id > {AUDIT_HIGH_WATERMARK}
Manually setting the value of the high watermark field
If the destination table is not empty, or if it is possible that the records in the destination are updated by a different process (in addition to the change replication flow), it makes sense to manually set the value of the high watermark field.
There are two fields which can be used for that purpose:
- High Watermark Field Value on First Run - the optional value of the high watermark field used when the change replication scenario runs for the first time. If the high watermark field is a TIMESTAMP, use the YYYY-MM-DD HH:mm:SS.mil format, for example, 2018-08-01 19:38:31.377. The value set for this field is used only when the change replication flow runs for the first time and is ignored on subsequent runs.
- High Watermark Field Value - the optional value of the high watermark field used when running the change replication scenario. If the high watermark field is a TIMESTAMP, use the YYYY-MM-DD HH:mm:SS.mil format, for example, 2018-08-01 19:38:31.377. This value overrides the value set automatically when running the scenario and the value used for the first run. You can also enter a SQL query which returns the current maximum value of the high watermark field in the destination table, for example, select max(last_updated) from order. Read more about calculating high watermark field value.
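The precedence between the two fields can be summarized with a short JavaScript sketch. It is a reading of the description above, not Integrator's implementation: the function effectiveHighWatermark and the property names fieldValue, firstRunValue, storedValue, and isFirstRun are hypothetical, and the case where a SQL query is entered as the value is not modeled.

```javascript
// Hypothetical helper (not part of the Integrator API): picks the HWM value
// for a run, following the precedence described for the two manual fields.
function effectiveHighWatermark(settings) {
  var isSet = function (value) {
    return value !== null && value !== undefined && value !== "";
  };
  // High Watermark Field Value overrides both the stored and the first-run value.
  if (isSet(settings.fieldValue)) {
    return settings.fieldValue;
  }
  // High Watermark Field Value on First Run applies only to the first run.
  if (settings.isFirstRun && isSet(settings.firstRunValue)) {
    return settings.firstRunValue;
  }
  // Otherwise fall back to the value stored after the last successful load.
  return isSet(settings.storedValue) ? settings.storedValue : null;
}

var firstRunHwm = effectiveHighWatermark({
  isFirstRun: true,
  firstRunValue: "2018-08-01 19:38:31.377",
  storedValue: null
});

var laterRunHwm = effectiveHighWatermark({
  isFirstRun: false,
  firstRunValue: "2018-08-01 19:38:31.377",
  storedValue: "2018-09-15 08:00:00.000"
});

var overriddenHwm = effectiveHighWatermark({
  isFirstRun: false,
  fieldValue: "2018-10-01 00:00:00.000",
  firstRunValue: "2018-08-01 19:38:31.377",
  storedValue: "2018-09-15 08:00:00.000"
});
```

In this sketch the first-run value matters only once, while an explicitly set High Watermark Field Value wins on every run for as long as it stays configured.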
Incremental synchronization between any source and any destination
If your source is not a database, you can still set up a change replication flow which will load only the records that have changed since the last successful load.
All you need to do is set the High watermark field and enable change replication for the transformation.
Make sure that the high watermark field is either TIMESTAMP/DATE (or can be converted to TIMESTAMP/DATE) or NUMERIC.