This tutorial demonstrates how to implement [near] real-time CDC-based change replication for the most popular databases using the following technologies:
- Native CDC for each source database.
- Apache Kafka.
- Etlworks Kafka connector with built-in support for Debezium.
Etlworks Integrator parses the CDC events emitted to the Kafka topic, automatically transforms events to the DML SQL statements (INSERT/UPDATE/DELETE) and executes SQL statements in the target database in the order they were created. It also handles the collisions and errors, ensuring that the solution is 100% reliable.
Anything can be a destination: SQL and NoSQL data sources, online data warehouses such as Snowflake and Redshift, files, API endpoints, etc. The following databases are supported as a source:
- Microsoft SQL Server
Read about other change replication techniques available in Etlworks Integrator.
The solution requires installing and configuring Apache Kafka, Kafka Connect, and Debezium.
Installing and configuring Kafka and Kafka connect (Kafka component required for CDC) is not part of this tutorial. In most cases installing Kafka is as easy as downloading the latest version of the standalone or dockerized Kafka and Zookeeper. Kafka Connect is typically included in all distribution packages so there is nothing to install. Debezium can be installed as a plugin for Kafka Connect by simply copying required libraries to the KAFKA_HOME/plugin folder.
For customers on Enterprise plans, Etlworks installs all required components. We provide assistance with installing components to the self-hosted customers.
Please contact Etlworks support if you want to enable a real-time change replication for your account.
The basic setup for real-time CDC-based change replication in Etlworks is very easy:
- Setup CDC for the source database.
- Configure Debezium to capture CDC events and publish them to the Kafka topic(s).
- Create a change replication flow where the source is a Kafka topic and the destination is a target database table.
- Schedule the flow.
Setup CDC for the source database
Enabling CDC is different for each database. Please use the following tutorials:
Configure Debezium to capture CDC events and publish them to the Kafka topic(s)
Assuming that the Debezium is already installed as a Kafka Connect plugin and up and running, we will be configuring a connector to the source database using Kafka Connect REST API. In this tutorial, we will be using Microsoft SQL Server. Configuring connectors to other databases is equally simple.
The Debezium connectors are created using Kafka Connect REST API so make sure either curl or Postman is installed in your development box. In this tutorial, we will be using Postman.
Step 1. Verify that Kafka Connect is installed and running.
The default port for Kafka Connect API is 8083. Assuming that it runs on localhost, the URL for the API endpoint which returns configured connectors is:
Step 2. Create a new connector for Microsoft SQL Server.
ENDPOINT URL: http://localhost:8083/connectors
"table.whitelist": "comma separated list of fully_qualified_table_names",
CDC Event Flattening
The payload contains two important sections required for integrating Etlworks Kafka connector with Debezium:
1. The unwap transfromation, which creates a flat version of the CDC event:
2. The settings which remove the schema information from the CDC event:
The naming convention for Kafka topics
Debezium stores CDC events in a separate topic for each table. If the connector was configured using the following parameters:
the CDC events for the table
cdc_test will be stored in a Kafka topic
Examples of the generated CDC events
Etlworks automatically parses CDC events stored in a Kafka topic so you don't have to deal with this, but it is still a good idea to learn how exactly the generated events look like for the different DML statements.
Assuming that we are using a JSON format for serialization of the CDC events (default), also assuming that the source table was created using the following SQL:
CREATE TABLE dbo.cdc_test(id INT,
PRIMARY KEY (id))
the generated CDC events will look like the following:
INSERT INTO dbo.test2 (id,name,changed) values (1,'test1',CURRENT_TIMESTAMP)
UPDATE dbo.cdc_test SET name = 'updated-test1-1', changed=CURRENT_TIMESTAMP
WHERE id = 1
DELETE FROM WHERE id = 1
Configuring serialization format for DATE/TIME fields
As you probably noticed, the value of the timestamp field changed is generated as Unix epoch time in milliseconds. You can convert it to the human (and database) readable format using TimestampConverter transformation:
Create a change replication flow
Step 1. Create source Kafka connection. When creating a connection select Debezium as CDC provider. Enter a wildcard topic name in the format database_server_name.database_name.*.
Step 2. Create a JSON format with all default settings.
Step 3. Create a connection to the destination database. Disable the auto-commit.
Step 4. Create a new flow by selecting queue to database from the gallery.
Step 5. Add a new source-to-destination transformation where:
- the FROM connection is the connection to Kafka created in step 1.
- the FROM format is the JSON format created in step 2.
- the FROM is a topic name for the specific table.
- the TO is a destination table name.
- the TO connection is the destination database connection created in step 3.
Step 6. Click the MAPPING button, select Parameters tab and configure the following parameters:
- Stream Data - enabled
- Use Bind Variables - enabled
- Create Tables and Modify DML to include only fields which exist in the destination - enabled
- Action - Record
- Lookup Fields - the unique field(s) for the record. In our example the unique field is ID.
Step 7. If the foreign constraints are disabled (or do not exist) in the destination database, you can enable processing of each transformation in a parallel thread.
Step 8. Add the source-to-destination transformations for all tables with enabled CDC. You can duplicate the existing transformation and change the topic name (the format is server_name.database_name.table_name), the destination table name and the lookup field(s).