Overview
This tutorial demonstrates how to implement [near] real-time CDC-based change replication for the most popular databases using the following technologies:
- Native CDC for each source database
- Apache Kafka
- Debezium
- Etlworks Kafka connector with built-in support for Debezium
In this article, we assume that Debezium and Kafka run outside of the Etlworks Integrator infrastructure. Note that the Etlworks Integrator includes a heavily modified Debezium engine built on top of the latest version of Debezium, and we provide built-in CDC Flows optimized for performance and scalability. These Flows do not require any additional infrastructure and are as fast and reliable as Debezium deployed as a Kafka Connect plugin or as a standalone Debezium server.
In most cases, we recommend using our built-in CDC Flows instead. Read more.
The Etlworks Integrator parses the CDC events emitted to the Kafka topic, automatically transforms the events into DML SQL statements (INSERT / UPDATE / DELETE), and executes the SQL statements in the target database in the order in which they were created. It also handles collisions and errors, ensuring that the solution is 100% reliable.
Anything can be a destination: SQL and NoSQL data sources, online data warehouses such as Snowflake and Redshift, files, API endpoints, etc. The following databases are supported as a source:
- Microsoft SQL Server
- MongoDB
- MySQL
- Oracle
- PostgreSQL
Read about other change replication techniques available in the Etlworks Integrator.
Prerequisites
The solution requires installing and configuring Apache Kafka, Kafka Connect, and Debezium.
Installing and configuring Kafka and Kafka Connect (the Kafka component required for CDC) is not part of this tutorial. In most cases, installing Kafka is as easy as downloading the latest version of the standalone or dockerized Kafka and Zookeeper. Kafka Connect is typically included in all distribution packages, so there is nothing to install. Debezium can be installed as a plugin for Kafka Connect by simply copying the required libraries to the KAFKA_HOME/plugin folder.
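As an illustration, a minimal sketch of installing the Debezium SQL Server connector plugin might look like the following (the version, download URL, and plugin folder are assumptions; adjust them to your environment and make sure the folder is listed in the plugin.path property of the Kafka Connect worker configuration):

# Download the Debezium SQL Server connector plugin archive (version is an assumption)
curl -O https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.9.7.Final/debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz
# Extract it into the Kafka Connect plugin folder (path is an assumption)
mkdir -p $KAFKA_HOME/plugin
tar -xzf debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz -C $KAFKA_HOME/plugin
# Restart the Kafka Connect worker so it picks up the new plugin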
For customers on Enterprise plans, Etlworks installs all required components. We provide assistance with installing the components for self-hosted customers.
Please contact Etlworks support if you want to enable real-time change replication for your account.
Solution
The CDC events are serialized as JSON or Avro documents and can be transformed using any of the available Etlworks Integrator transformations.
The basic setup for real-time CDC-based change replication in the Etlworks Integrator is very easy:
- Setup CDC for the source database.
- Configure Debezium to capture CDC events and publish them to the Kafka topic(s).
- Create a change replication Flow where the source is a Kafka topic and the destination is a target database table.
- Schedule the Flow.
Setup CDC for the source database
Enabling CDC is different for each database. Please refer to the vendor-specific tutorials for your source database.
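For reference, here is a minimal sketch of what enabling CDC could look like for the Microsoft SQL Server table used later in this tutorial (the server address, credentials, database, and table names are assumptions; you can run the equivalent statements with your tool of choice):

# Enable CDC at the database level (requires the sysadmin role)
sqlcmd -S localhost -U sa -P password -d database_name -Q "EXEC sys.sp_cdc_enable_db"
# Enable CDC for the dbo.cdc_test table
sqlcmd -S localhost -U sa -P password -d database_name -Q "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'cdc_test', @role_name = NULL"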
Configure Debezium to capture CDC events and publish them to the Kafka topic(s)
Assuming that Debezium is already installed as a Kafka Connect plugin and is up and running, we will configure a connector to the source database using the Kafka Connect REST API. In this tutorial, we will be using Microsoft SQL Server. Configuring connectors for other databases is equally simple.
Official tutorials
The Debezium connectors are created using the Kafka Connect REST API, so make sure that either curl or Postman is installed on your development machine. In this tutorial, we will be using Postman.
Step 1. Verify that Kafka Connect is installed and running.
The default port for the Kafka Connect API is 8083. Assuming that it runs on localhost, the URL of the API endpoint that returns the configured connectors is:
http://localhost:8083/connectors
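For example, you can call this endpoint with curl (assuming Kafka Connect runs on localhost with the default port):

# Returns a JSON array with the names of the configured connectors, e.g. []
curl -s http://localhost:8083/connectors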
Step 2. Create a new connector for Microsoft SQL Server.
ENDPOINT URL: http://localhost:8083/connectors
METHOD: POST
PAYLOAD (example):
{
  "name": "sqlserver-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "password",
    "database.dbname": "database_name",
    "database.server.name": "database_server_name",
    "table.whitelist": "comma separated list of fully_qualified_table_names",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.database_server_name.database_name",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.operation.header": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "include.schema.changes": "false"
  }
}
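If you prefer curl over Postman, the equivalent requests might look like this (assuming the payload above is saved to a file named sqlserver-connector.json; the file name is just an example):

# Create the connector
curl -s -X POST -H "Content-Type: application/json" \
  --data @sqlserver-connector.json \
  http://localhost:8083/connectors

# Verify that the connector and its task are running
curl -s http://localhost:8083/connectors/sqlserver-connector/status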
CDC Event Flattening
The payload contains two important sections required for integrating the Etlworks Kafka connector with Debezium:
1. The unwrap transformation, which creates a flat version of the CDC event:
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.operation.header": "true",
2. The settings which remove the schema information from the CDC event:
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
The naming convention for Kafka topics
Debezium stores CDC events in a separate topic for each table. If the connector was configured using the following parameters:
"database.dbname": "database_name",
"database.server.name": "database_server_name",
The CDC events for the table cdc_test will be stored in the Kafka topic database_server_name.database_name.cdc_test.
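You can verify that the topics have been created using the command-line tools bundled with Kafka (a sketch; the broker address and the Kafka installation path are assumptions, and older Kafka versions use the --zookeeper option instead of --bootstrap-server):

# List all topics; expect one topic per table with enabled CDC,
# e.g. database_server_name.database_name.cdc_test
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list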
Examples of the generated CDC events
The Etlworks Integrator automatically parses the CDC events stored in a Kafka topic, so you don't have to deal with this yourself, but it is still a good idea to learn what exactly the generated events look like for the different DML statements.
Assuming that we are using the JSON Format for serialization of the CDC events (the default) and that the source table was created using the following SQL:
CREATE TABLE dbo.cdc_test(id INT,
NAME VARCHAR(255),
changed DATETIME,
PRIMARY KEY (id))
The generated CDC events will look like the following:
INSERT INTO dbo.cdc_test (id,name,changed) values (1,'test1',CURRENT_TIMESTAMP)
Key: {"id":1}
Value: {"id":1,"name":"test1","changed":1552064244733}
Header: __debezium-operation=c
UPDATE dbo.cdc_test SET name = 'updated-test1-1', changed=CURRENT_TIMESTAMP
WHERE id = 1
Key: {"id":1}
Value: {"id":1,"name":"updated-test1-1","changed":1552064295845}
Header: __debezium-operation=u
DELETE FROM dbo.cdc_test WHERE id = 1
Key: {"id":1}
Value: none
Header: __debezium-operation=DELETE
Configure the serialization format for DATE/TIME fields
As you have probably noticed, the value of the timestamp field changed is generated as Unix epoch time in milliseconds. You can convert it to a human- (and database-) readable format using the TimestampConverter transformation:
"transforms":"unwrap,convert",
"transforms.convert.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert.target.type":"string",
"transforms.convert.field":"changed",
"transforms.convert.format":"yyyy-MM-dd HH:mm:ss"
Create a change replication Flow
Step 1. Create the source Kafka Connection. When creating the Connection, select Debezium as the CDC provider. Enter a wildcard topic name in the format database_server_name.database_name.*.
Step 2. Create a JSON Format with all default settings.
Step 3. Create a Connection to the destination database. Disable the Auto Commit option.
Step 4. Create a new Flow by selecting queue to database from the gallery.
Step 5. Add a new source-to-destination transformation where:
- The FROM Connection is the Connection to Kafka created in step 1.
- The FROM Format is the JSON Format created in step 2.
- The FROM is a topic name for the specific table.
- The TO is a destination table name.
- The TO Connection is the destination database Connection created in step 3.
Step 6. Click Mappings, select the Parameters tab, and configure the following Parameters:
- Stream Data: enabled
- Use Bind Variables: enabled
- Create Tables and Modify DML to include only fields which exist in the destination: enabled
- Action: Record
- Lookup Fields: the unique field(s) for the record. In our example, the unique field is ID.
Step 7. If the foreign key constraints are disabled (or do not exist) in the destination database, you can enable the processing of each transformation in a Parallel thread.
Step 8. Add source-to-destination transformations for all tables with CDC enabled. You can duplicate the existing transformation and change the topic name (the format is server_name.database_name.table_name), the destination table name, and the Lookup Fields.
Schedule the Flow
Schedule the Flow to run continuously or in real-time.