This tutorial demonstrates how to implement [near] real-time CDC-based change replication for the most popular databases using the following technologies:
- Native CDC for each source database
- Apache Kafka
- Etlworks Kafka connector with built-in support for Debezium
In this article, we assume that Debezium and Kafka run outside of the Etlworks Integrator infrastructure. Note that the Etlworks Integrator includes a heavily modified Debezium engine built on top of the latest version of Debezium, and it has built-in CDC Flows optimized for performance and scalability. These Flows do not require any additional infrastructure and are as fast and reliable as Debezium deployed as a Kafka Connect plugin or as a standalone Debezium server.
For all intents and purposes, we recommend using our built-in CDC Flows. Read more in our knowledge base.
The Etlworks Integrator parses the CDC events emitted to the Kafka topic, automatically transforms the events into DML SQL statements (INSERT, UPDATE, DELETE), and executes the SQL statements in the target database in the order they were created. It also handles collisions and errors, ensuring that the solution is 100% reliable.
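To make the event-to-DML idea concrete, here is a minimal sketch in Python. It is an illustration only, not the Integrator's actual implementation: the function name `event_to_dml`, the `op` values, and the `key_fields` parameter are all hypothetical. It assumes a flattened event (a plain dictionary of column values) and produces a parameterized statement with bind values.

```python
def event_to_dml(op: str, table: str, row: dict, key_fields: list) -> tuple:
    """Turn one flattened change event into (sql, bind_values).

    op         -- hypothetical operation label: "insert", "update" or "delete"
    table      -- destination table name
    row        -- flattened event: column name -> value
    key_fields -- columns that uniquely identify the record
    """
    columns = list(row)
    where = " AND ".join(f"{k} = ?" for k in key_fields)
    key_values = [row[k] for k in key_fields]

    if op == "insert":
        placeholders = ", ".join("?" for _ in columns)
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
        return sql, [row[c] for c in columns]

    if op == "update":
        # Only non-key columns are assigned; key columns go into the WHERE clause.
        non_key = [c for c in columns if c not in key_fields]
        assignments = ", ".join(f"{c} = ?" for c in non_key)
        sql = f"UPDATE {table} SET {assignments} WHERE {where}"
        return sql, [row[c] for c in non_key] + key_values

    if op == "delete":
        return f"DELETE FROM {table} WHERE {where}", key_values

    raise ValueError(f"unsupported operation: {op}")
```

Executing the resulting statements in the order the events arrive keeps the destination consistent with the source, which is why ordering matters in the description above.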
Anything can be a destination: SQL and NoSQL data sources, online data warehouses such as Snowflake and Redshift, files, API endpoints, etc. The following databases are supported as a source:
- Microsoft SQL Server
Read about other change replication techniques available in the Etlworks Integrator.
The solution requires installing and configuring Apache Kafka, Kafka Connect, and Debezium.
Installing and configuring Kafka and Kafka Connect (the Kafka component required for CDC) is not part of this tutorial. In most cases, installing Kafka is as easy as downloading the latest version of the standalone or dockerized Kafka and Zookeeper. Kafka Connect is typically included in all distribution packages, so there is nothing to install. Debezium can be installed as a plugin for Kafka Connect by simply copying the required libraries to the KAFKA_HOME/plugin folder.
For customers on Enterprise plans, the Etlworks Integrator installs all required components. We provide assistance with installing components to self-hosted customers.
Please contact Etlworks support if you want to enable real-time change replication for your account.
The basic setup for real-time CDC-based change replication in the Etlworks Integrator is very easy:
- Set up CDC for the source database.
- Configure Debezium to capture CDC events and publish them to the Kafka topic(s).
- Create a change replication Flow where the source is a Kafka topic and the destination is a target database table.
- Schedule the Flow.
Set up CDC for the source database
Enabling CDC is different for each database. Please use the following tutorials:
Configure Debezium to capture CDC events and publish them to the Kafka topic(s)
Assuming that Debezium is already installed as a Kafka Connect plugin and is up and running, we will configure a connector to the source database using the Kafka Connect REST API. In this tutorial, we will be using Microsoft SQL Server. Configuring connectors to other databases is equally simple.
The Debezium connectors are created using the Kafka Connect REST API, so make sure either curl or Postman is installed on your development box. In this tutorial, we will be using Postman.
Step 1. Verify that Kafka Connect is installed and running.
The default port for the Kafka Connect API is 8083. Assuming that it runs on localhost, the URL for the API endpoint which returns the configured connectors is http://localhost:8083/connectors.
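If you prefer a script over Postman, the same check can be sketched in Python using only the standard library. The function names `connectors_url` and `list_connectors` are illustrative, and the host and port defaults simply mirror the assumptions above.

```python
import json
from urllib.request import urlopen


def connectors_url(host: str = "localhost", port: int = 8083) -> str:
    """Build the URL of the Kafka Connect endpoint that lists connectors."""
    return f"http://{host}:{port}/connectors"


def list_connectors(host: str = "localhost", port: int = 8083, timeout: float = 5.0) -> list:
    """Return the names of the configured connectors.

    Raises urllib.error.URLError if Kafka Connect is not reachable,
    which is itself a useful signal that the service is not running.
    """
    with urlopen(connectors_url(host, port), timeout=timeout) as response:
        return json.load(response)
```

Calling `list_connectors()` against a fresh Kafka Connect installation should return an empty list, since no connectors have been created yet.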
Step 2. Create a new connector for Microsoft SQL Server.
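As a rough sketch of this step, the connector can also be created from Python by sending a POST request to the Kafka Connect endpoint http://localhost:8083/connectors. The property names below are an assumption: they follow the older Debezium SQL Server connector releases that use `table.whitelist` (newer releases rename several of them), and every argument value, as well as the `dbhistory.` topic prefix, is a placeholder you would replace with your own.

```python
import json
from urllib.request import Request, urlopen


def build_sqlserver_connector(name, host, port, user, password,
                              dbname, server_name, tables, kafka_bootstrap):
    """Build a connector payload (assumed property names, older Debezium style)."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
            "database.hostname": host,
            "database.port": str(port),
            "database.user": user,
            "database.password": password,
            "database.dbname": dbname,
            # Logical server name; Debezium uses it as the topic prefix.
            "database.server.name": server_name,
            # Comma-separated list of fully qualified table names.
            "table.whitelist": ",".join(tables),
            "database.history.kafka.bootstrap.servers": kafka_bootstrap,
            # Placeholder topic name for the schema history.
            "database.history.kafka.topic": f"dbhistory.{server_name}",
        },
    }


def create_connector(payload, url="http://localhost:8083/connectors"):
    """POST the payload to Kafka Connect and return the parsed JSON response."""
    request = Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urlopen(request) as response:
        return json.load(response)
```

Keeping payload construction separate from the HTTP call makes it easy to inspect or version the configuration before sending it.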
ENDPOINT URL: http://localhost:8083/connectors
"table.whitelist": "comma separated list of fully_qualified_table_names",
CDC Event Flattening
The payload contains two important sections required for integrating the Etlworks Kafka connector with Debezium:
1. The unwrap transformation, which creates a flat version of the CDC event:
2. The settings which remove the schema information from the CDC event:
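The two sections can be sketched as connector properties merged into an existing configuration. This is an assumption-laden sketch: it uses the `ExtractNewRecordState` transformation class found in recent Debezium releases (older releases ship a differently named class) and the standard Kafka Connect JSON converter settings. The helper name `with_flattening` is hypothetical.

```python
# Flattening: replace the Debezium envelope with the plain row state.
FLATTEN_SETTINGS = {
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
}

# Schema removal: emit plain JSON without the embedded schema block.
SCHEMALESS_JSON_SETTINGS = {
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
}


def with_flattening(config: dict) -> dict:
    """Return a copy of the connector config with both sections applied."""
    return {**config, **FLATTEN_SETTINGS, **SCHEMALESS_JSON_SETTINGS}
```

With both sections in place, each message value is a flat JSON object of column names and values, which is the shape a downstream parser can consume without unpacking an envelope.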
The naming convention for Kafka topics
Debezium stores CDC events in a separate topic for each table. If the connector was configured using the following parameters:
The CDC events for the table cdc_test will be stored in a Kafka topic
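As a small illustration of the naming convention, the sketch below assumes Debezium's usual behavior of prefixing the fully qualified table name with the logical server name configured for the connector. The function name `topic_for` and the sample values are hypothetical.

```python
def topic_for(server_name: str, qualified_table: str) -> str:
    """Build the per-table topic name: <logical server name>.<qualified table name>."""
    return f"{server_name}.{qualified_table}"


# Hypothetical values, for illustration only.
example_topic = topic_for("myserver", "dbo.cdc_test")
```

Because every captured table gets its own topic, a flow that replicates several tables needs one topic name per table.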
Examples of the generated CDC events
The Etlworks Integrator automatically parses CDC events stored in a Kafka topic, so you don't have to deal with this yourself, but it is still a good idea to learn what the generated events look like for the different DML statements.
Assuming that we are using a JSON Format for serialization of the CDC events (default) and assuming that the source table was created using the following SQL:
CREATE TABLE dbo.cdc_test(id INT,
name VARCHAR(255), -- column type assumed
changed DATETIME, -- column type assumed
PRIMARY KEY (id))
The generated CDC events will look like the following:
INSERT INTO dbo.cdc_test (id,name,changed) values (1,'test1',CURRENT_TIMESTAMP)
UPDATE dbo.cdc_test SET name = 'updated-test1-1', changed=CURRENT_TIMESTAMP
WHERE id = 1
DELETE FROM dbo.cdc_test WHERE id = 1
Configure serialization Format for DATE/TIME fields
As you probably noticed, the value of the timestamp field changed is generated as Unix epoch time in milliseconds. You can convert it to a human (and database) readable Format using the TimestampConverter transformation:
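A hedged sketch of what this could look like: the helper below appends Kafka Connect's `TimestampConverter` transformation to an existing connector configuration, and a second function shows the equivalent conversion in plain Python. The alias `ts`, the helper names, and the chosen date pattern are assumptions for illustration.

```python
from datetime import datetime, timezone


def add_timestamp_converter(config: dict, field: str,
                            fmt: str = "yyyy-MM-dd HH:mm:ss",
                            alias: str = "ts") -> dict:
    """Return a copy of config with a TimestampConverter appended to the chain."""
    # Preserve any transformations that are already configured (e.g. unwrap).
    chain = [t for t in config.get("transforms", "").split(",") if t]
    chain.append(alias)
    return {
        **config,
        "transforms": ",".join(chain),
        f"transforms.{alias}.type":
            "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        f"transforms.{alias}.field": field,
        f"transforms.{alias}.target.type": "string",
        f"transforms.{alias}.format": fmt,  # Java SimpleDateFormat pattern
    }


def epoch_millis_to_text(millis: int) -> str:
    """Convert Unix epoch milliseconds to a readable UTC timestamp string."""
    moment = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
```

For example, `epoch_millis_to_text(1600000000000)` returns `2020-09-13 12:26:40`, the UTC rendering of that epoch value.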
Create a change replication Flow
Step 1. Create a source Kafka Connection. When creating the Connection, select Debezium as the CDC provider. Enter a wildcard topic name in the Format
Step 2. Create a JSON Format with all default settings.
Step 3. Create a Connection to the destination database. Disable the
Step 4. Create a new Flow by selecting queue to database from the gallery.
Step 5. Add a new source-to-destination transformation where:
- FROM Connection is the Connection to Kafka created in step 1.
- FROM Format is the JSON Format created in step 2.
- FROM is a topic name for the specific table.
- TO is a destination table name.
- TO Connection is the destination database Connection created in step 3.
Step 6. Click Mappings, select the Parameters tab, and configure the following:
Stream Data: enabled
Use Bind Variables: enabled
Create Tables and Modify DML to include only fields which exist in the destination: enabled
Lookup Fields: the unique field(s) for the record. In our example the unique field is
Step 7. If the foreign constraints are disabled (or do not exist) in the destination database, you can enable the processing of each transformation in a
Step 8. Add the source-to-destination transformations for all tables with enabled CDC. You can duplicate the existing transformation and change the topic name (the Format is server_name.database_name.table_name), the destination table name, and the