Overview
Change data capture (CDC) is an approach to data integration based on identifying, capturing, and delivering the changes made to the source database, as recorded in the database redo log (also called the transaction log).
The changes are captured without making application-level changes and without having to scan transactional tables, which makes CDC a perfect solution for non-intrusive, low-impact real-time data integration.
Etlworks supports native log-based change data capture for PostgreSQL, SQL Server, MySQL, Oracle, DB2, and MongoDB.
We use a heavily modified embedded Debezium engine for CDC.
Read about other change replication techniques available in Etlworks Integrator.
Enable change data capture for the source database
Enabling CDC is different for each database. Please use the following tutorials:
CDC pipeline
The end-to-end CDC pipeline extracts data from a source CDC-enabled database and loads it into any supported destination:
- another relational database,
- a cloud data warehouse such as Snowflake, Redshift, or Azure Synapse,
- a web service,
- a data lake,
- a file storage system.
In Etlworks, the end-to-end pipeline typically includes two flows:
- Extract flow - this flow extracts data from a CDC-enabled database
- Load flow - this flow loads data into any supported destination
The extract and load flows run in parallel, which guarantees high processing speed and low latency. The pipeline can also include multiple independent extract and load flows, which allows it to scale horizontally across multiple processing nodes.
The Extract
This flow extracts data from a CDC-enabled database.
There are two types of CDC extract flows in Etlworks:
Stream CDC events, create files
In this scenario, the CDC events (INSERTS, UPDATES, and DELETES) are extracted from the transaction log of the source database and stored as CSV files in the local or cloud storage.
This flow type does not require any additional elements of the infrastructure, such as a message queue. It is also the fastest type of CDC supported by Etlworks.
A separate load flow reads the files, then transforms and loads the data into the destination. The load flow must delete the processed files.
Creating a flow that streams CDC events and creates files in the local or cloud storage
There is no need to create a separate flow for the initial load. The first time the flow connects to a CDC-enabled source database, it reads a consistent snapshot of all of the whitelisted databases. When that snapshot is complete, the flow continuously reads the changes that were committed to the transaction log and generates the corresponding insert, update, and delete events.
Step 1. Create a CDC connection for the source database. Configure the following parameters:
- Enable Serialize CDC events as CSV files
- Include Database(s) - the comma-separated list of database(s) to poll CDC events from.
- Include Table(s) - the comma-separated list of fully qualified table names or regular expressions to match the table names. You can override this in the FROM attribute of the source-to-destination transformation. If you are planning to poll data from tens or hundreds of tables, consider using regular expressions or tokenizing the Include Table(s) or FROM.
Step 2. Create a new flow using the flow type Stream CDC events, create files.
Step 3. Add a single source-to-destination transformation and set the following parameters:
- Source connection - CDC connection created in step 1
- FROM - the following options are available:
- * - the system will get the list of the whitelisted tables from the CDC connection, or
- a regular expression to match the table names, or
- a comma-separated list of tables to poll data from, or
- {token}
Example with a comma-separated list of whitelisted tables:
- CDC key -
[db]_[table]_cdc_stream
- Include Table(s) - test.inventory,test.payment,test.customer
- FROM - *
- The following files will be created in the local storage:
- {app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
Example with a regular expression:
- CDC key -
[db]_[table]_cdc_stream
- Include Table(s) - ^[t][e][s][t][.][aA][a-zA-Z\d_\s]+
- FROM - *
- The following files will be created in the local storage:
- {app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
{app.data}/debezium_data is the default root location for the CDC events serialized as CSV files. You can override it by changing the value of the Location of the CDC events property under CDC connection->Storage.
Step 4. Schedule the extract flow. We recommend using a continuous run schedule type. The idea is that the extract flow runs until there are no CDC events to poll, stops for the configured number of seconds, then starts again.
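To picture the continuous run behavior, here is a minimal sketch of the scheduling loop (an illustration only, not product code; the function names and the idle interval are hypothetical):
async function continuousRun(runExtractFlow, idleSeconds) {
    // runExtractFlow stands in for the scheduled extract flow
    for (;;) {
        await runExtractFlow(); // runs until there are no CDC events left to poll
        // stop for the configured number of seconds, then start again
        await new Promise(function (resolve) { setTimeout(resolve, idleSeconds * 1000); });
    }
}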
Stream CDC events to a message queue
In this scenario, the CDC events (INSERTS, UPDATES, and DELETES) are extracted from the transaction log of the source database and sent to a message queue, such as Kafka or Azure Event Hubs.
Message queues such as Kafka and Azure Event Hubs support transactional polling, which makes the entire process very reliable and highly scalable. The only downside is the requirement to have a message queue, which is a separate element of the infrastructure not managed by Etlworks. Here is a list of fully managed message queue services compatible with the Kafka consumer and producer APIs:
- Azure Event Hubs - read how to create a connection for Azure Event Hub.
- Confluent Platform
- CloudKafka - the only service with a free forever tier. The free tier is very limited but can be used for testing.
- Aiven for Apache Kafka
- AWS managed Kafka (MSK) - note that due to the limitation of the MSK, it is only possible to connect to MSK endpoints from the same VPC. Read more here.
The separate load flow polls the events from the topics in the message queue, then transforms and loads data into the destination.
Creating a flow that streams CDC events to a message queue
There is no need to create a separate flow for the initial load. The first time the flow connects to a CDC-enabled source database, it reads a consistent snapshot of all of the whitelisted databases. When that snapshot is complete, the flow continuously reads the changes that were committed to the transaction log and generates the corresponding insert, update, and delete events.
Step 1. Create a CDC connection for the source database. Configure the following parameters:
- Include Database(s) - the comma-separated list of database(s) to poll CDC events from.
- Include Table(s) - the comma-separated list of fully qualified table names or regular expressions to match the table names. You can override this in the FROM attribute of the source-to-destination transformation. If you are planning to poll data from tens or hundreds of tables, consider using regular expressions or tokenizing the Include Table(s) or FROM.
Step 2. Create a Kafka or Azure Event Hub connection.
Step 3. Create a JSON (recommended) or Avro format.
Step 4. Start creating a flow by selecting Stream CDC events to message queue from the gallery.
Step 5. Add a single source-to-destination transformation where:
- Source connection - CDC connection created in step 1
- FROM - the following options are available:
- * - the system will get the list of the whitelisted tables from the CDC connection, or
- a regular expression to match the table names, or
- a comma-separated list of tables to poll data from, or
- {token}
- Destination connection - Kafka or Azure Event Hub connection created in step 2
- Format - the JSON or Avro format created in step 3
- TO - any name, for example, topic
Example:
- CDC key -
[db]_[table]_cdc_stream
- Include Table(s) - test.inventory,test.payment,test.customer
- FROM - *
The following topics will be created and the flow will send CDC events to these topics:
- test_inventory_cdc_stream
- test_payment_cdc_stream
- test_customer_cdc_stream
Step 6. Configure mapping and other transformations if needed.
Step 7. Schedule the extract flow. We recommend using a continuous run schedule type. The idea is that the extract flow runs until there are no CDC events to poll, stops for the configured number of seconds, then starts again.
The Load
This flow loads data files created by the extract flow into any supported destination.
Just like with the extract flow, there are two types of load flows:
- Flows that load data from files
- Flows that load data from a message queue
Load data from files
Load data from CSV files when the destination is Snowflake
This flow reads CDC events from the CSV files in local or cloud storage, then transforms and loads data into Snowflake.
The CSV files staged by the extract flow are processed from that location and deleted after processing.
Step 1. Create a Server storage connection for the staged CSV files. The default location is {app.data}/debezium_data/events.
Step 2. Create a CSV format with all default settings. Enable the Save Metadata parameter.
Step 3. Create all connections and formats required for Snowflake flows.
Step 4. Create a new flow using the flow type file to snowflake.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are distinct filename patterns to poll data from.
For example, if the following files are created by the extract flow:
- {app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
you will need 3 pairs of source-to-destination transformations, where the sources (FROM) will need to be set to the following wildcard file names:
- test_inventory_cdc_stream_*.csv
- test_payment_cdc_stream_*.csv
- test_customer_cdc_stream_*.csv
Alternatively, if you want to process all files without creating separate pairs of source-to-destination transformations:
- Set the FROM to the wildcard file name that matches all files to load, for example *_*_cdc_stream_*.csv
- Set the TO to schema.*, for example public.*
- Set Calculate Destination Object Name under MAPPING->Parameters to
var start = name.indexOf('-');
var end = name.indexOf('_cdc_stream');
value = name.substring(start + 1, end);
You can also use the regular expression _(.*?)_cdc_stream to match the table names (see the illustration after this list).
Read more about configuring TO when processing files by a wildcard name.
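For illustration, here is a standalone sketch (not the Calculate Destination Object Name script itself) of what the suggested regular expression extracts from one of the staged file names above, assuming the calculated value replaces the * in TO:
var name = 'test_inventory_cdc_stream_uuid.csv'; // sample staged file name from the example above
var match = name.match(/_(.*?)_cdc_stream/);     // the lazy group captures the table name
var value = match ? match[1] : name;             // 'inventory'; with TO set to public.*, the destination resolves to public.inventory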
For each transformation set the following parameters:
- Source connection - the connection created in step 1.
- Source format - the CSV format created in step 2.
- FROM - a wildcard filename as described above.
- Destination connection - a connection for the Snowflake stage (internal or external)
- Destination format - CSV format created in step 2
- TO - a destination table name or a wildcard table name
Step 6. If you are not planning to transform the data before loading it into Snowflake, enable Ignore Transformations under Parameters. This will have a positive impact on performance.
Step 7. Optionally, configure the number of files to process in one batch. This is useful if you expect the extract flow to create a large number of files (many thousands) in a short period of time and you don't want the load flow to be overwhelmed. Processing a smaller number of files (hundreds) in micro-batches is more efficient.
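For example (hypothetical numbers): if the extract flow stages 5,000 files during a burst of activity and the batch size is set to 500, the load flow will drain the backlog over 10 consecutive micro-batch runs instead of attempting to process all 5,000 files in a single run.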
Step 8. Enable Delete loaded source files under Parameters.
Step 9. All other parameters are the same as in the regular Snowflake flow, except the Action, which can be set to one of the following:
- CDC MERGE - the system will use the event type stored in the record: "c" for INSERT, "u" for UPDATE, and "d" for DELETE. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully qualified table names and ';' as a separator between table=fields pairs (see the example after this list).
- COPY INTO - in some cases, it makes sense to always insert unmodified CDC events into the staging table. Then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
- debezium_cdc_op - event type, "c" for create, "u" for update and "d" for delete.
- debezium_cdc_timestamp - the event timestamp.
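Example of a Lookup Fields value in the table=fields format described above (the table and column names are illustrative):
test.inventory=inventory_id;test.payment=payment_id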
Step 10. Schedule the load flow to run in parallel with the extract flow. Just like with the extract flow, we recommend using a continuous run schedule type.
Load data from CSV files when the destination is Amazon Redshift
This flow reads CDC events from the CSV files in local or cloud storage, then transforms and loads data into Amazon Redshift.
The CSV files staged by the extract flow are processed from that location and deleted after processing.
Step 1. Create a Server storage connection for the staged CSV files. The default location is {app.data}/debezium_data/events.
Step 2. Create a CSV format. Enable the Save Metadata parameter.
Step 3. Create all connections and formats required for Redshift flows.
Step 4. Create a new flow using the flow type file to redshift.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are distinct filename patterns to poll data from.
For example, if the following files are created by the extract flow:
- {app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
you will need 3 pairs of source-to-destination transformations, where the sources (FROM) will need to be set to the following wildcard file names:
- test_inventory_cdc_stream_*.csv
- test_payment_cdc_stream_*.csv
- test_customer_cdc_stream_*.csv
Alternatively, if you want to process all files without creating separate pairs of source-to-destination transformations:
- Set the FROM to the wildcard file name that matches all files to load, for example *_*_cdc_stream_*.csv
- Set the TO to schema.*, for example public.*
- Set Calculate Destination Object Name under MAPPING->Parameters to
var start = name.indexOf('-');
var end = name.indexOf('_cdc_stream');
value = name.substring(start + 1, end);
You can also use the regular expression _(.*?)_cdc_stream to match the table names.
Read more about configuring TO when processing files by a wildcard name.
For each transformation set the following parameters:
- Source connection - connection created in step 1.
- Source format - the CSV format created in step 2.
- FROM - a wildcard filename as described above.
- Destination connection - a connection for the Redshift stage.
- Destination format - CSV format created in step 2
- TO - a destination table name or a wildcard table name
Step 6. If you are not planning to transform the data before loading it into Redshift, enable Ignore Transformations under Parameters. This will have a positive impact on performance.
Step 7. Optionally, configure the number of files to process in one batch. This is useful if you expect the extract flow to create a large number of files (many thousands) in a short period of time and you don't want the load flow to be overwhelmed. Processing a smaller number of files (hundreds) in micro-batches is more efficient.
Step 8. Enable Delete loaded source files under Parameters.
Step 9. All other parameters are the same as in the regular Redshift flow, except the Action, which can be set to one of the following:
- CDC MERGE - the system will use the event type stored in the record: "c" for INSERT, "u" for UPDATE, and "d" for DELETE. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully qualified table names and ';' as a separator between table=fields pairs.
- COPY - in some cases, it makes sense to always insert unmodified CDC events into the staging table. Then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
- debezium_cdc_op - event type, "c" for create, "u" for update and "d" for delete.
- debezium_cdc_timestamp - the event timestamp.
Step 10. Schedule the load flow to run in parallel with the extract flow. Just like with the extract flow, we recommend using a continuous run schedule type.
Load data from CSV files when the destination is Azure Synapse Analytics
This flow reads CDC events from the CSV files in local or cloud storage, then transforms and loads data into Azure Synapse Analytics.
The CSV files staged by the extract flow are processed from that location and deleted after processing.
Step 1. Create a Server storage connection for the staged CSV files. The default location is {app.data}/debezium_data/events.
Step 2. Create a CSV format with all default settings. Enable the Save Metadata parameter.
Step 3. Create all connections required for Synapse Analytics flows.
Step 4. Create a new flow using the flow type file to azure synapse.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are distinct filename patterns to poll data from.
For example, if the following files are created by the extract flow:
- {app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
you will need 3 pairs of source-to-destination transformations, where the sources (FROM) will need to be set to the following wildcard file names:
- test_inventory_cdc_stream_*.csv
- test_payment_cdc_stream_*.csv
- test_customer_cdc_stream_*.csv
Alternatively, if you want to process all files without creating separate pairs of source-to-destination transformations:
- Set the FROM to the wildcard file name that matches all files to load, for example *_*_cdc_stream_*.csv
- Set the TO to schema.*, for example public.*
- Set Calculate Destination Object Name under MAPPING->Parameters to
var start = name.indexOf('-');
var end = name.indexOf('_cdc_stream');
value = name.substring(start + 1, end);
You can also use the regular expression _(.*?)_cdc_stream to match the table names.
Read more about configuring TO when processing files by a wildcard name.
For each transformation set the following parameters:
- Source connection - the connection created in step 1.
- Source format - the CSV format created in step 2.
- FROM - a wildcard filename as described above.
- Destination connection - a connection for the Synapse Analytics stage
- Destination format - CSV format created in step 2
- TO - a destination table name or a wildcard table name
Step 6. If you are not planning to transform the data before loading it into Synapse Analytics, enable Ignore Transformations under Parameters. This will have a positive impact on performance.
Step 7. Optionally, configure the number of files to process in one batch. This is useful if you expect the extract flow to create a large number of files (many thousands) in a short period of time and you don't want the load flow to be overwhelmed. Processing a smaller number of files (hundreds) in micro-batches is more efficient.
Step 8. Enable Delete loaded source files under Parameters.
Step 9. All other parameters are the same as in the regular Synapse Analytics flow, except the Action, which can be set to one of the following:
- CDC MERGE - the system will use the event type stored in the record: "c" for INSERT, "u" for UPDATE, and "d" for DELETE. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully qualified table names and ';' as a separator between table=fields pairs.
- COPY INTO - in some cases, it makes sense to always insert unmodified CDC events into the staging table. Then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
- debezium_cdc_op - event type, "c" for create, "u" for update and "d" for delete.
- debezium_cdc_timestamp - the event timestamp.
Step 10. Schedule the load flow to run in parallel with the extract flow. Just like with the extract flow, we recommend using a continuous run schedule type.
Load data from CSV files when the destination is a relational database
This flow reads CDC events from the CSV files in local or cloud storage, then transforms and loads data into the destination database.
The CSV files staged by the extract flow are processed and then deleted after processing.
Step 1. Create a Server storage connection for the staged CSV files. The default location is {app.data}/debezium_data/events.
Step 2. Create a CSV format. Enable the Save Metadata parameter.
Step 3. Create a connection to the destination database.
Step 4. Create a new flow using the flow type file to database (assuming that the destination is a database).
Step 5. Add as many source-to-destination transformations as there are distinct filename patterns to poll data from.
For example, if the following files are created by the extract flow:
- {app.data}/debezium_data/events/test_inventory_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_payment_cdc_stream_uuid.csv
- {app.data}/debezium_data/events/test_customer_cdc_stream_uuid.csv
you will need 3 pairs of source-to-destination transformations, where the sources (FROM) will need to be set to the following wildcard file names:
- test_inventory_cdc_stream_*.csv
- test_payment_cdc_stream_*.csv
- test_customer_cdc_stream_*.csv
Alternatively, if you want to process all files without creating separate pairs of source-to-destination transformations:
- Set the FROM to the wildcard file name that matches all files to load, for example *_*_cdc_stream_*.csv
- Set the TO to schema.*, for example public.*
- Set Calculate Destination Object Name under MAPPING->Parameters to
var start = name.indexOf('-');
var end = name.indexOf('_cdc_stream');
value = name.substring(start + 1, end);
You can also use the regular expression _(.*?)_cdc_stream to match the table names.
Read more about configuring TO when processing files by a wildcard name.
Step 6. For each transformation set the following parameters:
- Source connection - the connection created in step 1.
- Source format - the CSV format created in step 2.
- FROM - a wildcard filename as described above.
- Destination connection - a destination database connection created in step 3.
- TO - a destination table name or a wildcard table name as described above
- Enable Delete loaded source files
- Set the Action under MAPPING/Parameters - the action defines the SQL command to generate. Set the Action to one of the following:
- Record - the system will automatically generate an INSERT, UPDATE, or DELETE SQL statement based on the value of the debezium_cdc_op field (c, u, d) populated during the extract from the CDC-enabled database.
- Record with MERGE - same as Record, except the system will generate a MERGE SQL statement for inserts and updates. Note that not all databases support native MERGE; also, some databases, for example MySQL and PostgreSQL, require a unique index on the fields that are used to uniquely identify the record.
- Record with IfExist - same as Record with MERGE, except it uses a SELECT statement to check if the record exists and then conditionally generates either an INSERT or an UPDATE.
- Set the Lookup Fields under MAPPING/Parameters - a comma-separated list of fields that uniquely identify the record. Alternatively, you can enable Predict Lookup Fields, which forces the flow to use various algorithms to automatically predict the fields that uniquely identify the record. Note that it is not always possible to correctly detect the unique fields. If neither works, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully qualified table names and ';' as a separator between table=fields pairs.
Step 7. Optionally, configure the number of files to process in one batch. This is useful if you expect the extract flow to create a large number of files (many thousands) in a short period of time and you don't want the load flow to be overwhelmed. Processing a smaller number of files (hundreds) in micro-batches is more efficient.
Step 8. Schedule the load flow to run in parallel with the extract flow. Just like with the extract flow, we recommend using a continuous run schedule type.
Load data from a message queue
Load data from a message queue when the destination is Snowflake
This flow reads CDC events from the topics in the message queue, then transforms and loads data into Snowflake.
Step 1. Create a Kafka or Azure Event Hub connection. You can reuse the same connection that you created for the extract flow.
You must enter a unique name in the Group ID field.
Also, consider changing the default values for the following parameters:
- Number of retries before stop polling - the number of retries before the flow stops polling if a poll returns no records. By increasing this number, you can give the flow more time to read the events from the queue.
- Max number of records to poll - The maximum number of records to poll in one call. By changing this number, you can tune the flow for maximum performance or minimum RAM consumption.
- Max number of records to read - The total maximum number of records to read from the queue. Set it to a reasonable number to allow the system to process records in micro-batches. If nothing is entered, the system will read records from the queue until there are no more records. By changing this number, you are either increasing or decreasing the size of the micro-batches.
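For example (hypothetical numbers): with Max number of records to poll set to 1,000 and Max number of records to read set to 10,000, each run of the load flow processes a micro-batch of at most 10,000 records, fetched from the queue in polls of up to 1,000 records each.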
Step 2. Create all required connections and formats.
Step 3. Start creating a flow by typing in queue to snowflake.
Step 4. Select Queue to Snowflake.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are topics in the queue to poll data from. Most likely, you are going to have one topic per source table.
For each transformation set the following parameters:
- Source connection - queue connection created in step 1.
- Source format - a format used in the extract flow, most likely JSON.
- FROM - a topic name to poll data from.
- Destination connection - a connection for the Snowflake stage.
- Destination format - CSV format
- TO - a destination table name
Step 6. All parameters are the same as in the regular Snowflake flow, except the Action, which can be set to one of the following:
- CDC MERGE - the system will use the event type stored in the record: "c" for INSERT, "u" for UPDATE, and "d" for DELETE. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully qualified table names and ';' as a separator between table=fields pairs.
- COPY INTO - in some cases, it makes sense to always insert unmodified CDC events into the staging table. Then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
- debezium_cdc_op - event type, "c" for create, "u" for update and "d" for delete.
- debezium_cdc_timestamp - the event timestamp.
Step 7. Schedule the load flow to run in parallel with the extract flow. Just like with the extract flow, we recommend using a continuous run schedule type.
Load data from a message queue when the destination is Amazon Redshift
This flow reads CDC events from the topics in the message queue, then transforms and loads data into Amazon Redshift.
Step 1. Create a Kafka or Azure Event Hub connection. You can reuse the same connection that you created for the extract flow.
You must enter a unique name in the Group ID field.
Also, consider changing the default values for the following parameters:
- Number of retries before stop polling - the number of retries before the flow stops polling if a poll returns no records. By increasing this number, you can give the flow more time to read the events from the queue.
- Max number of records to poll - The maximum number of records to poll in one call. By changing this number, you can tune the flow for maximum performance or minimum RAM consumption.
- Max number of records to read - The total maximum number of records to read from the queue. Set it to a reasonable number to allow the system to process records in micro-batches. If nothing is entered, the system will read records from the queue until there are no more records. By changing this number you are either increasing or decreasing the size of the micro-batches.
Step 2. Create all required connections and formats.
Step 3. Start creating a flow by typing in queue to redshift.
Step 4. Select Queue to Redshift.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are topics in the queue to poll data from. Most likely, you are going to have one topic per source table.
For each transformation set the following parameters:
- Source connection - queue connection created in step 1.
- Source format - a format used in the extract flow, most likely JSON.
- FROM - a topic name to poll data from.
- Destination connection - a connection for the Redshift stage.
- Destination format - CSV format
- TO - a destination table name
Step 6. All parameters are the same as in the regular Redshift flow, except the Action, which can be set to one of the following:
- CDC MERGE - the system will use the event type stored in the record: "c" for INSERT, "u" for UPDATE, and "d" for DELETE. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully qualified table names and ';' as a separator between table=fields pairs.
- COPY INTO - in some cases, it makes sense to always insert unmodified CDC events into the staging table. Then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
- debezium_cdc_op - event type, "c" for create, "u" for update and "d" for delete.
- debezium_cdc_timestamp - the event timestamp.
Step 7. Schedule the load flow to run in parallel with the extract flow. Just like with the extract flow, we recommend using a continuous run schedule type.
Load data from a message queue when the destination is Synapse Analytics
This flow reads CDC events from the topics in the message queue, then transforms and loads data into Synapse Analytics.
Step 1. Create a Kafka or Azure Event Hub connection. You can reuse the same connection that you created for the extract flow.
You must enter a unique name in the Group ID field.
Also, consider changing the default values for the following parameters:
- Number of retries before stop polling - the number of retries before the flow stops polling if a poll returns no records. By increasing this number, you can give the flow more time to read the events from the queue.
- Max number of records to poll - The maximum number of records to poll in one call. By changing this number, you can tune the flow for maximum performance or minimum RAM consumption.
- Max number of records to read - The total maximum number of records to read from the queue. Set it to a reasonable number to allow the system to process records in micro-batches. If nothing is entered, the system will read records from the queue until there are no more records. By changing this number you are either increasing or decreasing the size of the micro-batches.
Step 2. Create all required connections and formats.
Step 3. Start creating a flow by typing in queue to azure synapse.
Step 4. Select Queue to Azure Synapse Analytics.
Follow the same steps as suggested in this article.
Step 5. Add as many source-to-destination transformations as there are topics in the queue to poll data from. Most likely, you are going to have one topic per source table.
For each transformation set the following parameters:
- Source connection - queue connection created in step 1.
- Source format - a format used in the extract flow, most likely JSON.
- FROM - a topic name to poll data from.
- Destination connection - a connection for the Synapse Analytics stage.
- Destination format - CSV format
- TO - a destination table name
Step 6. All parameters are the same as in the regular Synapse Analytics flow, except the Action, which can be set to one of the following:
- CDC MERGE - the system will use the event type stored in the record: "c" for INSERT, "u" for UPDATE, and "d" for DELETE. This option requires setting the Lookup Fields to the comma-separated list of fields that uniquely identify the record in the target database. Optionally, you can enable the Predict Lookup Fields option. If neither works, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully qualified table names and ';' as a separator between table=fields pairs.
- COPY INTO - in some cases, it makes sense to always insert unmodified CDC events into the staging table. Then load events from the staging table(s) into the final table using SQL. When this option is enabled, the following fields will be automatically added to the staging table:
- debezium_cdc_op - event type, "c" for create, "u" for update and "d" for delete.
- debezium_cdc_timestamp - the event timestamp.
Step 7. Schedule the load flow to run in parallel with the extract flow. Just like with the extract flow, we recommend using a continuous run schedule type.
Load data from a message queue when the destination is a relational database
This flow reads CDC events from the topics in the message queue, then transforms and loads data into the destination database.
Step 1. Create a Kafka or Azure Event Hub connection. You can reuse the same connection that you created for the extract flow.
You must enter a unique name in the Group ID field.
Also, consider changing the default values for the following parameters:
- Number of retries before stop polling - the number of retries before the flow stops polling if a poll returns no records. By increasing this number, you can give the flow more time to read the events from the queue.
- Max number of records to poll - The maximum number of records to poll in one call. By changing this number, you can tune the flow for maximum performance or minimum RAM consumption.
- Max number of records to read - The total maximum number of records to read from the queue. Set it to a reasonable number to allow the system to process records in micro-batches. If nothing is entered, the system will read records from the queue until there are no more records. By changing this number you are either increasing or decreasing the size of the micro-batches.
Step 2. Create a connection to the destination database.
Step 3. Start creating a flow by typing in queue to.
Step 4. Select Queue to the database (assuming that your destination is a relational database).
Step 5. Add as many source-to-destination transformations as there are topics in the queue to poll data from. Most likely, you are going to have one topic per source table.
For each transformation set the following parameters:
- Source connection - queue connection created in step 1.
- Source format - a format used in the extract flow, most likely JSON.
- FROM - a topic name to poll data from.
- Destination connection - a destination database connection created in step 2.
- TO - a destination table name
- Set the Action under MAPPING/Parameters - the action defines the SQL command to execute. Set the Action to one of the following:
- Record - the system will automatically generate an INSERT, UPDATE, or DELETE SQL statement based on the value of the debezium_cdc_op field (c, u, d) populated during the extract from the CDC-enabled database.
- Record with MERGE - same as Record, except the system will generate a MERGE SQL statement for inserts and updates. Note that not all databases support native MERGE; also, some databases, for example MySQL and PostgreSQL, require a unique index on the fields that are used to uniquely identify the record.
- Record with IfExist - same as Record with MERGE, except it uses a SELECT statement to check if the record exists and then conditionally generates either an INSERT or an UPDATE.
- Set the Lookup Fields under MAPPING/Parameters - a comma-separated list of fields that uniquely identify the record.
- Alternatively, you can enable Predict Lookup Fields which, if enabled, forces the flow to use various algorithms to automatically predict the fields that uniquely identify the record. Note that it is not always possible to correctly detect the unique fields.
- When Predict Lookup Fields (which is not always accurate) is not an option, you can specify the list of table=fields pairs in the Lookup Fields. Use the fully qualified table names and ';' as a separator between table=fields pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
Step 6. Schedule the load flow to run in parallel with the extract flow. Just like with the extract flow, we recommend using a continuous run schedule type.
Handling SQL NULL and empty ('') values when loading files into databases
If SQL NULL is not the same as empty ('')
Step 1. When creating a CDC connection, keep Convert null to empty disabled. Note that it is disabled by default.
Step 2. When creating a CSV format for FROM and TO:
- Enable Convert 'null' to null
- Disable Convert empty string to null
- Set Value for null to
'null'
Step 3. When creating a flow to load files into Snowflake, set String used to convert to and from SQL NULL to ('NULL','null').
If SQL NULL is the same as empty ('')
Step 1. When creating a CDC connection, enable Convert null to empty. Note that it is disabled by default.
Step 2. When creating a CSV format for FROM and TO:
- Disable Convert 'null' to null
- Enable Convert empty string to null
- Set Value for null to empty
Step 3. When creating a flow to load files into Snowflake, set String used to convert to and from SQL NULL to ('NULL','null','').
Handling source table DDL changes
If you are using a flow that stores CDC events as CSV files and you expect that the structure of the source table can change while the flow is still streaming data from this table, enable Close CSV file if header has changed when creating the CDC connection.
Preserving the exact data types of the columns in the source table when creating a destination table
Etlworks can automatically create a destination table when loading CSV files. By default, the flow will sample the file and set the data types of the columns in the destination table. This mechanism is not 100% accurate, so the destination table might end up with columns that have different data types compared to the source table.
To prevent this from happening:
Step 1. When creating the CDC connection, enable Save Metadata.
Step 2. When creating a CSV format for FROM and TO:
- Enable Save Metadata
- Keep All fields are strings disabled
In addition, when enabled, this feature allows the CDC load flow to automatically set the Lookup Fields required for the CDC MERGE by extracting the information about the primary keys from the source database. It significantly enhances the accuracy of the Predict Lookup Fields feature, essentially making it reliable at very low cost.
Create all columns as strings (VARCHAR)
Etlworks can automatically create a destination table when loading CSV files. If you wish to create all columns as strings (VARCHAR) then:
Step 1. When creating the CDC connection, disable Save Metadata.
Step 2. When creating a CSV format for FROM and TO:
- Disable Save Metadata
- Enable All fields are strings
Implementing soft deletes
By default, CDC flow creates, updates, and deletes records in the destination table based on the type of the CDC event - 'c' for create, 'u' for update, and 'd' for delete.
If you need to implement a soft delete, add an extra column with the specific function to the CDC connection (using the Extra columns property). The following functions are available:
- cdc_boolean_soft_delete - the boolean column with a user-defined name will be added to the CDC stream. Values returned by the function: true for delete, false for all other events
- cdc_timestamp_soft_delete(yyyy-MM-dd HH:mm:ss.SSS) - the timestamp column with a user-defined name will be added to the CDC stream. Values returned by the function: timestamp for delete, null for all other events
For example, if the Extra columns property is set to deleted_at=cdc_timestamp_soft_delete(yyyy-MM-dd HH:mm:ss.SSS), the column deleted_at will be added to the CDC stream, and for deleted records its value will be set to the current timestamp with millisecond precision.
If any of the functions above are used, the CDC event type is automatically switched to 'u' (update).
Adding columns to the CDC stream
By default, the CDC stream contains all columns in the source table. You can add columns with user-defined names to the stream using the Extra columns property of the CDC connection.
Format of the property (see the example after the list of functions below): column_name[=function[(parameter)]],column_name[=function[(parameter)]]
Available functions:
- cdc_op - the CDC event type: 'c' for create, 'u' for update, and 'd' for delete
- cdc_key - the CDC key in the user-defined format
- cdc_timestamp(yyyy-MM-dd HH:mm:ss.SSS) - the timestamp of the CDC event. It is not guaranteed that this value will be unique for each event in the sequence of events.
- cdc_event_seq - the unique 'number' of the event. This number grows sequentially.
- cdc_boolean_soft_delete - the boolean column: true for delete, false for all other events
- cdc_timestamp_soft_delete(yyyy-MM-dd HH:mm:ss.SSS) - the timestamp column: timestamp for delete, null for all other events
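Example of an Extra columns value combining several of the functions above (the column names are arbitrary):
op=cdc_op,event_seq=cdc_event_seq,deleted_at=cdc_timestamp_soft_delete(yyyy-MM-dd HH:mm:ss.SSS)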
Creating files in cloud storage
By default, the extract flow creates files with CDC events in the local (server) storage. It is possible to configure the CDC connection to create files directly in any of the following cloud storage services:
- Amazon S3
- Azure Storage
- Google Cloud Storage
When files are created in cloud storage, it opens the possibility of using serverless data pipelines for loading data into databases and data warehouses. For example, read about loading data continuously using Snowflake Snowpipe.
The other advantage of storing files directly in the cloud is eliminating the step that copies the files to the cloud in the first place. For example, Amazon Redshift (or any RDS database) can bulk-load files from S3; when the files are already in S3, there is no need for an additional flow that moves them from the local storage to S3. The same is true for Azure databases and data warehouses: Azure Synapse Analytics can bulk-load files from Azure Storage, so creating the files in Azure Storage makes the pipeline much faster by eliminating the step that moves the files from the local storage to Azure Blob.
Read how to configure the storage, other than local (server), when creating a CDC connection.
Creating CSV and JSON files using the specified character encoding
If the source database has tables with data encoded using any character set other than ASCII (for example, UTF-8), you can configure the connector to create files using that specific encoding.
To preserve the encoding when loading files into the databases you will need to set the same character encoding for the CSV format.
If the encoding is not specified or is set to "no encoding", the JSON files are created with UTF-8 encoding.