- Startup
- Business
- Enterprise
- On-Premise
- Add-on
Overview
Change data capture (CDC) is an approach to data integration that is based on the identification, capture, and delivery of the changes made to the source database and stored in the database redo log (also called transaction log).
The changes are captured without making application-level changes and without having to scan transactional tables. CDC is a perfect solution for non-intrusive, low-impact real-time data integration.
The Etlworks Integrator supports native log-based change data capture for PostgreSQL, SQL Server, MySQL, Oracle, DB2, and MongoDB.
We are using a heavily modified embedded Debezium engine for CDC.
Read about other change replication techniques available in the Etlworks Integrator.
Supported CDC topologies
Cloud, with optional SSH tunnel
Read more about using SSH tunnel to connect to the on-premise database.
Hybrid-cloud with data integration agents
Read more about data integration agents.
On-premise
Read more about deployment options.
Enable change data capture for the source database
Enabling CDC is different for each database. Please use the following tutorials:
CDC connectors
Read about available CDC connectors.
CDC pipeline
The end-to-end CDC pipeline extracts data from a source CDC-enabled database and loads it into any supported destination:
- another relational database
- cloud data warehouses such as Snowflake, Redshift, or Azure Synapse
- a web service
- a data lake
- a file storage system
In the Etlworks Integrator the end-to-end pipeline typically includes 2 Flows:
- Extract Flow: this Flow extracts data from a CDC-enabled database.
- Load Flow: this Flow loads data into any supported destination.
The extract and load Flows run in parallel, which guarantees very high processing speed and low latency. In fact, the actual pipeline can include multiple independent extract and load Flows, which allows it to scale horizontally across multiple processing nodes.
The Extract
This Flow extracts data from a CDC-enabled database.
There are 2 types of extract CDC Flows in the Etlworks Integrator:
Stream CDC events, create files
In this scenario, the CDC events (INSERTS, UPDATES, and DELETES) are extracted from the transaction log of the source database and stored as CSV and/or JSON files in the local or cloud storage.
This Flow type does not require any additional elements of the infrastructure, such as the message queue.
The separate load Flow reads the files, then transforms and loads data into the destination. The load Flow deletes the processed files.
There is no need to create a separate Flow for the initial load. The first time it connects to a CDC-enabled source database, it reads a consistent snapshot of all of the included databases and tables. When that snapshot is complete, the Flow continuously reads the changes that were committed to the transaction log and generates the corresponding insert, update, and delete events.
Create CDC connection and Flow
Step 1. Create a CDC Connection for the source database. Configure the following parameters:
- Enable Serialize CDC events as CSV files. If you wish to create JSON files, you can also enable Serialize CDC events as JSON files. These options can be enabled or disabled independently from each other.
- Include Database(s): the comma-separated list of database(s) to poll CDC events from.
- Include Table(s): the comma-separated list of fully qualified table names or regular expressions to match the table names. You can override this in the FROM attribute of the source-to-destination transformation. If you are planning to poll data from tens or hundreds of tables, consider using regular expressions or {tokenizing} the Included Table(s) or FROM.
Step 2. Create a new Flow using the Flow type Stream CDC events, create files.
Add source-to-destination transformation
Step 3. Add a single source-to-destination transformation and set the following parameters:
- Source Connection: the CDC Connection created in step 1.
- FROM: the following options are available:
  - *: the system will get the list of the included tables from the CDC Connection.
  - a regular expression to match the table names.
  - a comma-separated list of fully qualified table names or regular expressions matching the table names.
  - {token}
You can select tables from the list of the available tables:
Example with a comma-separated list of included tables:
- CDC key: [db]_[table]_cdc_stream
- Include Table(s): test.inventory, test.payment, test.customer
- FROM: *
- The following files will be created in the local storage:
{app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
Example with a regular expression:
- CDC key: [db]_[table]_cdc_stream
- Include Table(s): ^[t][e][s][t][.][aA][a-zA-Z\d_\s]+
- FROM: *
- The following files will be created in the local storage:
{app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
{app.data}/debezium_data is the default root location for the CDC events serialized as CSV files. You can override it by changing the value of the property Location of the CDC events under CDC Connection > Storage.
Override destination, format, and CDC Key
Step 4. Optionally select the Destination Connection and Format, and enter the CDC Key in the TO.
- The Destination Connection overrides the storage type, the location of the CDC files, the storage credentials, and whether the files should be gzipped, as set in the CDC connection under the Storage section.
- The Destination Format overrides the corresponding Format parameters set in the CDC connection.
- The TO overrides the CDC Key.
Schedule CDC flow
Step 5. Schedule the extract Flow. We recommend using a continuous run Schedule type. The idea is that the extract Flow runs until there are no CDC events to extract, stops for the configured number of seconds, then starts again.
Stream CDC events to a message queue
In this scenario, the CDC events (INSERTS, UPDATES, and DELETES) are extracted from the transaction log of the source database and sent to a message queue, such as Kafka or Azure Event Hubs.
Message queues such as Kafka and Azure Event Hubs support transactional polling which makes the entire process very reliable and highly scalable. The only downside is the requirement to have a message queue, which is a separate element of the infrastructure that is not managed by the Etlworks Integrator.
Here is a list of fully managed message queue services, compatible with Kafka consumer and producer APIs:
- Azure Event Hubs: read how to create a Connection for Azure Event Hub.
- Confluent Platform
- CloudKafka: the only service with a free-forever tier. The free tier is very limited but can be used for testing.
- Aiven for Apache Kafka
- AWS managed Kafka (MSK): note that due to the limitation of the MSK, it is only possible to connect to MSK endpoints from the same VPC. Read more here.
Etlworks also supports streaming CDC events to the following destinations:
The separate load Flow polls the events from the message queue, then transforms and loads data into the destination.
There is no need to create a separate Flow for the initial load. The first time it connects to a CDC-enabled source database, it reads a consistent snapshot of all of the included databases. When that snapshot is complete, the Flow continuously reads the changes that were committed to the transaction log and generates the corresponding INSERT, UPDATE, and DELETE events.
Step-by-step process
Step 1. Create a CDC Connection for the source database. Configure the following parameters:
- Include Database(s): the comma-separated list of database(s) to extract CDC events from.
- Include Table(s): the comma-separated list of fully qualified table names or regular expressions to match the table names. You can override this in the FROM attribute of the source-to-destination transformation. If you are planning to poll data from tens or hundreds of tables, consider using regular expressions or {tokenizing} the Include Table(s) or FROM.
Step 2. Create a Kafka or Azure Event Hub Connection. The other options are:
Step 3. Create JSON (recommended) or Avro Format.
Step 4. Start creating a Flow by selecting the Stream CDC event to message queue from the gallery.
Step 5. Add a single source-to-destination transformation where:
- Source Connection: the CDC Connection created in step 1.
- FROM: the following options are available:
  - *: the system will get the list of the included tables from the CDC Connection.
  - a regular expression to match the table names.
  - a comma-separated list of fully qualified table names or regular expressions matching the table names.
  - {token}
You can select tables from the list of the available tables:
- Destination Connection: a message queue Connection created in step 2, for example, Kafka or Azure Event Hub.
- Format: JSON or Avro Format created in step 3.
- TO: any name, for example, topic.
Example:
- CDC key: [db]_[table]_cdc_stream
- Include Table(s): test.inventory, test.payment, test.customer
- FROM: *
The following topics will be created and the Flow will send CDC events to these topics:
test_inventory_cdc_stream
test_payment_cdc_stream
test_customer_cdc_stream
Step 6. Schedule the extract Flow. We recommend using a continuous run Schedule type. The idea is that the extract Flow runs until there are no CDC events to poll, stops for the configured number of seconds, then starts again.
Insert events from all tables into a single topic
If you wish to insert all events from all tables into a single topic in the message queue:
- Hardcode the CDC Key. CDC Key = topic name.
- Enable Update fields for each event.
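For example, hardcoding the CDC Key to a hypothetical value such as all_tables_cdc_stream routes every INSERT, UPDATE, and DELETE event from every included table to the single all_tables_cdc_stream topic.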
Include attributes available in the original CDC event emitted by Debezium
A typical CDC event emitted by Debezium contains the following elements:
- before: an optional field that specifies the state of the row before the event occurred.
- after: an optional field that specifies the state of the row after the event occurred.
- source: a mandatory field that describes the source metadata for the event.
- op: a mandatory string that describes the type of operation that caused the connector to generate the event.
- ts_ms: an optional field that displays the time at which the connector processed the event.
- transaction: transaction metadata.
- schema: the schema, which describes the structure of the payload.
- fields: the JDBC schema.
By default, the connector only retains the state of the event after the update and converts it to the flat JSON. The flat JSON is then added to the message queue.
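For illustration, a flat JSON event for a row in a hypothetical customer table could look like the following (the table and column names are made up for this example):
{"customer_id": 1004, "first_name": "Anne", "last_name": "Smith", "email": "anne@example.com"}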
You can change the structure of the event added to the queue (for example, to include parts of the original event emitted by Debezium) by modifying the property Structure of the CDC event.
This option is ignored if the connector creates files.
Generate partition key for Kafka and Event Hubs
Kafka’s topics are divided into several partitions. While the topic is a logical concept in Kafka, a partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion.
The messages within a partition are ordered; messages across a topic are not guaranteed to be ordered.
The Partition Key is used to send records to a specific Kafka or Azure Event Hub partition.
If the Partition Key is not set, the record will be sent to a random partition, so we strongly recommend configuring it.
Example of the partition key: [table].
The same connector-specific [tokens] that can be used as a part of the CDC Key are available for the Partition Key.
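For instance, a composite partition key such as [db]_[table], built from the tokens listed below, sends all events for a given table to the same partition and therefore preserves per-table ordering. This is an illustrative choice, not a required setting.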
SQL Server
- [db]: database name.
- [schema]: schema name.
- [table]: table name.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [change_lsn]: the change LSN.
- [commit_lsn]: the commit LSN.
- [event_serial_no]: the event serial number.
MySQL
- [db]: database name.
- [table]: table name.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [server_id]: the MySQL server ID.
- [gtid]: the GTID.
- [file]: the binlog filename.
- [pos]: the position in the binlog.
- [row]: the row number.
Oracle
- [source]: fully qualified event source.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [txId]: the transaction ID.
- [scn]: the SCN.
Postgres
- [db]: database name.
- [schema]: schema name.
- [table]: table name.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [txId]: the transaction ID.
- [lsn]: the LSN.
- [xmin]: the xmin.
DB2
- [db]: database name.
- [schema]: schema name.
- [table]: table name.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [change_lsn]: the change LSN.
- [commit_lsn]: the commit LSN.
MongoDB
- [db]: the database name.
- [collection]: the collection name.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [order]: the order of the event.
- [h]: the h field of the event.
Database specific use cases
By default, our CDC connectors are configured to support the most common scenarios. Below are some database-specific tips for less common use cases.
MySQL
Legacy and Modern implementations
The MySQL CDC connector includes two implementations:
- Modern: the Modern implementation is based on the common connector framework used by all CDC connectors.
- Legacy: the Legacy implementation is deprecated but can be used for backward compatibility with the CDC Flows created before we introduced the Modern implementation.
MySQL Snapshot modes
Snapshot mode specifies the criteria for running a snapshot upon startup of the connector. Read about MySQL snapshot modes.
Add new tables to the pipeline (MySQL)
Make sure the snapshot mode is set to one of the following:
- ad-hoc initial
- ad-hoc schema only
- always_recover_schema
- schema_only_always_recover_schema
If you want to trigger the incremental snapshot of the new table(s), make sure Automatically trigger ad-hoc snapshot for new tables is enabled.
Add new tables to the pipeline:
- Stop the Flow.
- Add new tables to the Included Tables field in the CDC connection or to the FROM field in the CDC flow. IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
Work with spatial (geometry) data types (MySQL)
By default, our MySQL CDC connector encodes spatial (geometry) data types using the WKB format and then uses Base64 encoding on top of that. If you want the connector to convert the geometry data to human-readable JSON, enable Convert Geometry WKB to JSON.
Stream CDC events from the specific binlog file, position, and optionally GTID
The MySQL CDC connector can be configured to start streaming from a specific binlog file and position, and optionally a GTID.
This is only available when the connector is switching from the snapshot to the streaming mode, and the snapshot mode is set to ad-hoc initial, ad-hoc schema only, initial, schema_only, always_recover_schema, or schema_only_always_recover_schema.
Enable cursor-based result set when performing an initial snapshot
By default, the MySQL CDC connector reads data from a large dataset using the recommended technique: it creates the JDBC statement with the flags ResultSet#TYPE_FORWARD_ONLY and ResultSet#CONCUR_READ_ONLY and sets the fetch size hint to Integer#MIN_VALUE.
In some rare cases, it could cause the following error: "No statements may be issued when any streaming result sets are open and in use on a given connection".
To avoid this error, enable a cursor-based result set by adding database.useCursorFetch=true to Other Parameters:
Postgres
Postgres Snapshot modes
Snapshot mode specifies the criteria for running a snapshot upon startup of the connector. Read about Postgres snapshot modes.
Add new tables to the pipeline (Postgres)
If you want to trigger the incremental snapshot of the new table(s), make sure Automatically trigger ad-hoc snapshot for new tables is enabled.
Add new tables to the pipeline:
- Stop the Flow.
- Add new tables to the Included Tables field in the CDC connection or to the FROM field in the CDC flow. IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
Stream large transactions using wal2json plugin
If you are using a wal2json plug-in and transactions are very large, the JSON batch event that contains all transaction changes might not fit into the hard-coded memory buffer, which has a size of 1 GB. In such cases, switch to a streaming plug-in by setting the plugin to wal2json_streaming or wal2json_rds_streaming.
Avoid abnormal consumption of WAL database disk space
In certain cases, it is possible that PostgreSQL disk space consumed by WAL files either experiences spikes or increases out of usual proportions.
This article suggests a solution for the problem.
Here is an example of a configuration that a) makes the connector emit an empty heartbeat event every minute, and b) makes the connector UPSERT a record into the heartbeat table on each heartbeat event.
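As a rough sketch, the equivalent configuration expressed as the underlying Debezium properties could look like the following (assuming the properties can be passed to the CDC Connection; the heartbeat table name and columns are illustrative):
heartbeat.interval.ms=60000
heartbeat.action.query=INSERT INTO heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()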
Propagate default values
If the option Propagate default values is enabled and a default value is specified for a column in the database schema, the PostgreSQL connector will attempt to propagate this value to the stored schema whenever possible.
This option is disabled by default.
SQL Server
SQL Server Snapshot modes
Snapshot mode specifies the criteria for running a snapshot upon startup of the connector. Read about SQL Server snapshot modes.
Add new tables to the pipeline (SQL Server)
If you want to trigger the incremental snapshot of the new table(s), make sure Automatically trigger ad-hoc snapshot for new tables is enabled.
Add new tables to the pipeline:
- Stop the Flow.
- Add new tables to the Included Tables field in the CDC connection or to the FROM field in the CDC flow. IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
Work with binary and spatial (geometry) data types (SQL Server)
It is recommended to serialize columns with binary and spatial (geometry) data as hex-encoded strings by setting the property binary.handling.mode to hex.
Oracle
Use JDBC URL to configure CDC connection
Due to the variety of URL formats supported by the Oracle JDBC driver, it is recommended to use the fully qualified Oracle JDBC URL to configure the CDC connection. You still need to enter the Host, the Port, and the Database, as well as (optionally) the PDB name.
Oracle Snapshot modes
Snapshot mode specifies the criteria for running a snapshot upon startup of the connector. Read about Oracle snapshot modes.
Add new tables to the pipeline (Oracle)
Make sure the snapshot mode is set to one of the following:
- ad-hoc initial
- ad-hoc schema only
- always_recover_schema
- schema_only_always_recover_schema
If you want to trigger the incremental snapshot of the new table(s), make sure Automatically trigger ad-hoc snapshot for new tables is enabled.
Add new tables to the pipeline:
- Stop the Flow.
- Add new tables to the Included Tables field in the CDC connection or to the FROM field in the CDC flow. IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
DB2
DB2 Snapshot modes
Snapshot mode specifies the criteria for running a snapshot upon startup of the connector. Read about DB2 snapshot modes.
Add new tables to the pipeline (DB2)
If you want to trigger the incremental snapshot of the new table(s), make sure Automatically trigger ad-hoc snapshot for new tables is enabled.
Add new tables to the pipeline:
- Stop the Flow.
- Add new tables to the Included Tables field in the CDC connection or to the FROM field in the CDC flow. IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
MongoDB
Use Cluster URL to browse data
Technically speaking, the Cluster URL is not used by the CDC connector. It is, however, required if you want to browse the data in MongoDB using the Etlworks Explorer.
Cluster URL
Browsing data in Explorer
Connect to sharded MongoDB cluster
By default, our MongoDB CDC connector always attempts to connect to the primary node in the sharded cluster. If the node is down the connector will try to reconnect to the new elected primary node.
There are two parameters that control this behavior:
- Ensure that there is an active session on the replica set: if this option is enabled, the connector will ensure that there is an active session on the replica set mongo client before determining the primary node. If this option is disabled, the connector will just connect to the first node in the cluster returned by the topology discovering service. It is not guaranteed that this node will be primary. This option is enabled by default.
- Use secondary node if primary is not found: if this option is enabled (and the option above is also enabled) and the connector is unable to determine the primary node, it will attempt to connect to the secondary node. This option is enabled by default.
Add new collections to the pipeline
To add new collections to the pipeline:
- Stop the Flow.
- Add new collections to the Included Collections list.
- Restart the Flow.
The Flow will start streaming changes for the new collections right away.
The MongoDB connector does not currently support snapshotting of the new collections, so if you need it, you will have to create a new CDC Connection specifically for these collections.
Enable support for MongoDB Change Streams
Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Read more about MongoDB Change Streams.
Etlworks MongoDB CDC connector supports 3 capture modes:
- oplog: capture changes from the oplog. This is the default method.
- change_streams: capture via the MongoDB Change Streams mechanism; the update message does not contain the full message.
- change_streams_update_full: capture via the MongoDB Change Streams mechanism; the update message contains the full message.
Ad-hoc and incremental snapshots
Ad-hoc snapshots
When the initial snapshot is completed it is not executed again as further data are acquired via streaming.
Sometimes it would be really useful to re-execute a snapshot, either completely, or for only a subset of tables. Examples for such use cases include:
- a new table was added to the list of captured tables.
- some of the topics in the message queue were deleted and their content needs to be rebuilt.
- the existing table needs to be re-snapshotted for whatever reason.
Incremental snapshot
Initial snapshots are a great way to obtain a consistent set of all data stored in a database upfront. But there are also some downsides to this approach, though:
- For larger datasets it can take very long to complete: hours or even days.
- The snapshot must be completed before the start of streaming.
- Snapshots are not resumable. If a snapshot hasn't been completed when the connector gets stopped or restarted, the snapshot must be restarted from scratch.
- New tables that are added among the captured ones later are not snapshotted.
To mitigate these issues, a new mechanism called incremental snapshotting has been introduced. The table is snapshotted in chunks, and streaming and snapshot events are blended together.
The table must have a primary key in order for the connector to be able to execute an incremental snapshot for that table.
Trigger incremental snapshot of the new tables
The incremental snapshot of the new tables requires restarting the CDC extract Flow.
Make sure Automatically trigger ad-hoc snapshot for new tables is enabled.
Make sure the snapshot mode is set to one of the following:
- ad-hoc initial
- ad-hoc
- ad-hoc schema only
- always_recover_schema
- schema_only_always_recover_schema
Add new tables to the pipeline:
- Stop the Flow.
- Add new tables to the Included Tables field in the CDC connection or to the FROM field in the CDC flow. IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
Trigger ad-hoc snapshot of the existing tables
The ad-hoc snapshot of the existing tables requires restarting the CDC extract Flow.
How ad-hoc snapshot works
It is assumed that there is a configurable data collection that contains the list of database tables to snapshot. We call it a signal. The signal can be one of the following:
- A database table. The database doesn't have to be CDC-enabled but the Flow should be able to delete records from a signal table.
- A file in any of the supported file storage systems. The flow must be able to delete the file.
The CDC flow monitors the signal. When changes are detected, the Flow reads the list of tables in the signal and triggers the ad-hoc snapshot for the tables listed in the data collection.
The flow immediately deletes the processed signal:
- If the signal is a database table, the Flow deletes all records from that table.
- If the signal is a file, the Flow deletes the file.
The format of the signal
The signal table or file must contain a single column that can have any name.
The tables to snapshot can be stored:
a) Horizontally: multiple comma-separated tables in a single row, for example test.inventory, test.payment, test.customer; and/or
b) Vertically: multiple rows with table names; each row can contain one or multiple comma-separated tables.
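For example, a signal file (or the rows of a signal table) that combines both layouts could contain the following, using the table names from the earlier examples:
test.inventory,test.payment
test.customer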
Configure ad-hoc snapshot
In Connections
Step 1. Create a file storage or database connection. The connection must be read/write.
Step 2. For the file storage connections create a format.
In the CDC Flow
Step 3. Add connection and format created in steps 1-2 under the Connection tab of the CDC Flow.
In the CDC Connection
Step 4. Make sure the snapshot mode is set to one of the following:
- ad-hoc initial
- ad-hoc
- ad-hoc schema only
Step 5. Configure the name of the signal data collection.
If the signal is a database table, enter a fully qualified table name.
If the signal is a file, enter the file name with extension but without the path.
Trigger ad-hoc snapshot
Step 1. Stop CDC extract flow.
Step 2. To trigger the ad-hoc snapshot for the tables listed in the signal data collection, insert records into the signal table or create a signal file (see the example after these steps).
Step 3. Start CDC extract flow.
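For instance, if the signal is a database table named cdc_signal with a single column tables_to_snapshot (both names are hypothetical), step 2 could be performed with a statement like:
INSERT INTO cdc_signal (tables_to_snapshot) VALUES ('test.inventory,test.payment');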
Create files in a cloud storage
By default, the extract Flow creates files with CDC events in the local (server) storage. It is possible to configure the CDC Connection to create files directly in any of the following cloud storage services:
- Amazon S3
- Azure Storage
- Google Cloud Storage
Use-cases for creating CDC files in a cloud storage
When files are created in cloud storage, it opens the possibility of using serverless data pipelines for loading data into the databases and data warehouses. For example, read about loading data continuously using Snowflake Snowpipe.
The other advantage of storing files directly in a cloud is eliminating the step that requires copying files to the cloud in the first place. For example, Amazon Redshift (or any RDS database) can bulk-load files from S3; when files are already in S3, there is no need to create an additional Flow to move files from the local storage to S3. The same is true for Azure databases and data warehouses: Azure Synapse Analytics can bulk load files from Azure Storage, so creating the files in Azure Storage makes the pipeline much faster by eliminating the step that moves the files from the local storage to the Azure Blob.
Flows optimized for bulk loading files created by CDC flow
- Bulk load files into Snowflake.
- Load files in S3 into Redshift.
- Bulk load files in Azure Storage into Synapse Analytics.
- Bulk load files into the database.
How to configure CDC connection or CDC flow to create files in a cloud storage
Read how to override the storage and the location of the CDC files.
Read how to configure the storage, other than local (server), when creating a CDC Connection.
Tips and tricks
Handle SQL NULL and empty ('') values when loading files into databases
If SQL NULL is not the same as empty ('')
Step 1. When creating a CDC Connection, keep Convert null to empty disabled. Note that it is disabled by default.
Step 2. When creating a CSV Format for FROM and TO:
- Enable Convert 'null' to null.
- Disable Convert empty string to null.
- Set Value for null to 'null'.
Step 3. When creating a Flow to load files into Snowflake, set String used to convert to and from SQL NULL to ('NULL','null').
If SQL NULL is the same as empty ('')
Step 1. When creating a CDC Connection, enable Convert null to empty. Note that it is disabled by default.
Step 2. When creating a CSV Format for FROM and TO:
- Disable Convert 'null' to null.
- Enable Convert empty string to null.
- Set Value for null to empty.
Step 3. When creating a Flow to load files into Snowflake, set String used to convert to and from SQL NULL to ('NULL','null','').
Handle source table DDL changes
If you are using a Flow that stores CDC events as CSV files and you expect that the structure of the source table can be modified while the Flow is still streaming data from this table, enable Close CSV file if header has changed when creating the CDC Connection.
Preserve the exact data types of the columns in the source table when creating a destination table
The Etlworks Integrator can automatically create a destination table when loading CSV files. By default, the Flow will sample the file and set the data types of the columns in the destination table. This mechanism is not 100% accurate, so the destination table might end up with columns that have different data types compared to the source table.
To prevent this from happening:
Step 1. When creating the CDC Connection, enable Save Metadata.
Step 2. When creating a CSV Format for FROM and TO:
- Enable Save Metadata.
- Keep All fields are strings disabled.
In addition, when enabled, this feature allows the CDC load Flow to automatically set the Lookup Fields required for the CDC MERGE by extracting the information about the primary keys from the source database. It significantly enhances the accuracy of the Predict Lookup Fields feature, basically making it bulletproof and very low cost.
Create all columns as strings (VARCHAR)
The Etlworks Integrator can automatically create a destination table when loading CSV files. If you wish to create all columns as strings (VARCHAR), then:
Step 1. When creating the CDC Connection, disable Save Metadata.
Step 2. When creating a CSV Format for FROM and TO:
- Disable Save Metadata.
- Enable All fields are strings.
Implement soft deletes
By default, the Load Flow creates, updates, and deletes records in the destination table based on the type of the CDC event: c for create, u for update, and d for delete.
If you need to implement the soft delete, add the Extra columns with the specific function to the CDC Connection. The following functions are available:
- cdc_boolean_soft_delete: the boolean column with a user-defined name will be added to the CDC stream. Values returned by the function: true for delete, false for all other events.
- cdc_timestamp_soft_delete(yyyy-MM-dd HH:mm:ss.SSS): the timestamp column with a user-defined name will be added to the CDC stream. Values returned by the function: timestamp for delete, null for all other events.
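For instance, a sketch of an Extra columns value that adds such a timestamp column named delete_at (the format follows column_name=function(parameter), described in the next section):
delete_at=cdc_timestamp_soft_delete(yyyy-MM-dd HH:mm:ss.SSS)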
In this example, the column delete_at will be added to the CDC stream and its value will be set to the current timestamp with millisecond precision.
If any of the functions above are used, the CDC event type is automatically switched to u (update).
Add columns to the CDC stream
By default, the CDC stream contains all columns in the source table. You can add columns with user-defined names to the stream using the Extra columns property of the CDC Connection.
Format of the property: column_name[=function[(parameter)]],column_name[=function[(parameter)]]
Available functions:
- cdc_op: the CDC event type: c for create, u for update, and d for delete.
- cdc_key: the CDC key in the user-defined Format.
- cdc_timestamp(yyyy-MM-dd HH:mm:ss.SSS): the timestamp of the CDC event. It is not guaranteed that this value will be unique for each event in the sequence of events.
- cdc_event_seq: the unique 'number' of the event. This number grows sequentially.
- cdc_boolean_soft_delete: the boolean column: true for delete, false for all other events.
- cdc_timestamp_soft_delete(yyyy-MM-dd HH:mm:ss.SSS): the timestamp column: timestamp for delete, null for all other events.
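For example, a hypothetical Extra columns value that adds the operation type and the event sequence number as two extra columns (the column names op_type and event_seq are arbitrary):
op_type=cdc_op,event_seq=cdc_event_seq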
Change CDC key and CDC event
The CDC event represents a row (a record) in the CDC stream. The CDC key represents the unique identifier of the group of events. All events with the same CDC key are stored in the same file.
Use the Preprocessor to change the CDC event and/or CDC key.
Available scripting languages
- JavaScript
Available variables
- event: the CDC event serialized as JsonNode.
- key: CDC key.
- source: CDC source serialized as JsonNode.
- transactionId: the transaction id associated with the event if transaction markers are enabled.
Get and set the values of the column in the event
To get the value of the column, serialized as a string:
var val = event.get('column_name').asText();
To set the value:
event.put('column_name', string);
Possible return values
- true/false: if false, the event will be skipped.
- TypedKeyValue<Boolean, String>: the key is an equivalent of true/false above, the value is the CDC key to replace the automatically calculated CDC key.
Example:
var newCdcKey = 'abc';
value = new TypedKeyValue(true, newCdcKey);
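As a slightly fuller sketch that relies only on the variables and methods documented above (the column name region and the key suffix are assumptions made for this example):
// Read an illustrative column from the CDC event (a JsonNode).
var region = event.get('region') != null ? event.get('region').asText() : null;

if (region == null) {
    // Skip events that do not have the column populated.
    value = false;
} else {
    // Keep the event and group it under a region-specific CDC key.
    value = new TypedKeyValue(true, key + '_' + region);
}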
Create CSV and JSON files using the specified character encoding
If the source database has tables with data encoded using any character set other than ASCII (for example UTF-8), you can configure the connector to create files using that specific encoding.
To preserve the encoding when loading files into the databases, you will need to set the same character encoding for the CSV Format.
If the encoding is not specified or set to no encoding, the JSON files are created with UTF-8 encoding.
Transaction markers
Consider, for instance, an e-commerce application that manages purchase orders. In a relational database, the information representing such an order will typically be spread across multiple tables, e.g., PURCHASE_ORDER, ORDER_LINE, and SHIPMENT_ADDRESS. To examine the complete state of a purchase order in the database itself, you'd run a JOIN query against the three tables.
Things get more challenging when looking at the change events produced by CDC connectors.
The situation can be addressed by leveraging the new transaction metadata supported by most CDC connectors. When enabled, separate transaction markers will be created at the beginning and end of each transaction.
To enable transaction markers, modify the CDC Connection by enabling the Provide Transaction Metadata parameter. Optionally set the maximum wait time (in minutes) before forcing the creation of the commit transaction marker.
The transaction markers are not available for the MongoDB CDC connector.
How to stop CDC Flow
You can configure the CDC Connection to automatically stop streaming the CDC events. The CDC Flow will stop if any of the following reaches the configured threshold:
- Number of retries before giving up: the number of retries before giving up if poll returns no records. The default is 100.
- Retry N minutes before giving up: the number of minutes to retry before giving up if the poll returns no records.
- Always stop after N minutes: the number of minutes to run the connector before stopping. The connector will stop even if there are new records in the queue. Note that the connector will not stop during the snapshot.
If none of the parameters are set, the CDC stream will run forever until manually stopped.
To stop the CDC Flow manually, click Stop / Cancel.
How to reset CDC Flow
Typical CDC extract flow starts by snapshotting the monitored tables (A) or starts from the oldest known position in the transactional (redo) log (B), then proceeds to stream changes in the source database (C). If the Flow is stopped and restarted it resumes from the last recorded position in the transactional log.
Occasionally it is required to "reset" the flow to restart the process from scratch (A) or (B).
Process
Step 1. Create a new Server storage connection.
Step 2. Set File(s) to /debezium_data/*.*.
Step 3. Stop CDC flow if it is running.
Step 4. Open the connection created in step 1 in Explorer and delete the .dat file(s) associated with the CDC connection.
Step 5. Start CDC flow. Depending on the configuration the Flow will either perform the full snapshot or will start from the oldest position in the transactional log.
The Load
This Flow loads data files created by the extract Flow into any supported destination.
Just like with extract Flow, there are two load Flows types:
- The Flows that load data from files.
- The Flows that load data from a message queue.
Load data from files
Bulk load CSV files created by CDC flow into Snowflake
The most efficient way of loading files created by CDC flow into the Snowflake is to use the flow type Bulk load files into Snowflake.
This flow type does not transform the data.
Transform and Load CSV file created by CDC flow into Snowflake
This Flow reads CDC events from the CSV files in local or cloud storage, then transforms and loads data into the Snowflake.
The staged CSV files created by the extract Flow are processed from there, then deleted after the processing.
Step 1. Create a Server storage Connection for the staged CSV files. The default location is {app.data}/debezium_data/events.
Step 2. Create a CSV Format with all default settings. Enable the Save metadata parameter.
Step 3. Create all Connections and Formats required for Snowflake Flows.
Step 4. Create a new Flow using the Flow type File to Snowflake.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are distinct filename patterns to poll data from.
For example, if the following files are created by the extract Flow:
{app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
You will need 3 pairs of source-to-destination transformations, where the sources (FROM) will need to be set to the following wildcard file names:
test_inventory_cdc_stream_*.csv
test_payment_cdc_stream_*.csv
test_customer_cdc_stream_*.csv
Alternatively, if you want to process all files without creating separate pairs of source-to-destination transformations:
- Set the FROM to the wildcard file name that matches all files to load, for example, *_*_cdc_stream_*.csv.
- Set the TO to schema.*, for example, public.*.
- Set Calculate Destination Object Name under MAPPING > Parameters to:
// Extract the destination object name from the source file name:
// for a file like test_inventory_cdc_stream_uuid.csv this returns 'test_inventory'
// (indexOf('-') returns -1 when there is no '-', so the substring starts at 0).
var start = name.indexOf('-');
var end = name.indexOf('_cdc_stream');
value = name.substring(start + 1, end);
You can also use regular expressions, for example, _(.*?)_cdc_stream, to match the table names.
Read more about configuring TO when processing files by a wildcard name.
For each transformation set the following parameters:
- Source Connection: the Connection created in step 1.
- Source Format: the CSV Format created in step 2.
- FROM: a wildcard filename as described above.
- Destination Connection: a Connection for the Snowflake stage (internal or external).
- Destination Format: the CSV Format created in step 2.
- TO: a destination table name or a wildcard table name.
Step 6. If you are not planning to transform the data before loading it into Snowflake, enable Ignore Transformations under Parameters. This will have a positive impact on performance.
Step 7. Optionally, configure the number of files to process in one batch. It is useful if you expect that a large (many thousands) number of files could be created by the extract Flow in a short period of time and don't want the load Flow to be overwhelmed. Processing a smaller number of files (hundreds) in micro-batches will be more efficient.
Step 8. Enable Delete loaded source files under Parameters.
Step 9. All other parameters are the same as in the regular Snowflake Flow, except the Action, which can be set to one of the following:
- CDC MERGE: the system will use the event type stored in the record: c for INSERT, u for UPDATE, and d for DELETE. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully-qualified table names and ; as a separator between table=fields pairs (see the example after this list).
- COPY INTO: in some cases, it makes sense to always insert unmodified CDC events into the staging table, then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
  - debezium_cdc_op: the event type: c for create, u for update, and d for delete.
  - debezium_cdc_timestamp: the unique sequence number of the event based on timestamp.
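For example, a hypothetical Lookup Fields value for the tables used in this article (the column names are illustrative):
test.inventory=inventory_id;test.payment=payment_id;test.customer=customer_id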
Step 10. Schedule load Flow to run in parallel with the extract Flow. Just like with extract Flow we recommend using a continuous run Schedule type.
Bulk load CSV files created by CDC flow in cloud storage into Redshift
If you don't want to transform the files in the S3 the most efficient way of loading these files into the Redshift is to use flow type Bulk load files in S3 into Redshift.
This works best when the CDC flow is configured to create the files in the cloud storage.
Transform and Load CSV files created by CDC flow into Amazon Redshift
This Flow reads CDC events from the CSV files in local or cloud storage, then transforms and loads data into the Amazon Redshift.
The staged CSV files created by the extract Flow are processed from there, then deleted after the processing.
Step 1. Create a Server storage Connection for the staged CSV files. The default location is {app.data}/debezium_data/events.
Step 2. Create a CSV Format. Enable the Save metadata parameter.
Step 3. Create all Connections and Formats required for Redshift Flows.
Step 4. Create a new Flow using the Flow type File to Redshift.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are distinct filename patterns to poll data from.
For example, if the following files are created by the extract Flow:
{app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
You will need 3 pairs of source-to-destination transformations, where the sources (FROM) will need to be set to the following wildcard file names:
test_inventory_cdc_stream_*.csv
test_payment_cdc_stream_*.csv
test_customer_cdc_stream_*.csv
Alternatively, if you want to process all files without creating separate pairs of source-to-destination transformations:
- Set the FROM to the wildcard file name that matches all files to load, for example, *_*_cdc_stream_*.csv.
- Set the TO to schema.*, for example, public.*.
- Set Calculate Destination Object Name under MAPPING > Parameters to:
var start = name.indexOf('-');
var end = name.indexOf('_cdc_stream');
value = name.substring(start + 1, end);
You can also use regular expressions, for example, _(.*?)_cdc_stream, to match the table names.
Read more about configuring TO when processing files by a wildcard name.
For each transformation set the following parameters:
- Source Connection: the Connection created in step 1.
- Source Format: the CSV Format created in step 2.
- FROM: a wildcard filename as described above.
- Destination Connection: a Connection for the Redshift stage.
- Destination Format: the CSV Format created in step 2.
- TO: a destination table name or a wildcard table name.
Step 6. If you are not planning to transform the data before loading it into Redshift, enable Ignore Transformations under Parameters. This will have a positive impact on performance.
Step 7. Optionally, configure the number of files to process in one batch. It is useful if you expect that a large (many thousands) number of files could be created by the extract Flow in a short period of time and don't want the load Flow to be overwhelmed. Processing a smaller number of files (hundreds) in micro-batches will be more efficient.
Step 8. Enable Delete loaded source files under Parameters.
Step 9. All other parameters are the same as in the regular Redshift Flow, except the Action, which can be set to one of the following:
- CDC MERGE: the system will use the event type stored in the record: c for INSERT, u for UPDATE, and d for DELETE. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully-qualified table names and ; as a separator between table=fields pairs.
- COPY: in some cases, it makes sense to always insert unmodified CDC events into the staging table, then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
  - debezium_cdc_op: c for create, u for update, and d for delete.
  - debezium_cdc_timestamp: the unique sequence number of the event based on timestamp.
Step 10. Schedule load Flow to run in parallel with the extract Flow. Just like with extract Flow, we recommend using a continuous run Schedule type.
Bulk load CSV files created by CDC flow in Azure Storage into Synapse Analytics
The most efficient way of loading files created by CDC flow into the Synapse Analytics is to use the flow type Bulk load files in Azure Storage into Synapse Analytics.
Transform and Load CSV files created by CDC flow into Azure Synapse Analytics
This Flow reads CDC events from the CSV files in local or cloud storage, then transforms and loads data into the Azure Synapse Analytics.
The staged CSV files created by the extract Flow are processed from there, then deleted after the processing.
Step 1. Create a Server storage Connection for the staged CSV files. The default location is {app.data}/debezium_data/events.
Step 2. Create a CSV Format with all default settings. Enable the Save Metadata parameter.
Step 3. Create all Connections required for Synapse Analytics Flows.
Step 4. Create a new Flow using the Flow type File to Azure Synapse Analytics.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are distinct filename patterns to poll data from.
For example, if the following files are created by the extract Flow:
{app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
You will need 3 pairs of source-to-destination transformations, where the sources (FROM) will need to be set to the following wildcard file names:
test_inventory_cdc_stream_*.csv
test_payment_cdc_stream_*.csv
test_customer_cdc_stream_*.csv
Alternatively, if you want to process all files without creating separate pairs of source-to-destination transformations:
- Set the FROM to the wildcard file name that matches all files to load, for example, *_*_cdc_stream_*.csv.
- Set the TO to schema.*, for example, public.*.
- Set Calculate Destination Object Name under MAPPING > Parameters to:
var start = name.indexOf('-');
var end = name.indexOf('_cdc_stream');
value = name.substring(start + 1, end);
You can also use regular expressions, for example, _(.*?)_cdc_stream, to match the table names.
Read more about configuring TO when processing files by a wildcard name.
For each transformation set the following parameters:
- Source Connection: the Connection created in step 1.
- Source Format: the CSV Format created in step 2.
- FROM: a wildcard filename as described above.
- Destination Connection: a Connection for the Azure Synapse Analytics stage (Azure Storage).
- Destination Format: the CSV Format created in step 2.
- TO: a destination table name or a wildcard table name.
Step 6. If you are not planning to transform the data before loading it into Synapse Analytics, enable Ignore Transformations under Parameters. This will have a positive impact on performance.
Step 7. Optionally, configure the number of files to process in one batch. It is useful if you expect that a large (many thousands) number of files could be created by the extract Flow in a short period of time and don't want the load Flow to be overwhelmed. Processing a smaller number of files (hundreds) in micro-batches will be more efficient.
Step 8. Enable Delete loaded source files under Parameters.
Step 9. All other parameters are the same as in the regular Synapse Analytics Flow, except the Action, which can be set to one of the following:
- CDC MERGE: the system will use the event type stored in the record: c for INSERT, u for UPDATE, and d for DELETE. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully-qualified table names and ; as a separator between table=fields pairs.
- COPY INTO: in some cases, it makes sense to always insert unmodified CDC events into the staging table, then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
  - debezium_cdc_op: the event type: c for create, u for update, and d for delete.
  - debezium_cdc_timestamp: the unique sequence number of the event based on timestamp.
Step 10. Schedule load Flow to run in parallel with the extract Flow. Just like with extract Flow we recommend using a continuous run Schedule type.
Transform and Load CSV files created by CDC flow into the relational database
If you don't want to transform the files in the cloud storage and the destination database supports a bulk load using SQL, the most efficient way of loading these files into the destination database is to use the flow type Bulk load files into database.
This works best when the CDC flow is configured to create the files in the cloud storage.
The alternative is to use a flow type "file to database". This Flow reads CDC events from the CSV files in local or cloud storage then transforms and loads data into the destination database. Unlike the bulk load flow, this flow type supports transformations.
The staged CSV files created by the extract Flow are processed then deleted after the processing.
Step 1. Create a Server storage Connection for the staged CSV files. The default location is {app.data}/debezium_data/events.
Step 2. Create a CSV Format. Enable the Save Metadata parameter.
Step 3. Create a Connection to the destination database.
Step 4. Create a new Flow using the Flow type File to database (assuming that the destination is a database).
Step 5. Add as many source-to-destination transformations as there are distinct filename patterns to poll data from.
For example, if the following files are created by the extract Flow:
{app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
You will need 3 pairs of source-to-destination transformations, where the sources (FROM) will need to be set to the following wildcard file names:
test_inventory_cdc_stream_*.csv
test_payment_cdc_stream_*.csv
test_customer_cdc_stream_*.csv
Alternatively, if you want to process all files without creating separate pairs of source-to-destination transformations:
- Set the FROM to the wildcard file name that matches all files to load, for example, *_*_cdc_stream_*.csv.
- Set the TO to schema.*, for example, public.*.
- Set Calculate Destination Object Name under MAPPING > Parameters to:
var start = name.indexOf('-');
var end = name.indexOf('_cdc_stream');
value = name.substring(start + 1, end);
You can also use regular expressions, for example, _(.*?)_cdc_stream, to match the table names.
Read more about configuring TO when processing files by a wildcard name.
Step 6. For each transformation set the following parameters:
- Source Connection: the Connection created in step 1.
- Source Format: the CSV Format created in step 2.
- FROM: a wildcard filename as described above.
- Destination Connection: a destination database Connection created in step 3.
- TO: a destination table name or a wildcard table name as described above.
- Enable Delete loaded source files.
- Set the Action under MAPPING > Parameters: the action defines a SQL command to generate. Set Action to one of the following:
  - Record: the system will automatically generate an INSERT, UPDATE, or DELETE SQL statement based on the value of the debezium_cdc_op (c, u, d) populated during the extract from the CDC-enabled database.
  - Record with MERGE (recommended): same as Record, except the system will generate a MERGE SQL statement for inserts and updates. Note that not all databases support native MERGE; also, some databases, for example, MySQL and PostgreSQL, require a unique index for the fields that are used to uniquely identify the record.
  - Record with IfExist: same as Record with MERGE, except it uses a SELECT statement to check if the record exists and then conditionally generates either INSERT or UPDATE.
- Set the Lookup Fields under MAPPING > Parameters: a comma-separated list of fields that uniquely identify the record. Alternatively, you can enable the Predict Lookup Fields option, which, if enabled, will force the Flow to use various algorithms to automatically predict the fields that uniquely identify the record. Note that it is not always possible to correctly detect the unique fields. If neither works, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully-qualified table names and ; as a separator between table=fields pairs.
If the SQL action is set to CDC merge, it is possible to configure the load Flow to skip DELETE events by enabling the property Do not execute DELETE when Action is Record with MERGE.
It greatly improves the performance of the MERGE because it automatically enables batch processing, otherwise disabled for the CDC merge action.
Step 7. Optionally, configure the number of files to process in one batch. It is useful if you expect that a large (many thousands) number of files could be created by the extract Flow in a short period of time and don't want the load Flow to be overwhelmed. Processing a smaller number of files (hundreds) in micro-batches will be more efficient.
Step 8. Schedule load Flow to run in parallel with the extract Flow. Just like with extract Flow we recommend using a continuous run Schedule type.
Load data from a message queue
Transform and Load data from a message queue into Snowflake
This Flow reads CDC events from the message queue's topics, then transforms and loads data into the Snowflake.
Step 1. Create a Kafka or Azure Event Hub Connection. You can reuse the same Connection that you created for the extract Flow.
You must enter a unique name in the Group ID
field.
Also, consider changing the default values for the following parameters:
- Number of retries before stop polling: the number of retries before the Flow stops polling if a poll returns no records. By increasing this number, you can give the Flow more time to read the events from the queue.
- Max number of records to poll: the maximum number of records to poll in one call. By changing this number, you can tune the Flow for maximum performance or minimum RAM consumption.
- Max number of records to read: the total maximum number of records to read from the queue. Set it to a reasonable number to allow the system to process records in micro-batches (see the sketch after this list). If nothing is entered, the system will read records from the queue until there are no more records. By changing this number, you are either increasing or decreasing the size of the micro-batches.
Step 2. Create all required Connections and Formats.
Step 3. Start creating a Flow by typing in Queue to Snowflake
.
Step 4. Select Queue to Snowflake
.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are queue topics to poll data from. Most likely you are going to have one topic per source table.
For each transformation set the following parameters:
- Source Connection: the queue Connection created in step 1.
- Source Format: the Format used in the extract Flow, most likely JSON.
- FROM: a topic name to poll data from.
- Destination Connection: a Connection for the Snowflake stage.
- TO: a destination table name.
Step 6. All parameters are the same as in the regular Snowflake Flow, except the Action, which can be set to one of the following:
- CDC MERGE: the system will use the event type stored in the record: c for insert, u for update, and d for delete. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields, using fully-qualified table names and ; as a separator between pairs.
- COPY INTO: in some cases, it makes sense to always insert unmodified CDC events into the staging table, then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
  - debezium_cdc_op: the event type, c for create, u for update, and d for delete.
  - debezium_cdc_timestamp: the unique sequence of the event based on the timestamp.
Step 7. Schedule the load Flow to run in parallel with the extract Flow. Just like with the extract Flow, we recommend using a continuous run Schedule type.
Transform and Load data from a message queue into Amazon Redshift
This Flow reads CDC events from the message queue's topics, then transforms and loads data into Amazon Redshift.
Step 1. Create a Kafka or Azure Event Hub Connection. You can reuse the same Connection that you created for the extract Flow.
You must enter a unique name in the Group ID
field.
Also, consider changing the default values for the following parameters:
- Number of retries before stop polling: the number of retries before the Flow stops polling if a poll returns no records. By increasing this number, you can give the Flow more time to read the events from the queue.
- Max number of records to poll: the maximum number of records to poll in one call. By changing this number, you can tune the Flow for maximum performance or minimum RAM consumption.
- Max number of records to read: the total maximum number of records to read from the queue. Set it to a reasonable number to allow the system to process records in micro-batches. If nothing is entered, the system will read records from the queue until there are no more records. By changing this number, you are either increasing or decreasing the size of the micro-batches.
Step 2. Create all required Connections and Formats.
Step 3. Start creating a Flow by typing in queue to redshift
.
Step 4. Select Queue to Redshift
.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are queue topics to poll data from. Most likely you are going to have one topic per source table.
For each transformation set the following parameters:
- Source Connection: the queue Connection created in step 1.
- Source Format: the Format used in the extract Flow, most likely JSON.
- FROM: a topic name to poll data from.
- Destination Connection: a Connection for the Redshift stage.
- Destination Format: the CSV Format.
- TO: a destination table name.
Step 6. All parameters are the same as in the regular Redshift Flow, except the Action, which can be set to one of the following:
- CDC MERGE: the system will use the event type stored in the record: c for insert, u for update, and d for delete. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields, using fully-qualified table names and ; as a separator between pairs.
- COPY INTO: in some cases, it makes sense to always insert unmodified CDC events into the staging table, then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
  - debezium_cdc_op: the event type, c for create, u for update, and d for delete.
  - debezium_cdc_timestamp: the unique sequence of the event based on the timestamp.
Step 7. Schedule the load Flow to run in parallel with the extract Flow. Just like with the extract Flow, we recommend using a continuous run Schedule type.
Transform and Load data from a message queue into Synapse Analytics
This Flow reads CDC events from the message queue's topics, then transforms and loads data into Synapse Analytics.
Step 1. Create a Kafka or Azure Event Hub Connection. You can reuse the same Connection that you created for the extract Flow.
You must enter a unique name in the Group ID
field.
Also, consider changing the default values for the following parameters:
- Number of retries before stop polling: the number of retries before the Flow stops polling if a poll returns no records. By increasing this number, you can give the Flow more time to read the events from the queue.
- Max number of records to poll: the maximum number of records to poll in one call. By changing this number, you can tune the Flow for maximum performance or minimum RAM consumption.
- Max number of records to read: the total maximum number of records to read from the queue. Set it to a reasonable number to allow the system to process records in micro-batches. If nothing is entered, the system will read records from the queue until there are no more records. By changing this number, you are either increasing or decreasing the size of the micro-batches.
Step 2. Create all required Connections and Formats.
Step 3. Start creating a Flow by typing in queue to azure synapse
.
Step 4. Select Queue to Azure Synapse Analytics
.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are queue topics to poll data from. Most likely you are going to have one topic per source table.
For each transformation set the following parameters:
- Source Connection: the queue Connection created in step 1.
- Source Format: the Format used in the extract Flow, most likely JSON.
- FROM: a topic name to poll data from.
- Destination Connection: a Connection for the Synapse Analytics stage.
- Destination Format: the CSV Format.
- TO: a destination table name.
Step 6. All parameters are the same as in the regular Synapse Analytics Flow, except the Action, which can be set to one of the following:
- CDC MERGE: the system will use the event type stored in the record: c for INSERT, u for UPDATE, and d for DELETE. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields, using fully-qualified table names and ; as a separator between pairs.
- COPY INTO: in some cases, it makes sense to always insert unmodified CDC events into the staging table, then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
  - debezium_cdc_op: the event type, c for create, u for update, and d for delete.
  - debezium_cdc_timestamp: the unique sequence of the event based on the timestamp.
Step 7. Schedule the load Flow to run in parallel with the extract Flow. Just like with the extract Flow, we recommend using a continuous run Schedule type.
Transform and Load data from a message queue into relational database
This Flow reads CDC events from the message queue's topics, then transforms and loads data into the destination database.
Step 1. Create a Kafka or Azure Event Hub Connection. You can reuse the same Connection that you created for the extract Flow.
You must enter a unique name in the Group ID
field.
Also, consider changing the default values for the following parameters:
- Number of retries before stop polling: the number of retries before the Flow stops polling if a poll returns no records. By increasing this number, you can give the Flow more time to read the events from the queue.
- Max number of records to poll: the maximum number of records to poll in one call. By changing this number, you can tune the Flow for maximum performance or minimum RAM consumption.
- Max number of records to read: the total maximum number of records to read from the queue. Set it to a reasonable number to allow the system to process records in micro-batches. If nothing is entered, the system will read records from the queue until there are no more records. By changing this number, you are either increasing or decreasing the size of the micro-batches.
Step 2. Create a Connection to the destination database.
Step 3. Start creating a Flow by typing in queue to
.
Step 4. Select Queue to database
(assuming that your destination is a relational database).
Step 5. Add as many source-to-destination transformations as there are queue topics to poll data from. Most likely you are going to have one topic per source table.
For each transformation set the following parameters:
- Source Connection: the queue Connection created in step 1.
- Source Format: the Format used in the extract Flow, most likely JSON.
- FROM: a topic name to poll data from.
- Destination Connection: a destination database Connection created in step 2.
- TO: a destination table name.
- Set the Action under MAPPING / Parameters: the action defines a SQL command to execute. Set Action to one of the following:
  - Record: the system will automatically generate an INSERT, UPDATE, or DELETE SQL statement based on the value of the debezium_cdc_op field (c, u, d) populated during extract from the CDC-enabled database.
  - Record with MERGE: same as Record, except the system will generate a MERGE SQL statement for inserts and updates. Note that not all databases support native MERGE, and some databases, for example, MySQL and PostgreSQL, require a unique index on the fields that are used to uniquely identify the record.
  - Record with IfExist: same as Record with MERGE, except it uses a SELECT statement to check if the record exists and then conditionally generates either an INSERT or UPDATE.
- Set the Lookup Fields under MAPPING / Parameters: a comma-separated list of fields that uniquely identify the record.
  - Alternatively, you can enable Predict Lookup Fields, which forces the Flow to use various algorithms to automatically predict the fields that uniquely identify the record. Note that it is not always possible to correctly detect the unique fields.
  - When Predict Lookup Fields (which is not always accurate) is not an option, you can specify the list of table=fields pairs in the Lookup Fields, using fully-qualified table names and ; as a separator between pairs. Example:
    test1.inventory=inventory_id,database_name;
    test1.payment=payment_id,database_name;
    test1.rental=rental_id,database_name;
Step 6. Schedule the load Flow to run in parallel with the extract Flow. Just like with the extract Flow, we recommend using a continuous run Schedule type.
Monitor CDC Flows
Our ETL engine records real-time metrics for each active CDC stream. The following metrics are available:
- Last Update (timezone): the timestamp, in the server timezone, of the last checkpoint. The metrics are updated every 60 seconds.
- Last Record Processed (time ago): the human-readable timestamp of the last processed CDC event.
- Last Record Latency: the last recorded latency. The latency is the difference between the timestamp of the change event in the source database and the timestamp of when the change event was processed by the CDC connector.
- Max Latency: the maximum recorded latency since the last restart of the CDC connector.
- Min Latency: the minimum recorded latency since the last restart of the CDC connector.
- Avg Latency: the average latency since the last restart of the CDC connector.
- Total Records Processed: the total number of change events processed since the last restart of the CDC connector.
- Records Processed Since Last Update: the number of change events processed since the last update. Note that the metrics are updated every 60 seconds.
Access CDC Metrics from UI
To access CDC metrics from the UI, go to Flows
or Schedules
and click Running
next to the Flow or Schedule name.
Then click View Running Tasks
.
The metrics are displayed at the bottom of the Running Tasks
window.
You can refresh the metrics manually or enable Auto refresh.
Access CDC metrics using API
GET CDC metrics for the specific Flow
This API returns metrics for the specific CDC Flow.
Authentication
Before making a call to the API, you must obtain a JWT token from the authentication API.
Step 1. Use any user with the Administrator
role to call an Etlworks Integrator authentication endpoint and receive an access token.
Step 2. Use the access token, received in Step 1, to call the Etlworks Integrator API endpoints. The access token must be submitted as a header parameter, as in: Authorization:Bearer access-token
, or query parameter, as in: ?Authorization=Bearer%20access-token
.
Access tokens are short-lived and self-expiring. An access token gives you access to the APIs for approximately 10 minutes. It is recommended that you refresh the access token before each call to the API.
The API endpoint parameters:
- PATH: /etl/rest/v1/tasks/{audit_id}/?type=stream
  EXAMPLE: https://app.etlworks.com/etl/rest/v1/tasks/80610/?type=stream
- METHOD: GET
- HEADER: Authorization:Bearer access-token
- REQUEST BODY: none
- REQUEST CONTENT TYPE: none
Audit ID
audit_id
is a required URL parameter. To get the audit_id
for the specific Flow, use the Flow status API.
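The sketch below shows one way to call this endpoint from Node.js 18+ (which provides a global fetch). It is only an illustration: the environment variable holding the access token is a placeholder, the token must be obtained from the authentication API as described above, and the audit ID value is taken from the EXAMPLE URL.
// A minimal sketch, assuming an access token has already been obtained from the
// authentication API and the audit_id has been retrieved via the Flow status API.
const accessToken = process.env.ETLWORKS_TOKEN; // placeholder for the access token
const auditId = 80610;                          // audit_id from the example above

async function getCdcMetrics() {
  const response = await fetch(
    `https://app.etlworks.com/etl/rest/v1/tasks/${auditId}/?type=stream`,
    { headers: { Authorization: `Bearer ${accessToken}` } }
  );
  if (!response.ok) {
    // 401, 403, or 500 per the response codes below
    throw new Error(`Request failed with status ${response.status}`);
  }
  const tasks = await response.json();
  for (const task of tasks) {
    // print the CDC metrics recorded for the Flow
    console.log(task.name, task.latency, task.maxLatency, task.avgLatency, task.recordsProcessed);
  }
}

getCdcMetrics().catch(console.error);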
Response
The response is a JSON document in the following format; the latency and record-count fields are the CDC metrics.
[{
"requestId": "string",
"owner": "string",
"name": "string",
"started": timestamp_in_milliseconds_since_epoch,
"duration": number,
"code": "the formatted CDC metrics",
"type": "CDC",
"tenant": "cdc_prod",
"language": null,
"latency": number,
"maxLatency": number,
"minLatency": number,
"avgLatency": number,
"maxLatencyDate": timestamp_in_milliseconds_since_epoch,
"recordsProcessed": number,
"recordsProcessedSinceLastCheck": number,
"lastCheck": timestamp_in_milliseconds_since_epoch,
"lastTimeRecordsReceivedDate": timestamp_in_milliseconds_since_epoch}
]
Response codes
200
for success, 401
for not authorized, 403
for forbidden, and 500
for an internal error.
GET CDC metrics for all running Flows
This API returns metrics for all currently running CDC Flows.
Authentication
Before making a call to the API, the user must obtain a JWT token from the authentication API.
Step 1. Use any user with the Administrator
role to call an Etlworks Integrator authentication endpoint and receive an access token.
Step 2. Use the access token, received in Step 1, to call the Etlworks Integrator API endpoints. The access token must be submitted as a header parameter, as in: Authorization:Bearer access-token
, or query parameter, as in: ?Authorization=Bearer%20access-token
.
Access tokens are short-lived and self-expiring. An access token gives you access to the APIs for approximately 10 minutes. It is recommended that you refresh the access token before each call to the API.
The API endpoint parameters:
- PATH: /etl/rest/v1/tasks/?type=stream
  EXAMPLE: https://app.etlworks.com/etl/rest/v1/tasks/?type=stream
- METHOD: GET
- HEADER: Authorization:Bearer access-token
- REQUEST BODY: none
- REQUEST CONTENT TYPE: none
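As with the previous endpoint, a minimal Node.js sketch; the access token variable is a placeholder, and the URL follows the EXAMPLE above. The response is an array with one entry per running CDC Flow.
const accessToken = process.env.ETLWORKS_TOKEN; // obtained from the authentication API

async function getAllCdcMetrics() {
  const response = await fetch('https://app.etlworks.com/etl/rest/v1/tasks/?type=stream', {
    headers: { Authorization: `Bearer ${accessToken}` },
  });
  const tasks = await response.json();
  // one entry per running CDC Flow
  tasks.forEach((task) => console.log(task.name, task.latency, task.recordsProcessed));
}

getAllCdcMetrics().catch(console.error);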
Response
The response is a JSON document in the following format; the latency and record-count fields are the CDC metrics.
[{
"requestId": "string",
"owner": "string",
"name": "string",
"started": timestamp_in_milliseconds_since_epoch,
"duration": number,
"code": "the formatted CDC metrics",
"type": "CDC",
"tenant": "cdc_prod",
"language": null,
"latency": number,
"maxLatency": number,
"minLatency": number,
"avgLatency": number,
"maxLatencyDate": timestamp_in_milliseconds_since_epoch,
"recordsProcessed": number,
"recordsProcessedSinceLastCheck": number,
"lastCheck": timestamp_in_milliseconds_since_epoch,
"lastTimeRecordsReceivedDate": timestamp_in_milliseconds_since_epoch}
]
Response codes
200
for success, 401
for not authorized, 403
for forbidden, and 500
for an internal error.