- Starter
- Business
- Enterprise
- On-Premise
- Add-on
Overview
A Change Data Capture (CDC) pipeline extracts changes in real time from databases that support CDC and loads them into a relational database that supports bulk load.
We use a heavily modified embedded Debezium engine for CDC.
Supported source databases
- MySQL
- SQL Server
- PostgreSQL
- Oracle
- DB2
- MongoDB
- AS400 (IBM i platforms)
When to use this pipeline
Use the pipeline described in this article to extract data from a CDC-enabled database and load it into the Vertica database in real time.
Flows optimized for Vertica
Flow type | When to use |
 | When you need to extract data from any source, transform it, and load it into Vertica. |
Bulk load files into Vertica | When you need to bulk-load files that already exist in local or cloud storage without applying any transformations. The flow automatically generates the COPY command and MERGEs data into the destination. |
Stream CDC events into Vertica (you are here) | When you need to stream updates from a database that supports Change Data Capture (CDC) into Vertica in real time. |
Stream messages from a queue into Vertica | When you need to stream messages from a message queue that supports streaming into Vertica in real time. |
How it works
The end-to-end CDC pipeline extracts data from a CDC-enabled database and loads it into Vertica.
There are three options when creating a pipeline:
Pipeline | Description | Supported operations |
1. Stream CDC events directly from the source database into Vertica | This is a standard "CDC to any database" flow that supports any relational database as a destination, including Vertica. Unlike some other analytical databases, such as Snowflake and Redshift, Vertica supports fast INSERT without using the bulk-load COPY command. | INSERT |
2. A single flow that streams CDC events directly into the destination database | The flow streams CDC events into the designated stage in local or cloud storage in real time and periodically (as often as every second) loads the data into the destination database in parallel with the stream. The advantage of the pipeline with a single flow is simplicity: there is just one flow to configure, schedule, and monitor. The disadvantage is that if the streaming fails, the load also fails, and vice versa. Note that the flow is fault tolerant: after the issue is fixed, the stream and load resume from the last successful checkpoint. | INSERT and MERGE |
3. A pipeline with independent Extract and Load flows | The extract and load flows run in parallel, which guarantees a very high processing speed and low latency. The actual pipeline can include multiple independent extract and load flows, which allows it to scale horizontally across multiple processing nodes. | INSERT and MERGE |
Prerequisites
1. CDC is enabled for the source database.
2. The Vertica database is up and running and accessible from the Etlworks instance. Read how to work with on-premise data in Etlworks.
A pipeline with a single flow
Create and schedule CDC flow with bulk load
The CDC flow streams CDC events into the designated stage in local or cloud storage in real time and periodically (as often as every second) loads the data into the Vertica database in parallel with the stream.
There is no need to create a separate Flow for the initial load. The first time it connects to a CDC-enabled source database, it reads a consistent snapshot of all of the included databases and tables. When that snapshot is complete, the Flow continuously reads the changes that were committed to the transaction log and generates the corresponding insert, update, and delete events.
Read more about CDC in Etlworks.
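The snapshot-then-stream behavior described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the `(operation, key, row)` event shape and the `apply_events` helper are hypothetical and much simpler than the events the CDC engine actually produces.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical, simplified event shape: (operation, key, row).
# Real CDC events carry more metadata; this is only an illustration.
Event = Tuple[str, int, dict]


def apply_events(table: Dict[int, dict], events: Iterable[Event]) -> Dict[int, dict]:
    """Apply insert/update/delete events to an in-memory copy of a table."""
    for op, key, row in events:
        if op in ("insert", "update"):
            # Insert the row if it is missing, otherwise overwrite it.
            table[key] = row
        elif op == "delete":
            # A delete for a key that is not present is ignored.
            table.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {op}")
    return table


# The initial snapshot is modeled here as a series of inserts.
snapshot = [("insert", 1, {"name": "Ann"}), ("insert", 2, {"name": "Bob"})]
# Changes read from the transaction log after the snapshot completes.
changes = [
    ("update", 1, {"name": "Anna"}),
    ("delete", 2, {}),
    ("insert", 3, {"name": "Cy"}),
]

state = apply_events({}, snapshot + changes)
print(state)
```

Replaying the snapshot followed by the change events leaves the destination with the same rows as the source, which is why a separate initial-load flow is not needed.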
Step 1. Create a CDC Connection for the source.
Read how to create a CDC connection.
Step 2 (optional). Create connection for staging files in a cloud storage
If you plan to use cloud storage such as AWS S3 or Google Cloud Storage for staging and loading files, create one of the following connections:
- Amazon S3 - for staging files in S3.
- Google Cloud Storage - for staging files in Google Cloud Storage.
This step is not required if the flow loads locally staged files.
Step 3. Create a connection for the Vertica database.
Read about the Vertica connector. It is recommended to enable auto-commit for the destination connection.
Step 4. Create a connection for history and offset files.
Read how to create CDC Offset and History connection.
Etlworks CDC connectors store the history of DDL changes for the monitored database in the history file and the current position in the transaction log in the offset file.
A typical CDC extract flow starts by snapshotting the monitored tables (A) or starts from the oldest known position in the transaction (redo) log (B), then proceeds to stream changes in the source database (C). If the Flow is stopped and restarted, it resumes from the last recorded position in the transaction log. The connection created in this step can be used to reset the CDC pipeline and restart the process from scratch.
The connection, by default, points to the directory {app.data}/debezium_data.
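The role of the offset file can be illustrated with a small Python sketch. Everything here is an assumption made for illustration: the JSON layout, the `offsets.json` file name, and the `next_start` and `save_position` helpers are hypothetical and do not reflect the actual offset file format. A reset is modeled simply as deleting the file.

```python
import json
import tempfile
from pathlib import Path


def next_start(offset_file: Path) -> str:
    """Decide where a (hypothetical) extract run should start."""
    if not offset_file.exists():
        # No recorded position: start with a snapshot of the monitored tables.
        return "snapshot"
    position = json.loads(offset_file.read_text())["position"]
    # A recorded position exists: resume streaming from it.
    return f"resume from {position}"


def save_position(offset_file: Path, position: int) -> None:
    """Record the last processed position in the transaction log."""
    offset_file.write_text(json.dumps({"position": position}))


with tempfile.TemporaryDirectory() as tmp:
    offsets = Path(tmp) / "offsets.json"  # hypothetical file name
    first_run = next_start(offsets)       # nothing recorded yet
    save_position(offsets, 1042)
    second_run = next_start(offsets)      # restart after a stop
    offsets.unlink()                      # models resetting the pipeline
    after_reset = next_start(offsets)

print(first_run, "|", second_run, "|", after_reset)
```

The point of the sketch is only the control flow: a missing offset leads to a fresh start, and a recorded offset leads to a resume.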
Step 5. Create CDC flow.
In Flows, click Add flow. Type in cdc in Select Flow type. Select Stream CDC events into Vertica.
Left to right: select the CDC connection created in step 1, select tables to monitor in FROM, select the database connection created in step 3, and select or enter the destination table name in TO. When streaming data from multiple source tables, set the destination table using a wildcard template.
Step 6 (optional). Set connection for staging files in cloud storage.
If you plan to stage files in an AWS S3 bucket or Google Cloud Storage, select the Connections tab, select the connection created in step 2, and select the CSV format.
Step 7. Configure load parameters
Click the MAPPING button and select the Parameters tab.
If needed, modify the following Load parameters:
- Load data into Vertica every (ms): by default, the flow loads data into Vertica every 5 minutes (300000 milliseconds). The load runs in parallel with the CDC stream, which never stops. Decrease this parameter to load data into Vertica more often, or increase it to reduce the number of consumed database credits.
- Wait (ms) to let running load finish when CDC stream stops: by default, the flow loads data into Vertica every 5 minutes. The CDC stream and the load run in parallel, so when streaming stops, the flow executes the load one last time to finish loading the remaining data in the queue. It is possible that the load is still running when the stream stops. Use this parameter to configure how long the flow should wait before executing the load one last time. Clear this parameter to disable the wait. In this case, if the load task is still running, the flow finishes without executing the load one last time and loads the remaining data in the queue on the next run.
- Bulk Load SQL: the Bulk Load SQL is used to load files in the cloud or file storage into the staging tables in the Vertica database. This is a required parameter. By default, we assume that the flow bulk-loads from the local (server) storage using the COPY FROM LOCAL command.
COPY {TABLE} FROM LOCAL '{FULL_FILE_NAME}'
PARSER fcsvparser(header='true') ABORT ON ERROR;
You can modify the SQL statement that will be used to bulk-load data into the destination table. The following {tokens} can be used as a part of the SQL statement:
  - {TABLE}: the table to load data into.
  - {TEMP_TABLE}: the staging table name used for MERGE.
  - {FILE_TO_LOAD}: the file name to load without path and extension.
  - PATH: the path of the file to load without file name, for example {app.data}/test/.
  - EXT: the file extension of the file to load, without the leading dot (.), for example, csv.
- Action: the action can be MERGE (default) or INSERT. If the action is set to MERGE, the flow will INSERT records that do not exist in the destination table, UPDATE existing records, and DELETE records that were deleted in the source table.
- Lookup Fields: the MERGE action requires a list of columns that uniquely identify the record. By default, the flow will attempt to predict the Lookup Fields by checking unique indexes in the source and destination tables, but if there is no unique index in either table, the prediction is not guaranteed to be accurate. Use this parameter to define the Lookup Fields in the following format: fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2.
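To make the Lookup Fields format concrete, here is a minimal Python sketch that splits such a value into a table-to-columns mapping. The `parse_lookup_fields` helper and the table and column names are hypothetical; the sketch illustrates only the `table=field1,field2;...` syntax, not how the flow itself reads the parameter.

```python
from typing import Dict, List


def parse_lookup_fields(value: str) -> Dict[str, List[str]]:
    """Split 'table1=f1,f2;table2=f1' into {'table1': ['f1', 'f2'], ...}."""
    result: Dict[str, List[str]] = {}
    for entry in value.split(";"):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing semicolon
        table, sep, fields = entry.partition("=")
        columns = [f.strip() for f in fields.split(",") if f.strip()]
        if not sep or not table.strip() or not columns:
            raise ValueError(f"malformed lookup entry: {entry!r}")
        result[table.strip()] = columns
    return result


# Hypothetical fully qualified table names and key columns.
lookup = parse_lookup_fields(
    "sales.public.orders=order_id;sales.public.order_items=order_id,line_no"
)
print(lookup)
```

Each table gets its own list of key columns, so a composite key is expressed by listing several comma-separated fields after the `=` sign.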
The other parameters are similar or the same as for the flow type Bulk load files into Vertica.
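As a rough illustration of how the {tokens} in the Bulk Load SQL act as placeholders, the following Python sketch expands the default statement for one file. The `render` helper, the table name, and the file path are assumptions made for this example; the flow's own substitution logic is not shown in this article.

```python
# Default Bulk Load SQL from this article, kept as a template string.
DEFAULT_SQL = (
    "COPY {TABLE} FROM LOCAL '{FULL_FILE_NAME}'\n"
    "PARSER fcsvparser(header='true') ABORT ON ERROR;"
)


def render(template: str, values: dict) -> str:
    """Replace each {TOKEN} with its value, leaving other text untouched."""
    sql = template
    for token, value in values.items():
        # Plain string replacement, so unrelated braces in the SQL are safe.
        sql = sql.replace("{" + token + "}", value)
    return sql


sql = render(
    DEFAULT_SQL,
    {
        "TABLE": "public.orders",  # hypothetical destination table
        "FULL_FILE_NAME": "/data/events/public_orders_cdc_stream_0001.csv",  # hypothetical path
    },
)
print(sql)
```

Plain string replacement is used instead of `str.format` so that a customized statement containing other curly braces does not raise an error.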
Step 8. Schedule CDC flow.
We recommend using a continuous run Schedule type. The idea is that the Flow runs until it is stopped manually, there is an error, or (if configured) there are no more new CDC events for an extended period of time. It restarts automatically after a configurable number of seconds.
Monitor running CDC flow
Read how to monitor running CDC flow.
A pipeline with independent extract and load flows
Create and schedule Extract flow
The CDC extract flow extracts data from a CDC-enabled database and creates CSV files with CDC events in the configured location. These files are loaded into the Vertica database by the load flow.
There is no need to create a separate Flow for the initial load. The first time it connects to a CDC-enabled source database, it reads a consistent snapshot of all of the included databases and tables. When that snapshot is complete, the Flow continuously reads the changes that were committed to the transaction log and generates the corresponding insert, update, and delete events.
Read more about CDC in Etlworks.
Step 1. Create a CDC Connection for the source database.
Read how to create a CDC connection.
Step 2. Create file storage connection for CDC events.
Depending on how you prefer to stage files with CDC events for loading data into the destination database, create one of the following connections:
- The CDC events connection for loading data from the local (server) storage. The connection, by default, points to the directory {app.data}/debezium_data/events.
- S3 connection for loading data from S3.
- Azure storage connection for loading data from Azure storage.
- Google Cloud Storage connection for loading data from Google Cloud Storage.
To improve performance when loading data from cloud storage such as S3, Azure Storage, and Google Cloud Storage, it is recommended that you enable GZip archiving. The bulk load command must support loading from gzipped files.
Step 3. Create a connection for history and offset files.
Read how to create CDC Offset and History connection.
Etlworks CDC connectors store the history of DDL changes for the monitored database in the history file and the current position in the transaction log in the offset file.
A typical CDC extract flow starts by snapshotting the monitored tables (A) or starts from the oldest known position in the transaction (redo) log (B), then proceeds to stream changes in the source database (C). If the Flow is stopped and restarted, it resumes from the last recorded position in the transaction log. The connection created in this step can be used to reset the CDC pipeline and restart the process from scratch.
The connection, by default, points to the directory {app.data}/debezium_data.
Step 4. Create CSV format.
This format is used to create CSV files with CDC events.
Read how to create CSV format.
Enable the following properties:
- Always enclose
- Escape double-quotes
- Save Metadata
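The effect of enclosing every field and escaping embedded double quotes can be seen with Python's standard `csv` module. This is a sketch under an assumption: it treats "Escape double-quotes" as doubling the quote character inside an enclosed field, which is the common CSV convention; the exact output of the Etlworks CSV format may differ.

```python
import csv
import io

buffer = io.StringIO()
# QUOTE_ALL encloses every field; embedded quotes are doubled by default.
writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
writer.writerow(["id", "note"])
writer.writerow([1, 'said "hello", then left'])

csv_text = buffer.getvalue()
print(csv_text)

# Reading the text back recovers the original values, including the comma
# and the quotes inside the second field.
parsed = list(csv.reader(io.StringIO(csv_text)))
print(parsed)
```

Enclosing all fields keeps commas and quotes inside values from being misread as delimiters when the file is bulk-loaded.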
Step 5. Create CDC extract flow.
In Flows, click Add flow. Type in cdc in Select Flow type. Select Stream CDC events, create files.
Left to right: select the CDC connection created in step 1, select tables to monitor in FROM, select the connection created in step 2 to stage files with CDC events, and select the format created in step 4.
You can now execute the flow manually or schedule it to run continuously.
To stop the CDC flow manually, click Stop / Cancel.
Note that, as configured, the CDC flow never stops automatically. This is the recommended configuration. You can configure the CDC Connection to stop when there are no more new CDC events for an extended period of time. Read more.
Step 6. Schedule CDC extract flow.
We recommend using a continuous run Schedule type. The idea is that the extract Flow runs until it is stopped manually, there is an error, or (if configured) there are no more new CDC events for an extended period of time. It restarts automatically after a configurable number of seconds.
Monitor running CDC extract flow
Read how to monitor running CDC extract flow.
Create and schedule Load flow
This Flow is used to bulk load files created by the CDC extract flow into the destination database.
Read more about bulk load flow.
Step 1. Create a database connection for Vertica database.
Read more about the Vertica connector. It is recommended to enable auto-commit for the destination connection.
Step 2. Add new bulk load flow.
In Flows, click [+], type in vertica, and select Bulk load CSV files into Vertica.
Step 3. Configure load transformation.
Select or enter the following attributes of the transformation (left to right):
1. Storage Connection for CDC events created in step 2 of the extract flow.
2. CSV Format created in step 4 of the extract flow.
3. A wildcard filename that matches the names of the files created by the CDC extract flow: *_cdc_stream_*.csv for uncompressed files or *_cdc_stream_*.gz for compressed files.
Note: If the source connection supports default wildcard templates (the parameter Contains CDC events is enabled or the source connection is created using the CDC Events connector), the wildcard filename can be selected in FROM.
4. Vertica Connection created in step 1.
5. The wildcard destination table name.
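The wildcard filenames in item 3 behave like ordinary shell-style patterns. The short Python sketch below checks a few file names against them with the standard `fnmatch` module. The file names are hypothetical examples, not names guaranteed to be produced by the extract flow.

```python
from fnmatch import fnmatch

# Hypothetical file names in the staging location.
files = [
    "public_orders_cdc_stream_0001.csv",
    "public_orders_cdc_stream_0002.gz",
    "orders_export.csv",
]

# Pattern for uncompressed CDC event files.
csv_matches = [f for f in files if fnmatch(f, "*_cdc_stream_*.csv")]
# Pattern for GZip-compressed CDC event files.
gz_matches = [f for f in files if fnmatch(f, "*_cdc_stream_*.gz")]

print(csv_matches)
print(gz_matches)
```

Files that do not contain `_cdc_stream_` in their name, such as the export file in this example, are left alone by both patterns.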
Step 4. Configure how to load data into the destination tables.
1. Click the MAPPING button and select the Parameters tab.
2. Configure Bulk Load SQL. The Bulk Load SQL is used to load files in the cloud or file storage into the staging tables in the Vertica database. This is a required parameter.
By default, we assume that the flow bulk-loads from the local (server) storage using the COPY FROM LOCAL command.
COPY {TABLE} FROM LOCAL '{FULL_FILE_NAME}'
PARSER fcsvparser(header='true') ABORT ON ERROR;
You can modify the SQL statement that will be used to bulk-load data into the destination table. The following {tokens} can be used as a part of the SQL statement:
- {TABLE}: the table to load data into.
- {TEMP_TABLE}: the staging table name used for MERGE.
- {FILE_TO_LOAD}: the file name to load without path and extension.
- PATH: the path of the file to load without file name, for example {app.data}/test/.
- EXT: the file extension of the file to load, without the leading dot (.), for example, csv.
Read about Vertica COPY command options.
3. Optionally configure How to MERGE.
How to MERGE: defines how the flow merges data in the temp or staging table with the data in the actual table. The default is DELETE/INSERT: DELETE all records in the actual table that also exist in the temp table, then INSERT all records from the temp table into the actual table. If this parameter is set to MERGE with separate DELETE, the flow executes a native Vertica MERGE SQL for INSERTs and UPDATEs and a separate DELETE for DELETEs.
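The DELETE/INSERT strategy can be tried out with a small, self-contained Python sketch. It uses an in-memory SQLite database as a stand-in for Vertica, and the `orders` and `orders_temp` tables with an `id` key are hypothetical. It shows only the two-step logic described above, not the SQL the flow actually generates.

```python
import sqlite3

# In-memory SQLite database used as a stand-in for the destination.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT);
    CREATE TABLE orders_temp (id INTEGER PRIMARY KEY, status TEXT);
    INSERT INTO orders VALUES (1, 'new'), (2, 'new');
    INSERT INTO orders_temp VALUES (2, 'shipped'), (3, 'new');
    """
)

# Step 1: DELETE all records in the actual table that also exist in the temp table.
conn.execute("DELETE FROM orders WHERE id IN (SELECT id FROM orders_temp)")
# Step 2: INSERT all records from the temp table into the actual table.
conn.execute("INSERT INTO orders SELECT id, status FROM orders_temp")
conn.commit()

rows = conn.execute("SELECT id, status FROM orders ORDER BY id").fetchall()
print(rows)
conn.close()
```

After both steps, the existing row 2 carries the new status, row 3 is added, and row 1, which was not in the temp table, is unchanged.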
Step 5. Configure flow to automatically handle source schema changes.
The load flow always creates a table in the destination database if it does not exist.
Enable Alter target table if the source has columns that the target table doesn't have to automatically add missing columns to the target table.
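Conceptually, adding missing columns means comparing the source and target column lists and issuing one ALTER TABLE statement per missing column. The Python sketch below shows that comparison. The `missing_column_ddl` helper, the column names, and the data types are hypothetical, and the statements the flow actually runs may look different.

```python
from typing import Dict, List


def missing_column_ddl(
    table: str, source: Dict[str, str], target: Dict[str, str]
) -> List[str]:
    """Build ALTER TABLE statements for columns present only in the source."""
    return [
        f"ALTER TABLE {table} ADD COLUMN {name} {sql_type}"
        for name, sql_type in source.items()
        if name not in target
    ]


# Hypothetical column definitions: column name -> SQL type.
source_columns = {"id": "INTEGER", "status": "VARCHAR(20)", "shipped_at": "TIMESTAMP"}
target_columns = {"id": "INTEGER", "status": "VARCHAR(20)"}

ddl = missing_column_ddl("public.orders", source_columns, target_columns)
print(ddl)
```

Columns that exist only in the target are deliberately ignored here, since the option described above only adds columns and never drops them.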
You can now execute the Load flow manually.
Step 6. Schedule load flow.
Schedule the flow to run as often as needed. These are the options:
- Run flow periodically (as often as once a minute).
- Run flow continuously (as often as once a second).