Overview
A manufacturing company needs to incrementally load data from the OLTP database in MS SQL Server into the data warehouse in Amazon Aurora MySQL.
Requirements
- The ETL process must be able to capture all the changes in the OLTP database (including deletes) and replicate them in the data warehouse.
- In addition to change replication, there should be a way to do a full load of any table at any time.
- Some of the dimensions in the data warehouse will be populated by running an SQL query, so the ETL process must be able to execute custom SQL after loading the data.
- While most of the tables are relatively small (< 10K records), there are a few large ones, so the ETL process should be able to handle tables with millions of rows.
- The ETL process will be running at least a few times a day, potentially in near real-time.
Prerequisites
To incrementally load the data from the source into the destination, we will be using a feature called Change Replication. In order to work, change replication requires each source table to have a so-called High Watermark Field. In our case, all source tables have a column called `timestamp`, which is a binary-encoded timestamp of when the record was last modified or inserted. We will be using this column as the High Watermark Field.
In order to capture and replicate the deletes (when a record is deleted in the source, it must also be deleted in the destination), `ON_DELETE` triggers were added to each source table. They insert the primary key of the deleted record, as well as the table name, into the `[delete audit]` table. Generally speaking, a better approach would be to use soft deletes in the source, where records are not actually deleted and a `deleted_timestamp` or `is_deleted` flag is set instead. This would allow us to skip the step of capturing the deletes. Unfortunately, it wasn't possible in this particular case due to the business requirements.
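For illustration, this is roughly what such a trigger could look like for the Purchase Line table. The trigger name and the exact column list of the `[Delete Audit]` table are assumptions inferred from the queries used later in this article, not the exact trigger used in this case:

```sql
-- A minimal sketch, not the exact trigger used in this case.
-- Records the primary key of each deleted row, plus the table name,
-- in the [Delete Audit] table with the delete date truncated to midnight.
CREATE TRIGGER [dbo].[trg_PurchaseLine_Delete]
ON [dbo].[Purchase Line]
AFTER DELETE
AS
BEGIN
    SET NOCOUNT ON;
    INSERT INTO [dbo].[Delete Audit]
        ([Table Name], [Document Type], [Document No_], [Line No_],
         [Delete Date], [Deleted Indicator])
    SELECT 'Purchase Line',
           d.[Document Type], d.[Document No_], d.[Line No_],
           dateadd(day, datediff(day, 0, GETDATE()), 0),
           0
    FROM deleted AS d;
END;
```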
Solution
Create a Flow to incrementally load records from the source into the destination.
Step 1. Create a database Connection for the OLTP database (MS SQL Server). This Connection will be used as a source for most of the transformations.
Step 2. Create a database Connection for the data warehouse (Amazon Aurora MySQL). This Connection will be used as a destination for all transformations.
Step 3. Create a new Flow by selecting the Flow type Database to database in the gallery.
Step 4. Continue by adding source-to-destination transformations where the source is a Connection created in Step 1, and the destination is a Connection created in Step 2.
Step 5. When configuring the transformation, set the following parameters:
- From: the source table name.
- To: the destination table name (if needed, use a fully qualified notation).
- Source query: the SELECT statement used to extract data from the source table. This field is optional; if nothing is entered, Etlworks will execute the query `select * from source`. Read more about Source query. An illustrative example follows this list.
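Here is roughly what a Source query for one of the source tables could look like. The table and column names (`dbo.customer`, `customer_id`, `customer_name`) are assumptions for the sketch; the important part is that the binary `timestamp` column is decoded with `CONVERT` and exposed as `high_water_mark_no` so it can later be used to calculate the watermark on the destination:

```sql
-- A minimal sketch of a Source query (table and column names are illustrative).
-- CONVERT(bigint, [timestamp]) decodes the binary rowversion column so it can
-- be stored in the destination as high_water_mark_no.
SELECT c.customer_id,
       c.customer_name,
       CONVERT(bigint, c.[timestamp]) AS high_water_mark_no
FROM   dbo.customer AS c;
```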
Step 6. Configure the change replication parameters in the Change Replication tab.
- High Watermark Field: typically, it is enough to just enter the fully qualified name of the field in the source table which is used as a high watermark. In this particular case, the source SQL Server table has a field called `timestamp`, which is a binary-encoded timestamp of when the record was modified or inserted. It needs to be decoded to `bigint`, hence the `CONVERT(bigint, [timestamp])`.
- High Watermark Field Value: Etlworks can automatically track the changes in the source and calculate the value of the High Watermark Field. It is recommended to use an SQL statement to calculate the value on the fly. In this case, even if a change in the destination table happened outside of Etlworks, the system will still be able to use the correct value of the High Watermark Field. The query `select max(high_water_mark_no) from customer` is executed on the destination Connection and calculates the value of the High Watermark Field as the max value of the field `high_water_mark_no` in the `customer` table. The sketch below shows how the two pieces fit together.
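Putting the two together, the sketch below shows, conceptually, how the watermark value calculated on the destination could drive the next incremental extract from the source. The literal 123456789 simply stands for whatever `max(high_water_mark_no)` returned; the exact mechanics inside Etlworks may differ:

```sql
-- 1) Executed on the destination Connection to calculate the current
--    High Watermark Field value (assume it returns 123456789):
select max(high_water_mark_no) from customer;

-- 2) Conceptually, the next incremental extract then only picks up source
--    rows modified after that value:
SELECT c.*
FROM   dbo.customer AS c
WHERE  CONVERT(bigint, c.[timestamp]) > 123456789;
```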
Step 7. Configure a transformation to execute SQL MERGE (the default is INSERT) in the destination table for each record extracted by running the Source query. When MERGE is configured, the system will INSERT new records and UPDATE existing records (a rough MySQL equivalent is sketched after the list below). To configure MERGE, select the Parameters tab and set the following properties:
- Action: MERGE.
- Lookup Fields: `customer_id` (which is the primary key in the destination table `customer`).
- Transformation Name: optionally, give the transformation a unique name.
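Since the destination is MySQL-compatible, the effect of the MERGE action with `customer_id` as the Lookup Field is roughly an upsert. A minimal sketch of an equivalent statement, assuming `customer_id` is the primary key; the other column names are assumptions for the sketch:

```sql
-- Rough MySQL equivalent of MERGE keyed on customer_id.
-- Column names other than customer_id are assumptions for this sketch.
INSERT INTO customer (customer_id, customer_name, high_water_mark_no)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
    customer_name      = VALUES(customer_name),
    high_water_mark_no = VALUES(high_water_mark_no);
```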
Step 8. Add all other source-to-destination pairs and save the Flow.
Create a Flow to delete records in the destination which have been deleted in the source
Step 1. Create a new Flow by selecting the Flow type Migrate from one database to another in the gallery.
Step 2. Continue by adding source-to-destination transformations where the source and the destination use the same Connections as the previous Flow.
Step 3. When configuring the transformation, set the following parameters:
- From: the source table name.
- To: the destination table name (if needed, use a fully qualified notation).
- Source query: the SELECT statement used to get the deleted IDs from the `[delete audit]` table. In this particular case, the Source query looks like below:
```sql
-- Keys of the Purchase Line records logged in [Delete Audit] today
-- (Delete Date truncated to midnight) and not yet processed
-- ([Deleted Indicator] = 0).
SELECT [Document Type] Document_Type,
       [Document No_] Document_No,
       [Line No_] Line_No
FROM [dbo].[Delete Audit]
WHERE [Delete Date] >= dateadd(day,datediff(day,0,GETDATE()),0)
  AND [Delete Date] <= dateadd(day,datediff(day,0,GETDATE()),0)
  AND [Table Name] = 'Purchase Line'
  AND [Deleted Indicator] = 0
```
Step 4. Configure a transformation to execute SQL DELETE (the default is INSERT) on the destination table for each record populated by the Source query. To configure the DELETE action, select the Parameters tab and set the following properties:
- Action: DELETE.
- Lookup Fields: the fields which uniquely identify the records to delete. In this particular case: `Document_Type, Document_No, Line_No` (a sketch of the resulting delete follows this list).
- Transformation Name: optionally, give the transformation a unique name.
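The effect of the DELETE action on the destination is roughly one delete per row returned by the Source query. A minimal sketch, assuming the destination table is named `purchase_line` (an assumption, not part of the original case); the `?` placeholders stand for the `Document_Type`, `Document_No`, and `Line_No` values of each record:

```sql
-- Rough equivalent of the DELETE action on the destination.
-- The table name purchase_line is an assumption for this sketch.
DELETE FROM purchase_line
WHERE  document_type = ?
  AND  document_no   = ?
  AND  line_no       = ?;
```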
Step 5. Add all other source-to-destination pairs and save the Flow.
Step 6. Create an SQL Flow to clear the `[delete audit]` table in the source. When creating the Flow, specify the following parameters:
- Connection: the source Connection from the previous two Flows.
- SQL: the SQL statement which updates the `[delete audit]` table. In this particular case:
```sql
UPDATE [dbo].[Delete Audit]
SET [Deleted Indicator] = 1
WHERE [Deleted Indicator] = 0
  AND [Delete Date] >= dateadd(day,datediff(day,0,GETDATE()),0)
  AND [Delete Date] <= dateadd(day,datediff(day,0,GETDATE()),0);
```
Create a Flow which updates the dimension in the data warehouse by running an SQL query
Step 1. Create an SQL Flow.
Step 2. In the Connection field, select the destination Connection that was used in the Flows above.
Step 3. Enter the SQL statement to populate the dimension.
You can enter multiple `;`-separated SQL statements, for example, `TRUNCATE statement;INSERT statement;UPDATE statement;`. You can also enter an anonymous SQL block, for example, `BEGIN code END;`.
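For illustration, a minimal sketch of what such a dimension refresh could look like on the Aurora MySQL destination, assuming a hypothetical `dim_customer` dimension built from the replicated `customer` table (the table and column names are assumptions, not part of the original case):

```sql
-- Illustrative only: dim_customer and its columns are assumptions.
TRUNCATE TABLE dim_customer;

INSERT INTO dim_customer (customer_key, customer_name, load_date)
SELECT customer_id,
       customer_name,
       CURRENT_DATE
FROM   customer;
```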
Create a Flow which combines all the Flows above
Step 1. Create a nested Flow.
Step 2. Add the Flows (steps) to the nested Flow in the following order:
- The Flow which deletes records in the destination.
- The Flow which incrementally loads data from the source into the destination.
- The Flow which updates the [delete audit] table.
- The Flow which updates the dimension by running the SQL statement.
Step 3. Save the Flow and schedule it to run twice a day.