- Startup
- Business
- Enterprise
- On-Premise
- Add-on
Overview
Change Data Capture (CDC) is a data integration technique that identifies, captures, and delivers changes made to a source database using the database’s redo (transaction) log. This method enables capturing changes without modifying the application or scanning transactional tables, making it ideal for low-impact, real-time data integration.
Etlworks supports log-based CDC for databases like PostgreSQL, SQL Server, MySQL, Oracle, DB2, MongoDB, and AS400. The engine is powered by a customized version of Debezium.
Read about other change replication techniques available in Etlworks.
Capabilities
Supported CDC topologies
- Cloud, with optional SSH tunnel. Read more about using an SSH tunnel to connect to the on-premise database.
- Hybrid-cloud with data integration agents. Read more about data integration agents.
- On-premise. Read more about deployment options.
Enable change data capture for the source database
Enabling CDC is different for each database. Please use the following tutorials:
CDC connectors
Read about available CDC connectors.
Parameters available for specific database connectors:
CDC pipeline
The end-to-end CDC pipeline extracts data from a CDC-enabled source database and loads it into various supported destinations, including:
- Relational databases
- Data warehouses such as Snowflake, Redshift, Google BigQuery, Azure Synapse, Greenplum, and Vertica
- Cloud storage systems and data lakes
- File storage systems
- Message queues such as Kafka, Google PubSub, Kinesis, and more
Built-in CDC pipelines
Etlworks offers several built-in CDC pipelines, which provide ready-made solutions for extracting and loading data from CDC-enabled source databases into supported destinations. These pre-configured pipelines simplify the process of capturing and replicating data changes, allowing for quick implementation in data integration workflows:
- Stream CDC events, create files in the local or cloud storage or in the data lake
- Stream CDC events into a message queue
- Change Data Capture (CDC) into Snowflake.
- Change Data Capture (CDC) into Amazon Redshift.
- Change Data Capture (CDC) into BigQuery.
- Change Data Capture (CDC) into Synapse Analytics.
- Change Data Capture (CDC) into Vertica.
- Change Data Capture (CDC) into Greenplum.
- Change Data Capture (CDC) into any relational database.
- Change Data Capture (CDC) into any relational database using bulk load.
CDC Extract
Etlworks supports two types of generic CDC flows that can be combined with other flows to capture and replicate changes from a source database to the destination. These flexible flow types allow seamless data extraction and transformation as part of the CDC pipeline.
- Stream CDC events, create files in the local or cloud storage
- Stream CDC events into a message queue
There are also specialized CDC flow types designed for specific destination systems, though the extraction process remains consistent across all flows.
Stream CDC events, create files in the local or cloud storage
In this scenario, CDC events (INSERTS, UPDATES, DELETES) are captured from the transaction log and stored as CSV or JSON files in local or cloud storage. This Flow type eliminates the need for additional infrastructure, such as message queues.
The separate load Flow reads the files, then transforms and loads data into the destination. The load Flow deletes the processed files.
There is no need to create a separate Flow for the initial load. The first time it connects to a CDC-enabled source database, it reads a consistent snapshot of all of the included databases and tables. When that snapshot is complete, the Flow continuously reads the changes that were committed to the transaction log and generates the corresponding insert, update, and delete events.
Create CDC connection and Flow
Step 1. Create a CDC Connection for the source database.
Step 2. Create a CDC events connection if you wish to create CDC events in the local storage. Create one of the following cloud storage connections if you want to create CDC events in the cloud storage or data lake:
Step 3. Create a new Flow using the Flow type Stream CDC events, create files.
Add source-to-destination transformation
Step 4. Add a single source-to-destination transformation and set the following parameters:
- Source Connection: the CDC Connection created in step 1.
- FROM: the following options are available:
  - *: the system will get the list of the included tables from the CDC Connection.
  - A regular expression to match the table names.
  - A comma-separated list of fully qualified table names or regular expressions matching the table names.
You can select tables from the list of available tables:
Example with a comma-separated list of included tables:
- CDC key: [db]_[table]_cdc_stream
- FROM: test.inventory,test.payment,test.customer
The following files will be created in the local storage:
{app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
Example with a regular expression:
- CDC key: [db]_[table]_cdc_stream
- Include Table(s): ^[t][e][s][t][.][aA][a-zA-Z\d_\s]+
- FROM: *
The following files will be created in the local storage:
{app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
{app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
The {app.data}/debezium_data folder is the default root location for the CDC events serialized as CSV and JSON files. You can override it by changing the value of the property Location of the CDC events under the CDC Connection > Storage.
A better solution is to override the destination in the transformation (see below).
Optionally set destination connection, override format, and CDC Key
Step 5. Select the destination connection created in step 2, which can be one of the following:
Optionally set Format, and override the CDC Key in the TO.
The right part of the source-to-destination transformation overrides the following parameters of the CDC connection:
- The Destination Connection overrides the storage type, the location of the CDC files, the storage credentials, and whether the files should be gzipped, all set in the CDC connection under the section Storage.
- The Destination Format overrides the following parameters set in the CDC connection.
- The TO overrides the CDC Key.
Create JSON files in the local or cloud storage
By default, CDC flow creates CSV files. To create JSON files, follow these steps:
Step 1. For the CDC connection, disable Serialize CDC events as CSV files and enable Serialize CDC events as JSON files. Optionally enable Preserve structure of the CDC event when creating JSON file.
Step 2. If Preserve structure of the CDC event when creating JSON file is enabled, configure which attributes you want to include in the JSON file. Note that by default, the CDC connector creates a JSON file as a flat array of the CDC events.
Step 3. Create CDC flow and configure source-to-destination transformation where the destination connection is a local or cloud file storage, and the format is JSON.
Schedule CDC flow
Step 6. Schedule the extract Flow. We recommend using a continuous run Schedule type. The idea is that the extract Flow runs indefinitely or until there are no CDC events to extract, or it stops if there is an error. When the Flow stops for any of the reasons above, it restarts automatically after the configured number of seconds.
Stream CDC events to a message queue
In this scenario, the CDC events (INSERTS, UPDATES, and DELETES) are extracted from the transaction log of the source database and sent to the message queue.
Message queues support transactional polling, which makes the entire process very reliable and highly scalable. The only downside is the requirement to have a message queue, which is a separate element of the infrastructure that is not managed by Etlworks.
Here is a list of fully managed message queue services compatible with Kafka consumer and producer APIs:
- Azure Event Hubs: read how to create a Connection for Azure Event Hub.
- Confluent Platform
- CloudKafka: only service with a free forever tier. The free tier is very limited but can be used for testing.
- Aiven for Apache Kafka
- AWS managed Kafka (MSK): note that due to the limitation of the MSK, it is only possible to connect to MSK endpoints from the same VPC. Read more here.
Etlworks also supports streaming CDC events to the following destinations:
There is no need to create a separate Flow for the initial load. The first time it connects to a CDC-enabled source database, it reads a consistent snapshot of all of the included databases. When that snapshot is complete, the Flow continuously reads the changes that were committed to the transaction log and generates the corresponding INSERT, UPDATE, and DELETE events.
Step-by-step process
Step 1. Create a CDC Connection for the source database.
Step 2. Create message queue Connection.
Step 3. Create JSON (recommended) or Avro Format.
Step 4. Start creating a Flow by selecting the Stream CDC event to message queue from the gallery.
Step 5. Add a single source-to-destination transformation where:
- Source Connection: the CDC Connection created in step 1.
- FROM: the following options are available:
  - *: the system will get the list of the included tables from the CDC Connection.
  - A regular expression to match the table names.
  - A comma-separated list of fully qualified table names or regular expressions matching the table names.
You can select tables from the list of the available tables:
- Destination Connection: a message queue Connection created in step 2, for example Kafka, Azure Event Hub, or Google PubSub.
- Format: the JSON or Avro Format created in step 3.
- TO: any name, for example, topic.
Example:
- CDC key: [db]_[table]_cdc_stream
- FROM: test.inventory,test.payment,test.customer
The following topics will be created and the Flow will send CDC events to these topics:
test_inventory_cdc_stream
test_payment_cdc_stream
test_customer_cdc_stream
Step 6. Schedule the extract Flow. We recommend using a continuous run Schedule type. The idea is that the extract Flow runs indefinitely or until there are no CDC events to extract, or it stops if there is an error. When the Flow stops for any of the reasons above, it restarts automatically after the configured number of seconds.
Insert events from all tables into a single topic
If you wish to insert all events from all tables into a single topic in the message queue:
- Hardcode the CDC Key. CDC Key = topic name.
- Enable Update fields for each event.
Include attributes available in the original CDC event emitted by CDC connector
A typical CDC event emitted by the CDC connector contains the following elements:
- before: an optional field that specifies the state of the row before the event occurred.
- after: an optional field that specifies the state of the row after the event occurred.
- source: a mandatory field that describes the source metadata for the event.
- op: a mandatory string that describes the type of operation that caused the connector to generate the event.
- ts_ms: an optional field that displays the time at which the connector processed the event.
- transaction: transaction metadata.
- schema: the schema, which describes the structure of the payload.
- fields: the JDBC schema.
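For reference, here is an illustrative sketch of an update event in this structure (the table, column names, and values are hypothetical; the exact attributes depend on the connector):
{
  "before": { "id": 1001, "quantity": 5 },
  "after": { "id": 1001, "quantity": 3 },
  "source": { "connector": "mysql", "db": "test", "table": "inventory", "ts_ms": 1700000000000, "snapshot": "false" },
  "op": "u",
  "ts_ms": 1700000000123
}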
By default, the connector only retains the state of the event after the update and converts it to the flat JSON. The flat JSON is then added to the message queue.
You can change the structure of the event added to the queue by enabling parts of the original event emitted by Debezium by modifying the property Structure of the CDC event.
This option is ignored if the connector creates files.
Generate partition key for Kafka and Events Hub
Kafka’s topics are divided into several partitions. While the topic is a logical concept in Kafka, a partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion.
The messages within a partition are ordered; messages across a topic are not guaranteed to be ordered.
The Partition Key is used to send records to a specific Kafka or Azure Event Hub partition.
If the Partition Key is not set, the record will be sent to a random partition, so we strongly recommend configuring it.
Example of the partition key: [table].
The same connector-specific [tokens] that can be used as a part of the CDC Key are available for the Partition Key.
SQL Server tokens
- [db]: database name.
- [schema]: schema name.
- [table]: table name.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [change_lsn]: the change LSN.
- [commit_lsn]: the commit LSN.
- [event_serial_no]: the event serial number.
MySQL tokens
- [db]: database name.
- [table]: table name.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [server_id]: the MySQL server ID.
- [gtid]: the GTID.
- [file]: the binlog filename.
- [pos]: the position in the binlog.
- [row]: the row number.
Oracle tokens
- [source]: fully qualified event source.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [txId]: the transaction ID.
- [scn]: the SCN.
Postgres tokens
- [db]: database name.
- [schema]: schema name.
- [table]: table name.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [txId]: the transaction ID.
- [lsn]: the LSN.
- [xmin]: the xmin.
DB2 tokens
- [db]: database name.
- [schema]: schema name.
- [table]: table name.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [change_lsn]: the change LSN.
- [commit_lsn]: the commit LSN.
MongoDB tokens
- [db]: the database name.
- [collection]: the collection name.
- [op]: the CDC operation: c (create), d (delete), u (update).
- [timestamp]: the timestamp of the event in ms.
- [version]: the connector version.
- [connector]: the connector class name.
- [name]: the connector name.
- [ts_ms]: the timestamp of the event in ms.
- [snapshot]: true for a snapshot.
- [order]: the order of the event.
- [h]: the h of the event.
Stream CDC events from the message queue to any destination
The Flow described in this article streams CDC events stored in a message queue into any supported destination. Read more.
Database-specific use cases
By default, our CDC connectors are configured to support the most common scenarios. Below are some database-specific tips for less common use cases.
MySQL use cases
MySQL Snapshot modes
Snapshot mode specifies the criteria for running a snapshot upon startup of the connector. Read about MySQL snapshot modes.
Add new tables to the pipeline (MySQL)
Make sure the snapshot mode is set to one of the following:
- ad-hoc initial
- ad-hoc schema only
If you want to trigger the incremental snapshot of the new table(s), make sure Automatically trigger ad-hoc snapshot for new tables is enabled. Also, make sure that database.history.store.only.captured.tables.ddl is enabled (it is enabled by default).
Add new tables to the pipeline:
- Stop the Flow.
- Add new tables to the FROM field in the CDC flow. IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
Capture data from compressed binlog
MySQL CDC connector supports capturing data from the compressed binlog. There is no extra configuration required.
Work with spatial (geometry) data types (MySQL)
By default, our MySQL CDC connector encodes spatial (geometry) data types using WKB format and then uses Base64 encoding on top of that. If you want the connector to convert the geometry data to human-readable JSON, enable Convert Geometry WKB to JSON.
Stream CDC events from the specific binlog file, position, and optionally GTID
It is possible to configure the MySQL CDC connector to start streaming from the specific binlog file and position.
It is only available when the connector is switching from the snapshot to streaming mode, and the snapshot mode is set to ad-hoc initial, ad-hoc schema only, initial, schema_only, always_recover_schema, or schema_only_always_recover_schema.
Enable cursor-based result set when performing an initial snapshot
By default, the MySQL CDC connector reads data from the large dataset by creating the JDBC statement with the flags ResultSet#TYPE_FORWARD_ONLY and ResultSet#CONCUR_READ_ONLY, and by setting the fetch size hint to Integer#MIN_VALUE.
In some rare cases, it could cause the following error: "No statements may be issued when any streaming result sets are open and in use on a given connection".
To fix this error, add database.useCursorFetch=true to Other Parameters.
Postgres use cases
Postgres Snapshot modes
Snapshot mode specifies the criteria for running a snapshot upon startup of the connector. Read about Postgres snapshot modes.
Add new tables to the pipeline (Postgres)
If you want to trigger the incremental snapshot of the new table(s), make sure Automatically trigger ad-hoc snapshot for new tables is enabled.
Add new tables to the pipeline:
- Stop the Flow.
- Add new tables to the Included Tables field in the CDC connection or to the FROM field in the CDC flow. IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
Avoid abnormal consumption of WAL database disk space
In certain cases, it is possible that PostgreSQL disk space consumed by WAL files either experiences spikes or increases out of usual proportions.
This article suggests a solution for the problem.
Here is an example of a configuration which a) makes the connector emit an empty heartbeat event every minute, and b) makes the connector UPSERT a record into the heartbeat table on each heartbeat event.
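A minimal sketch, assuming the standard Debezium heartbeat properties are passed as additional connector parameters and that a writable table test_heartbeat(id, ts) exists in the source database (the property placement and the table name are assumptions):
heartbeat.interval.ms=60000
heartbeat.action.query=INSERT INTO test_heartbeat (id, ts) VALUES (1, NOW()) ON CONFLICT (id) DO UPDATE SET ts = NOW()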
Propagate default values
If the option Propagate default values is enabled and a default value is specified for a column in the database schema, the PostgreSQL connector will attempt to propagate this value to the stored schema whenever possible.
This option is disabled by default.
SQL Server use cases
SQL Server Snapshot modes
Snapshot mode specifies the criteria for running a snapshot upon startup of the connector. Read about SQL Server snapshot modes.
Add new tables to the pipeline (SQL Server)
Make sure the snapshot mode is set to one of the following:
- ad-hoc initial
- ad-hoc schema only
If you want to trigger the incremental snapshot of the new table(s), make sure Automatically trigger ad-hoc snapshot for new tables is enabled.
Add new tables to the pipeline:
- Stop the Flow.
- Add new tables to the Included Tables field in the CDC connection or to the FROM field in the CDC flow. IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
Avoid locking when performing a snapshot
To avoid table locking, set Snapshot Isolation Mode to read_uncommitted.
Work with binary and spatial (geometry) data types (SQL Server)
It is recommended to serialize columns with binary and spatial (geometry) data as hex-encoded strings by setting the property binary.handling.mode to hex.
Oracle use cases
Connect to read-only Oracle database
To stream data from a read-only database, set CDC Adapter to logminer and enable Run connector in read-only mode. When this option is enabled, the connector will not attempt to flush the LGWR buffer to disk, allowing it to connect to read-only databases.
Use JDBC URL to configure CDC connection
Due to the variety of URL formats supported by the Oracle JDBC driver, it is recommended to use the fully qualified Oracle JDBC URL to configure the CDC connection. You still need to enter the Host, the Port, and the Database, and optionally the PDB name.
Oracle Snapshot modes
Snapshot mode specifies the criteria for running a snapshot upon startup of the connector. Read about Oracle snapshot modes.
Improve the performance of the snapshot
When the Oracle CDC connector starts a snapshot, it reads the current system change number (SCN) position from the server's redo log. It then scans all of the relevant database tables and schemas as valid at the SCN position that was read in the previous step (SELECT * FROM … AS OF SCN 123) and generates a READ event for each row. Flashback queries (AS OF) can be slow because Oracle needs to 'play' the DML events (insert/update/delete) with reference to a specific point in time.
Enable Disable Flashback Query (AS OF) when creating snapshot to disable flashback queries during the snapshot. This could improve the performance of the snapshot in some environments.
Add new tables to the pipeline (Oracle)
Make sure the snapshot mode is set to one of the following:
- ad-hoc initial
- ad-hoc schema only
If you want to trigger the incremental snapshot of the new table(s), make sure Automatically trigger ad-hoc snapshot for new tables is enabled. Also, make sure that database.history.store.only.captured.tables.ddl is enabled (it is enabled by default).
Add new tables to the pipeline:
- Stop the Flow.
- Add new tables to the Included Tables field in the CDC connection or to the FROM field in the CDC flow. IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
Fix OutOfMemory error for long-running and large transactions
Oracle writes all changes to the redo logs in the order in which they occur, including changes that are later discarded by a rollback. As a result, concurrent changes from separate transactions are intertwined. When the connector first reads the stream of changes, because it cannot immediately determine which changes are committed or rolled back, it temporarily stores the change events in an internal buffer. By default, the connector uses the heap memory of the JVM process to allocate and manage buffered event records, which can cause an OutOfMemory error for long-running and large transactions. The connector can also be configured to use Infinispan as its cache provider, supporting cache stores both locally with embedded mode or remotely on a server cluster. Enable the property Enable Embedded Infinispan Cache for Log Mining Buffer to configure the connector to use an embedded cache store for the Log Mining Buffer. This can significantly reduce the memory footprint of the connector.
DB2 use cases
DB2 Server Snapshot modes
Snapshot mode specifies the criteria for running a snapshot upon startup of the connector. Read about DB2 snapshot modes.
Add new tables to the pipeline (DB2)
If you want to trigger the incremental snapshot of the new table(s), make sure Automatically trigger ad-hoc snapshot for new tables is enabled. Also, make sure that database.history.store.only.captured.tables.ddl is enabled (it is enabled by default).
Add new tables to the pipeline:
- Stop the Flow.
- Add new tables to the Included Tables field in the CDC connection or to the FROM field in the CDC flow. IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
MongoDB use cases
Add new collections to the pipeline
Make sure the snapshot mode is set to one of the following:
- ad-hoc initial
- ad-hoc
If you want to trigger the incremental snapshot of the new collection(s), make sure Automatically trigger ad-hoc snapshot for new tables is enabled.
Add new collections to the pipeline:
- Stop the Flow.
- Add new collections to the FROM field in the CDC flow. Adding new collections using regular expressions is not supported.
- Restart the Flow.
MongoDB Capture Modes
Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment and immediately react to them. Read more about MongoDB Change Streams.
Etlworks MongoDB CDC connector supports 3 capture modes:
- change_streams_update_full: capture changes via the MongoDB Change Streams mechanism; the update message contains the full document.
- change_streams: capture changes via the MongoDB Change Streams mechanism; the update message does not contain the full document.
- oplog: capture changes from the oplog. This mode is deprecated.
Serialize CDC events as JSON documents
When creating a CDC connection for MongoDB it is recommended to configure it to create JSON documents which preserve the original structure of the source document but flatten columns stored using extended JSON (typically ids and timestamp columns).
Step 1. Disable Serialize CDC events as CSV files, enable Serialize CDC events as JSON files and disable Close CSV file if header has changed.
Step 2. Enable all 3 parameters below:
Enabling all 3 parameters will create a JSON array where each record is exactly the same as the record in the source except flattened columns stored using extended JSON. JSON files can be easily loaded as is into the destination database, for example Snowflake, or just stored in the cloud data storage or data lake.
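For illustration only (the document, field names, and the flattened values below are hypothetical), a source document with extended JSON values such as:
{ "_id": { "$oid": "64f0c0ffee64f0c0ffee64f0" }, "created": { "$date": 1700000000000 }, "name": "John" }
would be written to the JSON file with the id and timestamp flattened to plain values while the rest of the structure is preserved, roughly like:
{ "_id": "64f0c0ffee64f0c0ffee64f0", "created": "2023-11-14T22:13:20Z", "name": "John" }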
Parallel snapshot
By default, the CDC connector performs initial and ad-hoc snapshots sequentially, one table at a time, one after another. To enable parallel initial and ad-hoc snapshots, set the property Snapshot Max Threads for the source CDC connector to a value greater than 1. In a parallel snapshot, the connector processes multiple tables concurrently.
Enabling parallel snapshot can greatly improve the performance of the initial load and re-load. It is recommended to set the value of Snapshot Max Threads to no more than the number of processing cores x 2.
Add new tables, trigger re-load of the existing tables
Ad-hoc snapshots
When the initial snapshot is completed, it is not executed again as further data are acquired via streaming.
Sometimes it would be really useful to re-execute a snapshot, either completely or for only a subset of tables. Examples of such use cases include:
- New tables are added to the list of captured tables.
- The existing tables need to be re-snapshotted for whatever reason.
When an ad-hoc snapshot is triggered, the connector temporarily stops streaming, and then initiates a snapshot of the specified tables, following the same process that it uses during an initial snapshot. If parallel snapshotting is enabled, the connector processes multiple tables concurrently. After the snapshot completes, the connector resumes streaming.
Add new tables to the pipeline
The ad-hoc snapshot of the new tables requires restarting the CDC extract Flow
Make sure Automatically trigger ad-hoc snapshot for new tables
is enabled.
Make sure the snapshot mode is set to one of the following:
- ad-hoc initial
- ad-hoc
- ad-hoc schema only
Add new tables and trigger ad-hoc snapshot:
- Stop the Flow.
- Add new tables to the FROM field in the CDC flow (or, if connector-level settings are used, to the Included Tables field in the CDC connection). IMPORTANT: the table names must be specified in a fully-qualified format. Adding new tables using regular expressions is not supported.
- Restart the Flow.
Re-load existing tables
When the CDC connection is configured with a Signal Data Collection and the CDC flow is configured with the Signal connection the ad-hoc snapshot of the existing tables does not require restarting the CDC extract Flow.
How ad-hoc snapshot works
It is assumed that there is a configurable data collection that contains the list of database tables to snapshot. We call it a signal collection. The signal collection can be one of the following:
- A database table. The database doesn't have to be CDC-enabled, but the Flow should be able to delete records from a signal table.
- A file in any of the supported file storage systems. The flow must be able to delete the file.
The signal collection is used as a container to list all tables which you want to re-snapshot. It is not the actual table to re-snapshot.
The CDC flow monitors the signal collection. When changes are detected, the Flow reads the list of tables in the signal collection and triggers the ad-hoc snapshot for the tables listed in the data collection.
The flow immediately deletes processed data in the signal collection:
- If the signal collection is a database table, the Flow deletes all records from that table.
- If the signal collection is a file, the Flow deletes the file.
The format of the signal
The signal collection table or file must contain a single column that can have any name.
The tables to snapshot can be stored:
a) Horizontally - multiple comma-separated tables in a single row, for example test.inventory,test.payment,test.customer, or
b) Vertically - multiple rows with table names; each row can contain one or multiple comma-separated tables.
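For illustration, a signal file combining both layouts could look like this (the table names are the sample tables used earlier in this article):
test.inventory,test.payment
test.customer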
Configure connections and flow for ad-hoc snapshot
In Connections
Step 1. Create file storage or database connection. The connection must be read/write.
Step 2. For the file storage connections, create a format.
In CDC Flow.
Step 3. Add the connection and format created in steps 1-2 under the Connection tab of the CDC Flow.
In CDC connection.
Step 4. Make sure the snapshot mode is set to one of the following:
- ad-hoc initial
- ad-hoc
- ad-hoc schema only
Step 5. Configure the name of the Signal data collection.
If the signal collection is a database table, enter a fully qualified table name.
If the signal collection is a file, enter the file name with extension but without the path.
Trigger ad-hoc snapshot of the existing tables
To trigger the ad-hoc snapshot for the tables listed in the signal collection, insert records into the signal table or create a signal file. The ad-hoc snapshot of the existing tables does not require restarting the CDC extract Flow.
Incremental snapshots
Initial and ad-hoc snapshots are a great way to obtain a consistent set of all data stored in a database upfront. But there are also some downsides to this approach:
- For larger tables, it can take a very long time to complete, hours or even days.
- The snapshot must be completed before the start of streaming.
- Snapshots are not resumable. If a snapshot hasn't been completed when the connector gets stopped or restarted, the snapshot must be restarted from scratch.
When to use incremental snapshots
To mitigate these issues, a mechanism called incremental snapshotting has been introduced. The table is snapshotted in chunks, and streaming and snapshot events are blended together.
Setup incremental snapshot
Read about setting up incremental snapshot.
When not to use incremental snapshots
Incremental snapshot is not the same as ad-hoc snapshot. The setup is much more complex and requires multiple moving parts, the source database must be read-write, and the performance is worse than the performance of the ad-hoc snapshot.
Do not use incremental snapshot:
- When the table does not have a primary key.
- When the CDC database is read-only. The incremental snapshot mechanism requires the connector to write and read signals from the same database it streams data from.
- When you do not want to deal with a complex setup.
- When the speed of the snapshot is important. Incremental snapshot reads data from the source table in small chunks. It also (unlike initial and ad-hoc snapshot) does not support parallel snapshotting of multiple tables.
Tips and tricks
Configure data exchange formats
Most CDC flows (except direct CDC to database and CDC to message queue) create either CSV or JSON files (or both), then load them into the designated database or cloud data storage/data lake. There are several configuration options available for the CDC connection which control the format of the created files.
Convert NULL values in the source database into empty ('') strings in CSV
To convert NULL values in the source database into empty ('') strings in CSV, enable Convert null to empty for the source CDC connection.
Do not enclose null values in double quotes
By default, the CDC connector encloses all values (including nulls) in double quotes when it creates CSV files. Some databases, for example Greenplum, cannot correctly load empty strings ("") into nullable columns.
To stop enclosing null values in double quotes, enable Do not enclose null in double quotes for the CDC connection.
Convert boolean true/false into numeric 1/0
Some databases do not have native boolean datatype and use numeric columns to store boolean values as 1 or 0.
To convert boolean (true/false) values in the source database into numerical 1/0, enable Convert boolean true/false into numerical 1/0 for the CDC connection.
Preserve end-of-line (EOL) characters in column's value
Some databases do not allow loading CSV files when a column's value includes EOL characters, so the CDC connection automatically removes EOLs.
To preserve EOLs, disable Remove EOL characters for the CDC connection.
Create CSV and JSON files using the specified character encoding
If the source database has tables with data encoded using any character set other than ASCII (for example UTF-8), you can configure the connector to create files using that specific encoding.
To preserve the encoding when loading files into the databases, you will need to set the same character encoding for the CSV Format.
If the encoding is not specified or set to no encoding, the JSON files are created with UTF-8 encoding.
Handle NULL and empty ('') values when loading CSV files into databases
Most databases and cloud data warehouses allow batch-loading CSV files created by CDC flow. Databases handle loading NULL values differently.
NULL is the same as empty ('')
Step 1. When creating a CDC Connection, enable Convert null to empty. Note that it is disabled by default.
Step 2. When creating a CSV Format for FROM and TO:
- Disable Convert 'null' to null.
- Enable Convert empty string to null.
- Set Value for null to empty.
Loading empty values as NULL into Snowflake
When creating a Flow to load data into Snowflake, set String used to convert to and from SQL NULL to ('NULL','null','').
Loading empty values as NULL into Redshift
When creating a Flow to load data into Redshift, enable Load empty chars as null.
Loading empty values as NULL into Greenplum
For the CDC connection, enable Do not enclose NULL in double quotes.
NULL is not the same as empty ('')
Step 1. When creating a CDC Connection, make sure Convert null to empty is disabled. Note that it is disabled by default.
Step 2. When creating a CSV Format for FROM and TO:
- Enable Convert 'null' to null.
- Disable Convert empty string to null.
- Set Value for null to 'null'.
Loading not empty values as NULL into Snowflake
When creating a Flow to load files into Snowflake, set String used to convert to and from SQL NULL to ('NULL','null').
Working with JSON columns in the source database
When the source database stores data in nested data structures (for example MongoDB), or a record includes column(s) with nested data elements serialized as JSON (for example JSON columns in MySQL), the CDC connector can be configured to stringify JSON columns.
To convert JSON columns in the source to strings when creating CSV and/or JSON files, enable all 3 parameters below for the CDC connection.
JSON columns when CDC connector creates JSON files
Enabling all 3 options will create a JSON array where each record is exactly the same as the record in the source. JSON files can be easily loaded as is into the destination database, for example Snowflake, or just stored in the cloud data storage or data lake.
JSON columns when CDC connector creates CSV files
Assuming that the source table has JSON columns, the values of the columns will be stringified preserving the original structure of the JSON columns. Stringified JSON columns in the CSV file can be easily loaded into the JSON columns in the destination database, for example Snowflake.
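For illustration only (the table, columns, and values are hypothetical), a row with a stringified JSON column serialized to CSV could look like this, with the nested structure preserved inside the quoted string using standard CSV escaping:
id,profile
1,"{""name"":""John"",""tags"":[""vip"",""beta""]}"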
Handle source table DDL changes
If you are using a Flow that stores CDC events as CSV files and expect that the structure of the source table can be modified while the Flow is still streaming data from this table, enable Close CSV file if header has changed when creating the CDC Connection.
Capture source database schema
Etlworks can automatically create destination tables when loading CDC messages.
Step 1. When creating the CDC Connection, enable Save Metadata.
Step 2. When creating a CSV Format for FROM and TO:
- Enable Save Metadata.
- Keep All fields are strings disabled.
In addition, when enabled, this feature allows the CDC load Flow to automatically set the Lookup Fields required for the CDC MERGE by extracting the information about the primary keys from the source database. It significantly enhances the accuracy of the Predict Lookup Fields feature, essentially making it bulletproof and very low cost.
If both options are disabled, the Flow will sample the file and set the data types of the columns in the destination table. This mechanism is not 100% accurate so the destination table might end up with columns that have different data types compared to the source table.
Capture NOT NULL constraints
When Save Metadata is enabled, the CDC connector captures the following attributes of the source columns: name, SQL data type, JDBC type, and size. It does not capture the NOT NULL constraints by default. To configure the connector to capture NOT NULL constraints, enable this configuration option:
Create all columns as strings (VARCHAR)
Etlworks can automatically create a destination table when loading CSV files. If you wish to create all columns as strings (VARCHAR), then:
Step 1. When creating the CDC Connection, disable Save Metadata.
Step 2. When creating a CSV Format for FROM and TO:
- Disable Save Metadata.
- Enable All fields are strings.
Configure the frequency of logging
By default, the CDC connector logs processed CDC events using an exponential elapsed time strategy: the interval between log entries starts at 5 seconds and increases exponentially until it reaches a maximum of 1 hour.
You can configure the maximum time interval by adding the property debezium.max.log.poll.period to Other parameters. This property sets the maximum interval in minutes. The minimum allowed value is 1 minute.
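For example, to cap the logging interval at 30 minutes (the value is illustrative), add the following to Other parameters:
debezium.max.log.poll.period=30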
Implement soft deletes
By default, the Load Flow creates, updates, and deletes records in the destination table based on the type of the CDC event: c for create, u for update, and d for delete.
If you need to implement soft deletes, add the Extra columns property with the specific function to the CDC Connection. The following functions are available:
- cdc_boolean_soft_delete: the boolean column with a user-defined name will be added to the CDC stream. Values returned by the function: true for delete, null for all other events.
- cdc_boolean_soft_delete(true): the boolean column with a user-defined name will be added to the CDC stream. Values returned by the function: true for delete, false for all other events.
- cdc_timestamp_soft_delete(yyyy-MM-dd HH:mm:ss.SSS): the timestamp column with a user-defined name will be added to the CDC stream. Values returned by the function: timestamp for delete, null for all other events.
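A minimal sketch of the Extra columns value for a timestamp-based soft delete (the column name delete_at is illustrative and matches the explanation that follows):
delete_at=cdc_timestamp_soft_delete(yyyy-MM-dd HH:mm:ss.SSS)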
In this example, the column delete_at will be added to the CDC stream and its value will be set to the current timestamp with millisecond precision.
will be added to the CDC stream and its value will be set to the current timestamp with a milliseconds precision.
If any of the functions above are used, the CDC event type is automatically switched to u (update).
Add columns to the CDC stream
By default, the CDC stream contains all columns in the source table. You can add columns with user-defined names to the stream using the Extra columns property of the CDC Connection.
Format of the property: column_name[=function[(parameter)]],column_name[=function[(parameter)]]
Available functions:
- cdc_op: the CDC event type: c for create, u for update, and d for delete.
- cdc_key: the CDC key in the user-defined Format.
- cdc_timestamp(yyyy-MM-dd HH:mm:ss.SSS): the timestamp of the CDC event. It is not guaranteed that this value will be unique for each event in the sequence of events.
- cdc_event_seq: the unique 'number' of the event. This number grows sequentially.
- cdc_boolean_soft_delete: the boolean column: true for delete, null for all other events.
- cdc_boolean_soft_delete(true): the boolean column: true for delete, false for all other events.
- cdc_timestamp_soft_delete(yyyy-MM-dd HH:mm:ss.SSS): the timestamp column: timestamp for delete, null for all other events.
- data conversion functions: set the data type of the column to one of the following:
  - integer: integer. Example: integer(abc).
  - numeric: numeric. Example: numeric(abc).
  - long: long. Example: long(abc).
  - bit: bit. Example: bit(snapshot).
  - double: double. Example: double(abc).
  - float: float. Example: float(abc).
  - varchar: varchar. Example: varchar(snapshot).
  - boolean: boolean. Example: boolean(snapshot).
  - timestamp: timestamp. Example: timestamp(ts_ms).
  - time: time. Example: time(ts_ms).
  - date: date. Example: date(ts_ms).
- any hardcoded value, including null: hardcoded column value. Example: created_at=null. Can be used with a data conversion function (see above).
- any attribute of the node source: each CDC connector has its own set of attributes. Example: created_at=ts_ms. Can be used with a data conversion function (see above). Here are the attributes available for all connectors:
  - ts_ms: timestamp in milliseconds of the event originated in the source database.
  - snapshot: is snapshot running.
  - name: name of the connector.
  - version: version of the connector.
  - connector: type of the connector.
  - db: database name.
  - table or collection: table or collection name.
  - schema: schema name, if applicable.
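A hedged example that combines several of the functions above (the column names are illustrative):
op=cdc_op,captured_at=cdc_timestamp(yyyy-MM-dd HH:mm:ss.SSS),source_ts=timestamp(ts_ms)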
Change CDC key and CDC event
The CDC event represents a row (a record) in the CDC stream. The CDC key represents the unique identifier of the group of events. All events with the same CDC key are stored in the same file.
Use the Preprocessor to change the CDC event and/or CDC key.
Available scripting languages
- JavaScript
Available variables
- event: the CDC event serialized as JsonNode.
- key: CDC key.
- source: CDC source serialized as JsonNode.
- transactionId: the transaction id associated with the event if transaction markers are enabled.
Get and set the values of the column in the event
To get the value of the column, serialized as a string:
var val = event.get('column_name').asText();
To set the value:
event.put('column_name', string);
Possible return values
- true/false: if false, the event will be skipped.
- TypedKeyValue<Boolean, String>: the key is an equivalent of true/false above, the value is the CDC key to replace the automatically calculated CDC key.
Example:
// replace the automatically calculated CDC key with a hardcoded value
var newCdcKey = 'abc';
// keep the event (true) and return the new CDC key
value = new TypedKeyValue(true, newCdcKey);
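A slightly longer sketch, assuming the source table has a status column (the column name and the transformation are illustrative): it normalizes the column value and appends a suffix to the automatically calculated CDC key.
// read the 'status' column from the CDC event (column name is illustrative)
var statusNode = event.get('status');
if (statusNode != null) {
    // normalize the value and write it back to the event
    event.put('status', statusNode.asText().toUpperCase());
}
// keep the event and append a suffix to the automatically calculated CDC key
value = new TypedKeyValue(true, key + '_normalized');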
Transaction markers
Consider, for instance, an e-commerce application that manages purchase orders. In a relational database, the information representing such an order will typically be spread across multiple tables, e.g. PURCHASE_ORDER, ORDER_LINE, and SHIPMENT_ADDRESS. To examine the complete state of a purchase order in the database itself, you’d run a JOIN query against the three tables.
Things get more challenging when looking at the change events produced by CDC connectors.
The situation can be addressed by leveraging the new transaction metadata supported by most CDC connectors. When enabled, separate transaction markers will be created at the beginning and end of each transaction.
To enable transaction markers, modify the CDC Connection by enabling the Provide Transaction Metadata parameter. Optionally enable Log Transaction Metadata to add metadata (such as the names of the tables involved in the transaction, as well as the number of affected records for each table) to the flow log. Optionally enable Log Transaction Start/End to include transaction start/end events in the log.
How to stop CDC Flow
By default, the CDC flow never stops automatically. This is the recommended configuration.
You can configure the CDC Connection to stop when there are no more new CDC events for an extended period of time. The CDC Flow will stop if any of the parameters below reaches the configured threshold.
- Number of retries before giving up: the number of retries before the CDC flow stops if there are no new CDC events.
- Retry N minutes before giving up: the number of minutes to retry before the CDC flow stops if there are no new CDC events.
- Always stop after N minutes: the number of minutes to run the connector before stopping. The connector will stop even if there are new records in the queue.
Configure automatic stop with caution. It takes time to start streaming the CDC events, so stopping the flow prematurely could lead to a situation when no CDC events are streamed.
If none of the parameters are set (default), the CDC flow will run forever until manually stopped or there is an error.
To stop the CDC Flow manually, click Stop / Cancel.
Manage CDC Flows
Reset CDC Flow
Etlworks CDC connectors store the history of DDL changes for the monitored database in the history file and the current position in the transaction log in the offset file.
A typical CDC extract flow starts by snapshotting the monitored tables (A) or starts from the oldest known position in the transaction (redo) log (B), then proceeds to stream changes in the source database (C). If the Flow is stopped and restarted, it resumes from the last recorded position in the transaction log.
Occasionally it is required to "reset" the flow to restart the process from scratch (A) or (B).
Process
Step 1. Create a new Offset and History connection.
Step 2. Stop CDC flow if it is running.
Step 3. Open the connection created in step 1 in Explorer and delete the .dat file(s) associated with the CDC connection.
Step 4. Start CDC flow. Depending on the configuration, the Flow will either perform the full snapshot or will start from the oldest position in the transaction log.
Backup history and offset files
It is possible to set up an automatic backup of the offset and history files into the designated storage and folder (by default {app.data}/debezium_data/backup). The backup files are created as 'name_timestamp.zip', where 'name' is the name of the connection and 'timestamp' is the current timestamp formatted using a user-configurable format string. The backup file includes both history and offset files.
Configuring backup
In the CDC connection, navigate to the section Backup offset.
Set the following parameters:
- Automatically backup offset every (minutes): if the value of this field is a positive integer, the connector will automatically create a backup of the offset and history files every n (configurable) minutes.
- Delete offset backups which are older than (minutes): if the value of this field is a positive integer, the connector will automatically delete backups of the offset and history files which are older than n (configurable) minutes.
- Backup file timestamp format: the backup files are created as 'name_timestamp.zip', where 'name' is the name of the connection and 'timestamp' is the current timestamp formatted using a user-configurable format string. The default format is MMddyyyyHHmmss. Available tokens are: MM - month in year, dd - day in month, yyyy - year, HH - hours in day (0-23), mm - minutes, ss - seconds, SSS - milliseconds. Check out this article for other options.
Changing destination for backup
By default, the backup files are created under the {app.data}/debezium_data/backup folder. You can change the storage and the folder under Flow > Connections tab > Offset Backup.
Monitor CDC Flows
Our ETL engine records real-time metrics for each active CDC stream. The following metrics are available:
- Last Update (timezone): the timestamp in the server timezone of the last checkpoint. The metrics are updated every 60 seconds.
- Last Record Processed (time ago): the human-readable timestamp of the last processed CDC event.
- Messages/sec: the number of messages processed in one second recorded during the last checkpoint interval.
- Last Record Latency: the last recorded latency. The latency is the difference between the timestamp of the change event in the source database and the timestamp of when the change event was processed by the CDC connector.
- Max Latency: the maximum recorded latency since the last restart of the CDC connector.
- Min Latency: the minimum recorded latency since the last restart of the CDC connector.
- Avg Latency: the average latency since the last restart of the CDC connector.
- Total Records Processed: the total number of change events processed since the last restart of the CDC connector.
- Records Processed Since Last Update: the number of change events processed since the last update. Note that the metrics are updated every 60 seconds.
Access CDC Metrics from UI
To access CDC metrics from the UI, go to Flows or Schedules and click Running next to the Flow or Schedule name.
Then click View Running Tasks.
The metrics are displayed at the bottom of the Running Tasks window.
You can refresh the metrics manually or enable Auto refresh.
Access CDC metrics using API
GET CDC metrics for the specific Flow
This API returns metrics for the specific CDC Flow.
Authentication
Before making a call to API, you must receive a JWT token from the authentication API.
Step 1. Use any user with the Administrator role to call the Etlworks authentication endpoint and receive an access token.
Step 2. Use the access token received in Step 1 to call Etlworks API endpoints. The access token must be submitted as a header parameter, as in: Authorization:Bearer access-token.
Access tokens are short-lived and self-expiring. An access token gives you access to the APIs for approximately 10 minutes. It is recommended that you refresh the access token before each call to the API.
The API endpoint parameters:
- PATH: /etl/rest/v1/tasks/{audit_id}/?type=CDC%20stream
  EXAMPLE: https://app.etlworks.com/etl/rest/v1/tasks/80610/?type=CDC%20stream
- METHOD: GET
- HEADER: Authorization:Bearer access-token
- REQUEST BODY: none
- REQUEST CONTENT TYPE: none
Audit ID
audit_id is a required URL parameter. To get the audit_id for the specific Flow, use the Flow status API.
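A minimal sketch of calling this endpoint from Node.js 18+ (the audit ID and host come from the example above; obtaining the access token from the authentication API is assumed):
// CDC metrics for a specific Flow run; the audit ID comes from the Flow status API
const auditId = 80610;
const accessToken = process.env.ETLWORKS_TOKEN; // JWT obtained from the authentication API
const response = await fetch(
  `https://app.etlworks.com/etl/rest/v1/tasks/${auditId}/?type=CDC%20stream`,
  { headers: { Authorization: `Bearer ${accessToken}` } }
);
const metrics = await response.json();
// the response is an array; print a couple of the CDC metrics described below
console.log(metrics[0].messagesPerSecond, metrics[0].recordsProcessed);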
Response
The response is a JSON document in the following Format. The highlighted fields are the CDC metrics.
[{
"requestId": "string",
"owner": "string",
"name": "string",
"started": timestamp_in_milliseconds_since_epoch,
"duration": number,
"code": "the formatted CDC metrics",
"type": "CDC",
"tenant": "cdc_prod",
"language": null,
"messagesPerSecond": messages_per_second,
"latency": number,
"maxLatency": number,
"minLatency": number,
"avgLatency": number,
"maxLatencyDate": timestamp_in_milliseconds_since_epoch,
"recordsProcessed": number,
"recordsProcessedSinceLastCheck": number,
"lastCheck": timestamp_in_milliseconds_since_epoch,
"lastTimeRecordsReceivedDate": timestamp_in_milliseconds_since_epoch}
]
Response codes
200 for success, 401 for not authorized, 403 for forbidden, and 500 for an internal error.
GET CDC metrics for all running Flows
This API returns metrics for all currently running CDC Flows.
Authentication
Before making a call to API, the user must receive a JWT token from the authentication API.
Step 1. Use any user with the Administrator role to call the Etlworks authentication endpoint and receive an access token.
Step 2. Use the access token received in Step 1 to call Etlworks API endpoints. The access token must be submitted as a header parameter, as in: Authorization:Bearer access-token.
Access tokens are short-lived and self-expiring. An access token gives you access to the APIs for approximately 10 minutes. It is recommended that you refresh the access token before each call to the API.
The API endpoint parameters:
- PATH: /etl/rest/v1/tasks/?type=CDC%20stream
  EXAMPLE: https://app.etlworks.com/etl/rest/v1/tasks/?type=CDC%20stream
- METHOD: GET
- HEADER: Authorization:Bearer access-token
- REQUEST BODY: none
- REQUEST CONTENT TYPE: none
Response
The response is a JSON document in the following Format. The highlighted fields are the CDC metrics.
[{
"requestId": "string",
"owner": "string",
"name": "string",
"started": timestamp_in_milliseconds_since_epoch,
"duration": number,
"code": "the formatted CDC metrics",
"type": "CDC",
"tenant": "cdc_prod",
"language": null,
"messagesPerSecond": messages_per_second,
"latency": number,
"maxLatency": number,
"minLatency": number,
"avgLatency": number,
"maxLatencyDate": timestamp_in_milliseconds_since_epoch,
"recordsProcessed": number,
"recordsProcessedSinceLastCheck": number,
"lastCheck": timestamp_in_milliseconds_since_epoch,
"lastTimeRecordsReceivedDate": timestamp_in_milliseconds_since_epoch}
]
Response codes
200 for success, 401 for not authorized, 403 for forbidden, and 500 for an internal error.