Overview
In this scenario, the messages are streamed from the queue and loaded into the destination in real time. This is different from extracting data from a message queue and loading data in micro-batches.
Advantages of streaming vs. traditional ETL
- Streaming is faster.
- Streaming consumes fewer system resources.
- Streaming is the only way to implement a real-time pipeline.
Here is a complete list of destinations that support streaming from a message queue:
- Server storage, Amazon S3, Azure Blob, Google Cloud Storage, FTP, FTPS, SFTP, Box, Dropbox, Google Drive, OneDrive for Business, SharePoint, WebDAV, SMB Share.
- Relational databases.
- Snowflake, Redshift, Synapse Analytics, Google BigQuery, Vertica, Greenplum, Databricks, ClickHouse.
Advantages of traditional ETL vs. streaming
- Ability to add new columns in mapping.
- Some destinations, for example HTTP endpoints and well-known APIs such as Google Sheets, do not support streaming and require the ETL approach to load data from a message queue.
Here is a complete list of destinations that do not support streaming from a message queue:
- HTTP endpoints.
- Google Sheets.
- Outbound email.
- Redis.
- NoSQL databases.
- Other message queues.
Prerequisites
- The destination must support streaming. Read more.
- The messages in the queue must be stored in JSON format.
Configure connection for streaming
A message queue connection must be configured before it can be used in streaming flows.
Step 1. Create message queue connection.
Step 2. Clear the fields Always stop after N minutes and Max number of records to read so the streaming flow won't stop unless it is stopped manually or there is an error.
Use Cases
Etlworks includes several Flow types optimized for streaming data from message queues to various destinations. To create a new streaming flow, go to Flows, click Add flow, type in stream messages from queue, and select the Flow type for your destination.
Stream messages and create files in local, server, or cloud storage
This flow streams data from one or multiple message queue topics and creates CSV or JSON files in real time in one of the following destinations:
- Amazon S3
- Google Cloud Storage
- Microsoft Azure Storage
- Server Storage
- FTP
- FTPS
- SFTP
- Box
- Dropbox
- Google Drive
- OneDrive for Business
- SharePoint
- WebDAV
- SMB Share
Create a flow that streams messages to file storage
Step 1. Create and configure source connection for message queue as explained here.
Step 2. Create source JSON format.
We assume that the messages in the queue are stored in JSON format.
Step 3. Create a destination connection:
- Server storage - for creating files in the local or mapped network storage.
- Amazon S3 - for creating files in S3.
- Azure storage - for creating files in Azure blob storage.
- Google cloud storage - for creating files in Google cloud storage.
- FTP - for creating files in the FTP server.
- FTPS - for creating files in an FTP server with encryption.
- SFTP - for creating files in the SFTP server.
- Box - for creating files in Box cloud storage.
- Dropbox - for creating files in Dropbox cloud storage.
- Google Drive - for creating files in Google Drive.
- OneDrive for Business - for creating files in OneDrive for Business cloud storage.
- SharePoint - for creating files in SharePoint.
- WebDAV - for creating files in the WebDAV server.
- SMB Share - for creating files in mapped SMB share.
If needed, set Archive file before copying to S3 to either GZip or Zip. It will enable compression for created files. Compression is supported only for cloud destinations (S3, Azure, Google).
Step 4. Create CSV or JSON format depending on which format you want the flow to create files in. You can keep all the default settings.
Step 5. In Flows, select flow type Stream messages from queue, create files.
Step 6. Configure source-to-destination transformation (left to right):
- From Connection: source connection created in step 1.
- From Format: source JSON format created in step 2.
- From: select one or multiple topics to stream data from.
- To Connection: destination file storage connection created in step 3.
- To Format: destination format created in step 4.
- To: the file name.
Setting TO when streaming regular messages stored in the queue
The TO in the streaming transformation can include any of the tokens below:
- * - the source topic name.
Setting TO when streaming CDC events stored in the queue
The TO in the streaming transformation can include any of the [tokens] below:
- [table] - the source table name.
- [db] - the source database name.
- [schema] - the source schema.
- * - the source topic name.
The flow substitutes the [tokens] with their actual values.
This example demonstrates how to configure a transformation to create a file whose name includes the source database and table name: [db]_[table].csv.
Read how to set the destination name using JavaScript.
Step 7. Configure the size of the created files.
Streaming flow creates files of a fixed size. It adds a UUID to the name of each created file.
Click MAPPING and select the Parameters tab.
Set the file size and how long the updated file should remain open.
Stream messages to relational database
This flow streams data from one or multiple message queue topics and loads it into a relational database in real time.
Create a flow that streams messages into a relational database
Step 1. Create and configure source connection for message queue as explained here.
Step 2. Create source JSON format.
We assume that the messages in the queue are stored in JSON format.
Step 3. Create a destination database connection.
Step 4. In Flows, select flow type Stream messages from queue into database.
Step 5. Configure source-to-destination transformation (left to right):
- From Connection: source connection created in step 1.
- From Format: source JSON format created in step 2.
- From: select one or multiple topics to stream data from.
- To Connection: destination database connection created in step 3.
- To: the destination table name.
Setting TO when streaming regular messages stored in the queue
The TO in the streaming transformation can include any of the tokens below:
- * - the source topic name.
Setting TO when streaming CDC events stored in the queue
The TO in the streaming transformation can include any of the [tokens] below:
- [table] - the source table name.
- [db] - the source database name.
- [schema] - the source schema.
- * - the source topic name.
The flow substitutes the [tokens] with their actual values.
This example demonstrates how to configure a transformation to load data into the table with the same name as the source table: public.[table].
Read how to set the destination name using JavaScript.
Step 6. Optionally configure mapping
Click the MAPPING button.
Use the mapping editor to configure the per-field mapping.
Here is what you can do in mapping:
- Disable the column.
- Change the column name.
- Change the column data type when creating a new table.
- Add a function in JavaScript or Python to calculate the column's value.
Here is what you cannot do in mapping:
- Add a new column.
- Use SQL to calculate the column's value.
Read more about mapping.
Step 7. Configure load parameters
Select the Parameters tab.
If needed, modify the following Load parameters:
- Action: the type of SQL that will be executed for each message. The action can be one of the following:
  - INSERT: the flow will always insert records into the destination table.
  - MERGE: the flow will INSERT records that do not exist in the destination table and UPDATE existing records.
  - CDC MERGE: the flow will INSERT records that do not exist in the destination table, UPDATE existing records, and DELETE records that were deleted in the source table. CDC MERGE only works when the flow is configured to stream CDC events previously ingested into the queue either by Etlworks or Debezium.
- Lookup Fields: a comma-separated list of columns that uniquely identify the record in the destination table. The MERGE action requires a list of columns that uniquely identify the record. By default, the flow will attempt to predict the Lookup Fields by checking unique indexes in the source and destination tables, but if there is no unique index in either table, it is not guaranteed that the prediction will be 100% accurate. Use this parameter to define the Lookup Fields in the following format: fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2.
- Custom SQL: the value of this parameter is used to generate the custom SQL statement for updating data in the destination table. Some databases support an UPSERT SQL clause, which looks like INSERT: UPSERT INTO table (columns) VALUES (values). However, the actual syntax of the UPSERT differs greatly depending on the driver. The developer can provide a template for the UPSERT, which will be used at runtime to generate the actual SQL statement. If the value of this parameter is empty, the flow generates the default INSERT, MERGE, and DELETE SQL statements using the SQL dialect of the target database.
- Batch Window: this parameter is used to group SQL statements in batches and execute each batch in one database call, which improves the performance of the load. The total number of statements in a batch cannot exceed the Batch Window. Setting this parameter to 1 disables batch processing.
- Force batch execution every (ms): if batch processing is enabled (Batch Window > 1), the flow forces the batch execution after the configured number of milliseconds, even if the current number of statements in the batch is less than the Batch Window. This parameter prevents the situation when the number of updates for a specific table is so low that it constantly stays below the Batch Window, preventing the updates from being pushed to the destination database.
- Force auto-commit: when this option is enabled, the transaction is committed for each executed batch. This is the recommended setting. Note that if this option is enabled, the flow forces the auto-commit even if it is disabled for the destination connection.
- Commit transaction when the transaction is committed in the source: if this option is enabled, the flow automatically commits a transaction in the destination database when the transaction is committed in the source database. This option is ignored if the flow or the destination connection is configured with auto-commit.
- Commit interval during snapshot (ms): the commit interval during the snapshot. Setting the commit interval prevents the situation when the flow performs a snapshot of a large table and the data is not committed to the destination database for an extended period of time. This option is ignored if the flow or the destination connection is configured with auto-commit.
- Alter target table if the source has columns that the target table doesn't have: if this option is enabled, the flow will automatically create and execute an ALTER TABLE ADD COLUMN SQL statement for each column that exists in the source and does not exist in the destination.
- Add Primary Key: if this option is enabled, the flow will add a Primary Key to the destination table using the column(s) that uniquely identify the record (Lookup Fields). This option is ignored if the Primary Key already exists in the destination table. We recommend enabling this option when the destination database is PostgreSQL or MySQL and the Action is MERGE (UPSERT). The Primary Key or a unique index is required when executing a MERGE in MySQL or PostgreSQL.
Step 8. Optionally add script which will be executed after processing each row.
It is possible to add a JavaScript code which will be executed after processing each row.
A row is considered processed when the SQL statement (INSERT, UPDATE, MERGE, or DELETE) is executed on the destination table or (if Batch Window > 1) when the SQL statement is added to the batch.
Typical use cases
- Trigger: execute a SQL statement on another table or tables when a row is added/updated/deleted in the destination table. Read how to execute any SQL from JavaScript.
- Per-row logging. Read about logging.
Add script
To add a script, go to transformation -> Configure -> Additional transformations. Add the script in the For Each Row field.
Objects available in the script
The following objects can be referenced by name from JavaScript code:
Object name | Class name / JavaDoc | Package |
---|---|---|
dataSet | com.toolsverse.etl.common.DataSet | com.toolsverse.etl.common |
currentRow | com.toolsverse.etl.common.DataSetRecord | com.toolsverse.etl.common |
etlConfig | com.toolsverse.etl.core.config.EtlConfig | com.toolsverse.etl.core.config |
scenario | com.toolsverse.etl.core.engine.Scenario | com.toolsverse.etl.core.engine |
tableName | The name of the destination table | |
tableKey | The primary key columns in the destination table | |
connection | The destination JDBC connection | java.sql |
batchSize | The size of the Batch Window set for the transformation. There is no batching if batchSize <= 1 | |
autoCommit | True if the transformation is configured to auto commit. The default value is true | |
op | The type of the action for that specific row: 'c' for INSERT, 'u' for UPDATE and 'd' for DELETE | |
destAlias | The destination com.toolsverse.etl.common.Alias | com.toolsverse.etl.common |
driver | The destination com.toolsverse.etl.driver.Driver | com.toolsverse.etl.driver |
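As an illustration, here is a minimal For Each Row sketch that logs every DELETE applied to the destination into an audit table. It only uses the documented connection, op, and tableName objects; the etl_audit table and its columns are assumptions for this example, not part of the product.

// Log deletes applied to the destination table into a hypothetical etl_audit table.
if (op == 'd') {
    // connection is the destination JDBC connection (java.sql.Connection)
    var stmt = connection.createStatement();
    try {
        stmt.executeUpdate("INSERT INTO etl_audit (table_name, operation) VALUES ('"
            + tableName + "', 'delete')");
    } finally {
        stmt.close();
    }
}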
Stream messages to the database using bulk load
This Flow streams messages from one or multiple topics into the designated stage in the local or cloud storage in real time and periodically (as often as every second) loads the data into the destination database in parallel with the stream. It uses a bulk load command to load the data.
Prerequisites for bulk load flow
1. The destination database must support bulk load operations.
Below are some of the examples for the most commonly used databases:
- SQL Server BULK INSERT statement
- RDS SQL Server BULK INSERT statement using Amazon S3 integration
- Postgres COPY command
- RDS Postgres COPY command using Amazon S3 integration
- MySQL LOAD DATA INFILE statement
- Amazon Aurora MySQL LOAD DATA INFILE statement using Amazon S3 integration
- Oracle Inline External Tables
2. For loading data from the external stage in AWS S3, Azure Blob, or Google Cloud Storage, the Amazon S3 bucket, Google Storage bucket, or Azure blob needs to be created. Note that Etlworks flow does not create the bucket or blob.
Create a flow that streams messages and loads them into the database using bulk load
Step 1. Create and configure source connection for message queue as explained here.
Step 2. Create source JSON format.
We assume that the messages in the queue are stored in JSON format.
Step 3. Create a database connection that will be used as a final destination.
Step 4. Create a connection for staging data:
- Server storage - for staging files in the local or mapped network storage.
- Amazon S3 - for creating files in S3.
- Azure storage - for creating files in Azure blob storage.
- Google cloud storage - for creating files in Google cloud storage.
If needed, set Archive file before copying to S3 to either GZip or Zip. It will enable compression for created files. Compression is supported only for cloud destinations (S3, Azure, Google).
Step 5. Create CSV or JSON format depending on which format you want the flow to create files in.
For CSV format, enable the following properties:
- Always enclose
- Escape double-quotes
- Save Metadata
Step 6. In Flows, select flow type Stream messages from queue and bulk load them into database.
Step 7. Configure source-to-destination transformation (left to right):
- From Connection: source connection created in step 1.
- From Format: source JSON format created in step 2.
- From: select one or multiple topics to stream data from.
- To Connection: file storage connection created in step 4.
- To Format: destination format created in step 5.
- To: the destination table name. It is recommended to set TO as db.schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Step 8. Select the Connections tab and select the final destination connection created in step 3.
Step 9. Optionally configure mapping
Click the MAPPING button.
Use the mapping editor to configure the per-field mapping.
Here is what you can do in mapping:
- Disable the column.
- Change the column name.
- Change the column data type when creating a new table.
Here is what you cannot do in mapping:
- Add a new column.
- Modify the value of the existing column.
Step 10. Configure the size of the created files.
Streaming flow creates files of a fixed size. It adds a UUID to the name of each created file.
Click MAPPING and select the Parameters tab.
Set the file size and how long the updated file should remain open.
Step 11. Configure load parameters
If needed, modify the following Load parameters:
- Load data into database every (ms): by default, the flow loads data into the database every 5 minutes (300000 milliseconds). The load runs in parallel with the stream, which never stops. Decrease this parameter to load data into the database more often or increase it to reduce the number of consumed database credits.
- Wait (ms) to let running load finish when stream stops: by default, the flow loads data into the database every 5 minutes. The stream and load run in parallel, so when streaming stops, the flow executes the load one last time to finish loading the remaining data in the queue. It is possible that the load is still running when the stream stops. Use this parameter to configure how long the flow should wait before executing the load the last time. Clear this parameter to disable the wait. In this case, if the load task is still running, the flow will finish without executing the load one last time. The flow will load the remaining data in the queue on the next run.
- Load into staging table: by default, the flow will attempt to create and load data into a temporary table. Not all databases support the bulk load into a temp table. When this parameter is enabled (it is disabled by default), the flow will create a staging table instead of a temporary table. It will automatically drop the staging table on exit.
- Bulk Load SQL: the Bulk Load SQL is used to load files in the cloud or file storage into the staging or temporary tables in the destination database. This is a required parameter. The following {tokens} can be used in the Bulk Load SQL:
  - {TABLE} - the table to load data into.
  - {FILE_TO_LOAD} - the name of the file to load with path and extension.
  - {FILE} - the name of the file to load without path.
  - {FULL_FILE_NAME} - same as {FILE_TO_LOAD}.
  - {FILE_NO_EXT} - the name of the file to load without path and extension.
  - {EXT} - the extension of the file to load without '.'.
  Example for Azure SQL Server:
  BULK INSERT {TABLE}
  FROM '{FILE_TO_LOAD}'
  WITH (
  DATA_SOURCE = 'BulkLoadDataSource',
  FIELDTERMINATOR = ',',
  FORMAT='CSV',
  FIRSTROW = 2,
  MAXERRORS = 10
  )
- Action: the type of SQL that will be executed for each message. The action can be one of the following:
  - INSERT: the flow will always insert records into the destination table.
  - MERGE: the flow will INSERT records that do not exist in the destination table and UPDATE existing records.
  - CDC MERGE: the flow will INSERT records that do not exist in the destination table, UPDATE existing records, and DELETE records that were deleted in the source table. CDC MERGE only works when the flow is configured to stream CDC events previously ingested into the queue either by Etlworks or Debezium.
- Lookup Fields: the MERGE action requires a list of columns that uniquely identify the record. By default, the flow will attempt to predict the Lookup Fields by checking unique indexes in the source and destination tables, but if there is no unique index in either table, it is not guaranteed that the prediction will be 100% accurate. Use this parameter to define the Lookup Fields in the following format: fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2.
The other parameters are similar or the same as for the flow type Bulk load files into database.
Stream messages to Snowflake
This Flow streams messages from one or multiple topics into the designated stage in the local or cloud storage in real time and periodically (as often as every second) loads the data into Snowflake in parallel with the stream. It uses the COPY INTO command to load the data.
Prerequisites for Snowflake flow
1. The Snowflake data warehouse is active.
2. The Stage name is set for the Snowflake connection or Transformation (the latter overrides the stage set for the Connection). Etlworks uses the Snowflake COPY INTO command to load data into Snowflake tables. COPY INTO requires a named internal or external stage. Stage refers to the location where your data files are stored for loading into Snowflake. Read how Etlworks flow automatically creates the named Snowflake stage.
3. For loading data from the external stage in AWS S3, Azure Blob, or Google Cloud Storage, the Amazon S3 bucket, Google Storage bucket, or Azure blob needs to be created. Note that Etlworks flow does not create the bucket or blob.
Create a flow that streams messages into Snowflake
Step 1. Create and configure source connection for message queue as explained here.
Step 2. Create source JSON format.
We assume that the messages in the queue are stored in JSON format.
Step 3. Create a Snowflake connection that will be used as a final destination.
Step 4. Create a connection for staging data:
- Server storage - for staging files in the local or mapped network storage.
- Amazon S3 - for creating files in S3.
- Azure storage - for creating files in Azure blob storage.
- Google cloud storage - for creating files in Google cloud storage.
If needed, set Archive file before copying to S3 to either GZip or Zip. It will enable compression for created files. Compression is supported only for cloud destinations (S3, Azure, Google).
Step 5. Create CSV or JSON format depending on which format you want the flow to create files in.
For CSV format, enable the following properties:
- Always enclose
- Escape double-quotes
- Save Metadata
Step 6. In Flows, select flow type Stream messages from queue into Snowflake.
Step 7. Configure source-to-destination transformation (left to right):
- From Connection: source connection created in step 1.
- From Format: source JSON format created in step 2.
- From: select one or multiple topics to stream data from.
- To Connection: file storage connection created in step 4.
- To Format: destination format created in step 5.
- TO: the destination table name. It is recommended to set TO as schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Step 8. Select the Connections tab and select the Snowflake connection created in step 3.
Step 9. Optionally configure mapping
Click the MAPPING button.
Use the mapping editor to configure the per-field mapping.
Here is what you can do in mapping:
- Disable the column.
- Change the column name.
- Change the column data type when creating a new table.
Step 10. Configure the size of the created files.
Streaming flow creates files of a fixed size. It adds a UUID to the name of each created file.
Click MAPPING and select the Parameters tab.
Set the file size and how long the updated file should remain open.
Step 11. Configure load parameters
If needed, modify the following Load parameters:
- Load data into Snowflake every (ms): by default, the flow loads data into Snowflake every 5 minutes (300000 milliseconds). The load runs in parallel with the stream, which never stops. Decrease this parameter to load data into Snowflake more often or increase it to reduce the number of consumed Snowflake credits.
- Wait (ms) to let running load finish when stream stops: by default, the flow loads data into Snowflake every 5 minutes. The stream and load run in parallel, so when streaming stops, the flow executes the load one last time to finish loading the remaining data in the queue. It is possible that the load is still running when the stream stops. Use this parameter to configure how long the flow should wait before executing the load the last time. Clear this parameter to disable the wait. In this case, if the load task is still running, the flow will finish without executing the load one last time. The flow will load the remaining data in the queue on the next run.
- Action: the type of SQL that will be executed for each message. The action can be one of the following:
  - INSERT: the flow will always insert records into the destination table.
  - MERGE: the flow will INSERT records that do not exist in the destination table and UPDATE existing records.
  - CDC MERGE: the flow will INSERT records that do not exist in the destination table, UPDATE existing records, and DELETE records that were deleted in the source table. CDC MERGE only works when the flow is configured to stream CDC events previously ingested into the queue either by Etlworks or Debezium.
- Lookup Fields: the MERGE action requires a list of columns that uniquely identify the record. By default, the flow will attempt to predict the Lookup Fields by checking unique indexes in the source and destination tables, but if there is no unique index in either table, it is not guaranteed that the prediction will be 100% accurate. Use this parameter to define the Lookup Fields in the following format: fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2.
The other parameters are similar or the same as for the flow type Bulk load files into Snowflake.
Stream messages to Amazon Redshift
This Flow streams messages from one or multiple topics into the designated stage in S3 in real time and periodically (as often as every second) loads the data into Redshift in parallel with the stream. It uses the COPY command to load the data.
Prerequisites for Redshift flow
1. Redshift is up and running and available from the Internet.
2. The Redshift user has the INSERT privilege for the table(s).
3. The Amazon S3 bucket is created, and Redshift is able to access the bucket.
Create a flow that streams messages into Redshift
Step 1. Create and configure source connection for message queue as explained here.
Step 2. Create source JSON format.
We assume that the messages in the queue are stored in JSON format.
Step 3. Create a Redshift connection that will be used as a final destination.
Step 4. Create Amazon S3 connection for staging data.
If needed, set Archive file before copying to S3 to either GZip or Zip. It will enable compression for created files.
Step 5. Create CSV format.
For CSV format, enable the following properties:
- Always enclose
- Escape double-quotes
- Save Metadata
Step 6. In Flows, select flow type Stream messages from queue to Amazon Redshift.
Step 7. Configure source-to-destination transformation (left to right):
- From Connection: source connection created in step 1.
- From Format: source JSON format created in step 2.
- From: select one or multiple topics to stream data from.
- To Connection: file storage connection created in step 4.
- To Format: destination format created in step 5.
- TO: the destination table name. It is recommended to set TO as schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Step 8. Select the Connections tab and select the Redshift connection created in step 3.
Step 9. Optionally configure mapping
Click the MAPPING button.
Use the mapping editor to configure the per-field mapping.
Here is what you can do in mapping:
- Disable the column.
- Change the column name.
- Change the column data type when creating a new table.
Step 10. Configure the size of the created files.
Streaming flow creates files of a fixed size. It adds a UUID to the name of each created file.
Click MAPPING and select the Parameters tab.
Set the file size and how long the updated file should remain open.
Step 11. Configure load parameters
If needed, modify the following Load parameters:
- Load data into Redshift every (ms): by default, the flow loads data into Redshift every 5 minutes (300000 milliseconds). The load runs in parallel with the stream, which never stops. Decrease this parameter to load data into Redshift more often or increase it to reduce the number of consumed Redshift credits.
- Wait (ms) to let running load finish when stream stops: by default, the flow loads data into Redshift every 5 minutes. The stream and load run in parallel, so when streaming stops, the flow executes the load one last time to finish loading the remaining data in the queue. It is possible that the load is still running when the stream stops. Use this parameter to configure how long the flow should wait before executing the load the last time. Clear this parameter to disable the wait. In this case, if the load task is still running, the flow will finish without executing the load one last time. The flow will load the remaining data in the queue on the next run.
- Action: the type of SQL that will be executed for each message. The action can be one of the following:
  - INSERT: the flow will always insert records into the destination table.
  - MERGE: the flow will INSERT records that do not exist in the destination table and UPDATE existing records.
  - CDC MERGE: the flow will INSERT records that do not exist in the destination table, UPDATE existing records, and DELETE records that were deleted in the source table. CDC MERGE only works when the flow is configured to stream CDC events previously ingested into the queue either by Etlworks or Debezium.
- How to MERGE: the type of SQL to execute when merging data. The following options are available:
  - Native MERGE (preview) - execute the native Redshift MERGE SQL. Note that the MERGE command is in preview.
  - DELETE/INSERT (default) - DELETE all records in the actual table that also exist in the temp table, then INSERT all records from the temp table into the actual table.
- Lookup Fields: the MERGE action requires a list of columns that uniquely identify the record. By default, the flow will attempt to predict the Lookup Fields by checking unique indexes in the source and destination tables, but if there is no unique index in either table, it is not guaranteed that the prediction will be 100% accurate. Use this parameter to define the Lookup Fields in the following format: fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2.
The other parameters are similar or the same as for the flow type Bulk load files in S3 into Redshift.
Stream messages to Azure Synapse Analytics
This Flow streams messages from one or multiple topics into the designated stage in Azure Storage (Azure Blob) in real time and periodically (as often as every second) loads the data into Synapse Analytics in parallel with the stream. It uses the COPY command to load the data.
Prerequisites for Synapse Analytics flow
1. The Azure Synapse Analytics dedicated SQL pool is up and running.
2. The Azure Storage blob is created.
Create a flow that streams messages into Synapse Analytics
Step 1. Create and configure source connection for message queue as explained here.
Step 2. Create source JSON format.
We assume that the messages in the queue are stored in JSON format.
Step 3. Create a Synapse Analytics connection that will be used as a final destination.
Step 4. Create Azure storage connection for staging data.
If needed, set Archive file before copying to S3 to either GZip or Zip. It will enable compression for created files.
Step 5. Create CSV format.
For CSV format, enable the following properties:
- Always enclose
- Escape double-quotes
- Save Metadata
Step 6. In Flows, select flow type Stream messages from queue to Azure Synapse Analytics.
Step 7. Configure source-to-destination transformation (left to right):
- From Connection: source connection created in step 1.
- From Format: source JSON format created in step 2.
- From: select one or multiple topics to stream data from.
- To Connection: file storage connection created in step 4.
- To Format: destination format created in step 5.
- TO: the destination table name. It is recommended to set TO as db.schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Step 8. Select the Connections tab and select the Synapse Analytics connection created in step 3.
Step 9. Optionally configure mapping
Click the MAPPING button.
Use the mapping editor to configure the per-field mapping.
Here is what you can do in mapping:
- Disable the column.
- Change the column name.
- Change the column data type when creating a new table.
Step 10. Configure the size of the created files.
Streaming flow creates files of a fixed size. It adds a UUID to the name of each created file.
Click MAPPING and select the Parameters tab.
Set the file size and how long the updated file should remain open.
Step 11. Configure load parameters
If needed, modify the following Load parameters:
- Load data into Synapse Analytics every (ms): by default, the flow loads data into Synapse every 5 minutes (300000 milliseconds). The load runs in parallel with the stream, which never stops. Decrease this parameter to load data into Synapse more often or increase it to reduce the number of consumed Synapse credits.
- Wait (ms) to let running load finish when stream stops: by default, the flow loads data into Synapse every 5 minutes. The stream and load run in parallel, so when streaming stops, the flow executes the load one last time to finish loading the remaining data in the queue. It is possible that the load is still running when the stream stops. Use this parameter to configure how long the flow should wait before executing the load the last time. Clear this parameter to disable the wait. In this case, if the load task is still running, the flow will finish without executing the load one last time. The flow will load the remaining data in the queue on the next run.
- Action: the type of SQL that will be executed for each message. The action can be one of the following:
  - INSERT: the flow will always insert records into the destination table.
  - MERGE: the flow will INSERT records that do not exist in the destination table and UPDATE existing records.
  - CDC MERGE: the flow will INSERT records that do not exist in the destination table, UPDATE existing records, and DELETE records that were deleted in the source table. CDC MERGE only works when the flow is configured to stream CDC events previously ingested into the queue either by Etlworks or Debezium.
- Lookup Fields: the MERGE action requires a list of columns that uniquely identify the record. By default, the flow will attempt to predict the Lookup Fields by checking unique indexes in the source and destination tables, but if there is no unique index in either table, it is not guaranteed that the prediction will be 100% accurate. Use this parameter to define the Lookup Fields in the following format: fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2.
The other parameters are similar or the same as for the flow type Bulk load files in Azure Storage into Synapse Analytics.
Stream messages to Google BigQuery
This Flow streams messages from one or multiple topics into the designated stage in Google Cloud Storage in real time and periodically (as often as every second) loads the data into Google BigQuery in parallel with the stream.
Prerequisites for BigQuery flow
1. The Google BigQuery instance must be available from the Internet.
2. The Google cloud storage account exists, and the user has read/write permissions for the specific bucket.
Create a flow that streams messages into BigQuery
Step 1. Create and configure source connection for message queue as explained here.
Step 2. Create source JSON format.
We assume that the messages in the queue are stored in JSON format.
Step 3. Create a Google BigQuery connection that will be used as a final destination.
Step 4. Create a Google Cloud storage connection for staging data.
Step 5. Create CSV format.
For CSV format, enable the following properties:
- Always enclose
- Escape double-quotes
- Save Metadata
Step 6. In Flows, select flow type Stream messages from queue to Google BigQuery.
Step 7. Configure source-to-destination transformation (left to right):
- From Connection: source connection created in step 1.
- From Format: source JSON format created in step 2.
- From: select one or multiple topics to stream data from.
- To Connection: file storage connection created in step 4.
- To Format: destination format created in step 5.
- TO: the destination table name. It is recommended to set TO as project.schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Step 8. Select the Connections tab and select the BigQuery connection created in step 3.
Step 9. Optionally configure mapping
Click the MAPPING button.
Use the mapping editor to configure the per-field mapping.
Here is what you can do in mapping:
- Disable the column.
- Change the column name.
- Change the column data type when creating a new table.
Step 10. Configure the size of the created files.
Streaming flow creates files of a fixed size. It adds a UUID to the name of each created file.
Click MAPPING and select the Parameters tab.
Set the file size and how long the updated file should remain open.
Step 11. Configure load parameters
If needed, modify the following Load parameters:
- Load data into BigQuery every (ms): by default, the flow loads data into BigQuery every 5 minutes (300000 milliseconds). The load runs in parallel with the stream, which never stops. Decrease this parameter to load data into BigQuery more often or increase it to reduce the number of consumed BigQuery credits.
- Wait (ms) to let running load finish when stream stops: by default, the flow loads data into BigQuery every 5 minutes. The stream and load run in parallel, so when streaming stops, the flow executes the load one last time to finish loading the remaining data in the queue. It is possible that the load is still running when the stream stops. Use this parameter to configure how long the flow should wait before executing the load the last time. Clear this parameter to disable the wait. In this case, if the load task is still running, the flow will finish without executing the load one last time. The flow will load the remaining data in the queue on the next run.
- Action: the type of SQL that will be executed for each message. The action can be one of the following:
  - INSERT: the flow will always insert records into the destination table.
  - MERGE: the flow will INSERT records that do not exist in the destination table and UPDATE existing records.
  - CDC MERGE: the flow will INSERT records that do not exist in the destination table, UPDATE existing records, and DELETE records that were deleted in the source table. CDC MERGE only works when the flow is configured to stream CDC events previously ingested into the queue either by Etlworks or Debezium.
- Lookup Fields: the MERGE action requires a list of columns that uniquely identify the record. By default, the flow will attempt to predict the Lookup Fields by checking unique indexes in the source and destination tables, but if there is no unique index in either table, it is not guaranteed that the prediction will be 100% accurate. Use this parameter to define the Lookup Fields in the following format: fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2.
The other parameters are similar or the same as for the flow type Bulk load files in Google Cloud Storage into BigQuery.
Stream messages to Greenplum
This Flow streams messages from one or multiple topics into the designated server storage location in real time and periodically (as often as every second) loads the data into Greenplum in parallel with the stream.
Prerequisites for Greenplum flow
1. The Greenplum instance must be available from the Internet.
2. The gpload utility must be installed on the same VM as Etlworks. Contact Etlworks support at support@etlworks.com if you need assistance installing gpload.
Create a flow that streams messages into Greenplum
Step 1. Create and configure source connection for message queue as explained here.
Step 2. Create source JSON format.
We assume that the messages in the queue are stored in JSON format.
Step 3. Create a Greenplum connection that will be used as a final destination.
Step 4. Create a Server storage connection for staging data.
Step 5. Create CSV format.
For CSV format, enable the following properties:
- Always enclose
- Escape double-quotes
- Save Metadata
Step 6. In Flows, select flow type Stream messages from queue to Greenplum.
Step 7. Configure source-to-destination transformation (left to right):
- From Connection: source connection created in step 1.
- From Format: source JSON format created in step 2.
- From: select one or multiple topics to stream data from.
- To Connection: file storage connection created in step 4.
- To Format: destination format created in step 5.
- TO: the destination table name. It is recommended to set TO as schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Step 8. Select the Connections tab and select the Greenplum connection created in step 3.
Step 9. Optionally configure mapping
Click the MAPPING button.
Use the mapping editor to configure the per-field mapping.
Here is what you can do in mapping:
- Disable the column.
- Change the column name.
- Change the column data type when creating a new table.
Step 10. Configure the size of the created files.
Streaming flow creates files of a fixed size. It adds a UUID to the name of each created file.
Click MAPPING and select the Parameters tab.
Set the file size and how long the updated file should remain open.
Step 11. Configure load parameters
If needed, modify the following Load parameters:
- Load data into Greenplum every (ms): by default, the flow loads data into Greenplum every 5 minutes (300000 milliseconds). The load runs in parallel with the stream, which never stops. Decrease this parameter to load data into Greenplum more often.
- Wait (ms) to let running load finish when stream stops: by default, the flow loads data into Greenplum every 5 minutes. The stream and load run in parallel, so when streaming stops, the flow executes the load one last time to finish loading the remaining data in the queue. It is possible that the load is still running when the stream stops. Use this parameter to configure how long the flow should wait before executing the load the last time. Clear this parameter to disable the wait. In this case, if the load task is still running, the flow will finish without executing the load one last time. The flow will load the remaining data in the queue on the next run.
- Action: the type of SQL that will be executed for each message. The action can be one of the following:
  - INSERT: the flow will always insert records into the destination table.
  - MERGE: the flow will INSERT records that do not exist in the destination table and UPDATE existing records.
  - CDC MERGE: the flow will INSERT records that do not exist in the destination table, UPDATE existing records, and DELETE records that were deleted in the source table. CDC MERGE only works when the flow is configured to stream CDC events previously ingested into the queue either by Etlworks or Debezium.
- Lookup Fields: the MERGE action requires a list of columns that uniquely identify the record. By default, the flow will attempt to predict the Lookup Fields by checking unique indexes in the source and destination tables, but if there is no unique index in either table, it is not guaranteed that the prediction will be 100% accurate. Use this parameter to define the Lookup Fields in the following format: fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2.
The other parameters are similar or the same as for the flow type Bulk load files into Greenplum.
Stream messages to Vertica
This Flow streams messages from one or multiple topics into the designated stage in the server or cloud file store in real time and periodically (as often as every second) loads the data into Vertica in parallel with the stream. It uses the COPY command to load the data.
Prerequisites for Vertica flow
1. Vertica database is up and running and accessible from the Etlworks instance. Read how to work with on-premise data in Etlworks.
Create a flow that streams messages into Vertica
Step 1. Create and configure source connection for message queue as explained here.
Step 2. Create source JSON format.
We assume that the messages in the queue are stored in JSON format.
Step 3. Create a Vertica connection that will be used as a final destination.
It is recommended to enable the auto-commit for the destination connection.
Step 4. Create a connection for staging data:
- Server storage - for staging files in the local or mapped network storage.
- Amazon S3 - for creating files in S3.
- Azure storage - for creating files in Azure blob storage.
- Google cloud storage - for creating files in Google cloud storage.
If needed, set Archive file before copying to S3 to either GZip or Zip. It will enable compression for created files. Compression is supported only for cloud destinations (S3, Azure, Google).
Step 5. Create CSV format.
For CSV format, enable the following properties:
- Always enclose
- Escape double-quotes
- Save Metadata
Step 6. In Flows, select flow type Stream messages from queue to Vertica.
Step 7. Configure source-to-destination transformation (left to right):
- From Connection: source connection created in step 1.
- From Format: source JSON format created in step 2.
- From: select one or multiple topics to stream data from.
- To Connection: file storage connection created in step 4.
- To Format: destination format created in step 5.
- TO: the destination table name. It is recommended to set TO as schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Step 8. Select the Connections tab and select the Vertica connection created in step 3.
Step 9. Optionally configure mapping
Click the MAPPING button.
Use the mapping editor to configure the per-field mapping.
Here is what you can do in mapping:
- Disable the column.
- Change the column name.
- Change the column data type when creating a new table.
Step 10. Configure the size of the created files.
Streaming flow creates files of a fixed size. It adds a UUID to the name of each created file.
Click MAPPING and select the Parameters tab.
Set the file size and how long the updated file should remain open.
Step 11. Configure load parameters
If needed, modify the following Load parameters:
- Load data into Vertica every (ms): by default, the flow loads data into Vertica every 5 minutes (300000 milliseconds). The load runs in parallel with the stream, which never stops. Decrease this parameter to load data into Vertica more often or increase it to reduce the number of consumed database credits.
- Wait (ms) to let running load finish when stream stops: by default, the flow loads data into Vertica every 5 minutes. The stream and load run in parallel, so when streaming stops, the flow executes the load one last time to finish loading the remaining data in the queue. It is possible that the load is still running when the stream stops. Use this parameter to configure how long the flow should wait before executing the load the last time. Clear this parameter to disable the wait. In this case, if the load task is still running, the flow will finish without executing the load one last time. The flow will load the remaining data in the queue on the next run.
- Bulk Load SQL: the Bulk Load SQL is used to load files in the cloud or file storage into the staging tables in the Vertica database. This is a required parameter. By default, we assume that the flow bulk-loads from the local (server) storage using the COPY FROM LOCAL command:
  COPY {TABLE} FROM LOCAL '{FULL_FILE_NAME}'
  PARSER fcsvparser(header='true') ABORT ON ERROR;
  You can modify the SQL statement that will be used to bulk-load data into the destination table. The following {tokens} can be used as a part of the SQL statement:
  - {TABLE}: the table to load data into.
  - {TEMP_TABLE}: the staging table name used for MERGE.
  - {FILE_TO_LOAD}: the file name to load without path and extension.
  - PATH: the path of the file to load without file name, for example {app.data}/test/.
  - EXT: the file extension of the file to load, without '.', for example, csv.
- Action: the type of SQL that will be executed for each message. The action can be one of the following:
  - INSERT: the flow will always insert records into the destination table.
  - MERGE: the flow will INSERT records that do not exist in the destination table and UPDATE existing records.
  - CDC MERGE: the flow will INSERT records that do not exist in the destination table, UPDATE existing records, and DELETE records that were deleted in the source table. CDC MERGE only works when the flow is configured to stream CDC events previously ingested into the queue either by Etlworks or Debezium.
- Lookup Fields: the MERGE action requires a list of columns that uniquely identify the record. By default, the flow will attempt to predict the Lookup Fields by checking unique indexes in the source and destination tables, but if there is no unique index in either table, it is not guaranteed that the prediction will be 100% accurate. Use this parameter to define the Lookup Fields in the following format: fully.qualified.table1=field1,field2;fully.qualified.table2=field1,field2.
The other parameters are similar or the same as for the flow type Bulk load files into Vertica.
Stream CDC events from a message queue
Etlworks includes a dedicated Flow type for streaming CDC events into the message queue.
The Flow described in this article streams CDC events stored in a message queue into any supported destination.
Configuration
Step 1. When configuring the CDC connection for the CDC extract flow, make sure Structure of CDC event includes the following blocks. They are needed to properly extract metadata, such as column names and data types, from the CDC events stored in a message queue:
Step 2. Create and configure source connection for message queue as explained here.
Step 3. When configuring a source connection, make sure Integration with CDC provider is set to Etlworks.
Step 4. Create a streaming flow for any of the following destinations:
- File storage.
- Relational database.
- Database which supports the bulk load.
- Snowflake.
- Amazon Redshift.
- Google BigQuery.
- Azure Synapse Analytics.
- Greenplum.
- Vertica.
Step 5. Configure TO (destination).
The TO depends on the type of destination.
File storage
The TO is a destination filename.
The TO in the streaming transformation can include any of the [tokens] below:
- [table] - the source table name.
- [db] - the source database name.
- [schema] - the source schema.
- * - the source topic name.
The flow substitutes the [tokens] with their actual values.
This example demonstrates how to configure a transformation to create a file whose name includes the source database and table name: [db]_[table].csv.
Read how to set the destination name using JavaScript.
Relational database
The TO is a destination table name.
The TO in the streaming transformation can include any of the [tokens] below:
- [table] - the source table name.
- [db] - the source database name.
- [schema] - the source schema.
- * - the source topic name.
The flow substitutes the [tokens] with their actual values.
This example demonstrates how to configure a transformation to load data into the table with the same name as the source table: public.[table].
Read how to set the destination name using JavaScript.
Database which supports the bulk load
The TO is a destination table name.
It is recommended to set TO as db.schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Snowflake
The TO is the destination table name.
It is recommended to set TO as schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Amazon Redshift
The TO is the destination table name.
It is recommended to set TO as schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Google BigQuery
The TO is the destination table name.
It is recommended to set TO as project.schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Azure Synapse Analytics
The TO is the destination table name.
It is recommended to set TO as db.schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Greenplum
The TO is the destination table name.
It is recommended to set TO as schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Vertica
The TO is the destination table name.
It is recommended to set TO as schema.*. In this case, the flow will load data into the table with the same (or modified) name as the source table or topic. Read how to set the destination name using JavaScript.
Step 6. Optionally, if you wish to MERGE (UPSERT) the data into the destination table, set Action to CDC MERGE when configuring the streaming flow. If the action is set to CDC MERGE, the flow will INSERT records that do not exist in the destination table, UPDATE existing records, and DELETE records that were deleted in the source table.
Real-time change replication with Kafka, Debezium, and Etlworks
This tutorial demonstrates how to implement [near] real-time CDC-based change replication for the most popular databases using the following technologies:
- Native CDC for each source database.
- Apache Kafka.
- Debezium.
- Etlworks Kafka or Azure Event Hubs connector with built-in support for Debezium.
- Streaming flow that supports any of the following destinations:
Tips and tricks
Schedule streaming flow
We recommend using a continuous run Schedule type. The idea is that the streaming Flow runs indefinitely or until there are no messages to extract, or it stops if there is an error. When the Flow stops for any of the reasons above, it restarts automatically after the configured number of seconds.
Modify the source message and set the destination name using JavaScript
Use the Consumer Preprocessor parameter, available for the Kafka, Azure Event Hubs, and Google PubSub connectors, to modify the source message and/or the destination name.
Available scripting languages
- JavaScript
Available variables
- event: the message serialized as a JsonNode.
- fields: the source fields serialized as a JsonNode.
- topic: the Kafka, Event Hubs, or Google PubSub topic.
- destination: the destination name.
- db: the source database name.
- schema: the source schema name.
Get the value of a column
var val = event.get('column_name').asText();
Modify the value of a column
Using this technique, you can modify the values of existing columns and add new columns to the stream.
event.put('column_name', 'string');
Possible return values
- java.lang.Boolean.TRUE/FALSE: if FALSE, the message will be skipped.
- TypedKeyValue<String, Boolean>: the key/value pair, where the key contains the destination name and the value is java.lang.Boolean.TRUE/FALSE.
- Anything else is ignored.
Set the destination name
By default, the destination table name or a filename is calculated using a template set in TO, where any part of the TO can contain any of the following [tokens]:
- [table] - the source table name.
- [db] - the source database name.
- [schema] - the source schema.
- * - the source topic name.
The flow substitutes the [tokens] with their actual values.
This example demonstrates how to configure a transformation to load data into the destination table in the public schema and with the same name as the source table: public.[table].
The developer can also set the destination name using JavaScript. Simply return (assign it to the variable value in the last line of the Consumer Preprocessor script) an instance of the TypedKeyValue class, where the key is the new destination name.
Here is an example:
var parts = topic.split('\\.', -1);
// assuming that the topic name includes database.schema.table and we only need the table name
value = new TypedKeyValue('public.' + parts[2], java.lang.Boolean.TRUE);
Monitor streaming Flows
Our ETL engine records real-time metrics for each active stream. The following metrics are available:
- Last Update (timezone): the timestamp, in the server timezone, of the last checkpoint. The metrics are updated every 60 seconds.
- Last Record Processed (time ago): the human-readable timestamp of the last processed message.
- Messages/sec: the number of messages processed in one second, recorded during the last checkpoint interval.
- Last recorded Lag: how many records are queued to be processed.
- Max Lag: the maximum lag recorded since the connector started streaming.
- Total Records Processed: the total number of messages processed since the last restart of the connector.
- Records Processed Since Last Update: the number of messages processed since the last update. Note that the metrics are updated every 60 seconds.
Access stream Metrics from UI
To access stream metrics from the UI, go to Flows or Schedules and click Running next to the Flow or Schedule name.
Then click View Running Tasks.
The metrics are displayed at the bottom of the Running Tasks window.
You can refresh the metrics manually or enable Auto refresh.
Access stream metrics using API
GET stream metrics for the specific Flow
This API returns metrics for the specific streaming Flow.
Authentication
Before making a call to the API, you must obtain a JWT token from the authentication API.
Step 1. Use any user with the Administrator role to call the Etlworks authentication endpoint and receive an access token.
Step 2. Use the access token received in Step 1 to call Etlworks API endpoints. The access token must be submitted as a header parameter, as in: Authorization:Bearer access-token.
Access tokens are short-lived and self-expiring. An access token gives you access to the APIs for approximately 10 minutes. It is recommended that you refresh the access token before each call to the API.
The API endpoint parameters:
- PATH: /etl/rest/v1/tasks/{audit_id}/?type=Kafka%20stream
  EXAMPLE: https://app.etlworks.com/etl/rest/v1/tasks/80610/?type=Kafka%20stream
- METHOD: GET
- HEADER: Authorization:Bearer access-token
- REQUEST BODY: none
- REQUEST CONTENT TYPE: none
Audit ID
audit_id is a required URL parameter. To get the audit_id for the specific Flow, use the Flow status API.
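As an illustration, here is a minimal sketch of calling this endpoint from JavaScript (Node.js 18+ with the built-in fetch, run as an ES module). The ACCESS_TOKEN and AUDIT_ID environment variable names are assumptions; the URL and header match the parameters above.

// Fetch stream metrics for one Flow and print a few of the documented fields
const token = process.env.ACCESS_TOKEN;   // JWT obtained from the authentication API
const auditId = process.env.AUDIT_ID;     // audit_id of the streaming Flow

const response = await fetch(
  'https://app.etlworks.com/etl/rest/v1/tasks/' + auditId + '/?type=Kafka%20stream',
  { headers: { Authorization: 'Bearer ' + token } }
);
const metrics = await response.json();
console.log(metrics[0].messagesPerSecond, metrics[0].lag, metrics[0].maxLag);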
Response
The response is a JSON document in the following format. The highlighted fields are the stream metrics.
[{
"requestId": "string",
"owner": "string",
"name": "string",
"started": timestamp_in_milliseconds_since_epoch,
"duration": number,
"code": "the formatted CDC metrics",
"type": "CDC",
"tenant": "cdc_prod",
"language": null,
"messagesPerSecond": messages_per_second,
"lag": last_recorded_lag,
"maxLag": max_recorded_lag,
"latency": number,
"maxLatency": number,
"minLatency": number,
"avgLatency": number,
"maxLatencyDate": timestamp_in_milliseconds_since_epoch,
"recordsProcessed": number,
"recordsProcessedSinceLastCheck": number,
"lastCheck": timestamp_in_milliseconds_since_epoch,
"lastTimeRecordsReceivedDate": timestamp_in_milliseconds_since_epoch}
]
Response codes
200 for success, 401 for not authorized, 403 for forbidden, and 500 for an internal error.
GET stream metrics for all running Flows
This API returns metrics for all currently running stream Flows.
Authentication
Before making a call to the API, the user must obtain a JWT token from the authentication API.
Step 1. Use any user with the Administrator role to call the Etlworks authentication endpoint and receive an access token.
Step 2. Use the access token received in Step 1 to call Etlworks API endpoints. The access token must be submitted as a header parameter, as in: Authorization:Bearer access-token.
Access tokens are short-lived and self-expiring. An access token gives you access to the APIs for approximately 10 minutes. It is recommended that you refresh the access token before each call to the API.
The API endpoint parameters:
- PATH: /etl/rest/v1/tasks/?type=Kafka%20stream
  EXAMPLE: https://app.etlworks.com/etl/rest/v1/tasks/?type=Kafka%20stream
- METHOD: GET
- HEADER: Authorization:Bearer access-token
- REQUEST BODY: none
- REQUEST CONTENT TYPE: none
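As with the single-Flow endpoint, this is a minimal sketch of calling the endpoint and printing one line per running stream (Node.js 18+ with the built-in fetch, run as an ES module; the ACCESS_TOKEN environment variable name is an assumption).

// List metrics for all currently running stream Flows
const token = process.env.ACCESS_TOKEN;   // JWT obtained from the authentication API

const response = await fetch(
  'https://app.etlworks.com/etl/rest/v1/tasks/?type=Kafka%20stream',
  { headers: { Authorization: 'Bearer ' + token } }
);
for (const task of await response.json()) {
  console.log(task.name, task.messagesPerSecond, task.lag, task.recordsProcessed);
}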
Response
The response is a JSON document in the following format. The highlighted fields are the stream metrics.
[{
"requestId": "string",
"owner": "string",
"name": "string",
"started": timestamp_in_milliseconds_since_epoch,
"duration": number,
"code": "the formatted CDC metrics",
"type": "CDC",
"tenant": "cdc_prod",
"language": null,
"messagesPerSecond": messages_per_second,
"lag": last_recorded_lag,
"maxLag": max_recorded_lag,
"latency": number,
"maxLatency": number,
"minLatency": number,
"avgLatency": number,
"maxLatencyDate": timestamp_in_milliseconds_since_epoch,
"recordsProcessed": number,
"recordsProcessedSinceLastCheck": number,
"lastCheck": timestamp_in_milliseconds_since_epoch,
"lastTimeRecordsReceivedDate": timestamp_in_milliseconds_since_epoch}
]
Response codes
200 for success, 401 for not authorized, 403 for forbidden, and 500 for an internal error.