About Azure Event Hubs
Event Hubs is a fully managed, real-time data ingestion service that’s simple, trusted, and scalable. Read more about Event Hubs.
When to use this connector
- to read messages from and write messages to one or more Event Hubs topics.
- to stream messages from the queue to various destinations.
- to implement log-based CDC with a message queue.
- to implement real-time change replication with Kafka and Debezium.
Create Kafka-enabled Event Hubs
At the API level, Azure Event Hubs is compatible with Apache Kafka.
Read how to create Kafka-enabled Azure Event Hubs.
Create a Connection
Step 1. In the Connections window, click +, and type in azure event hub.
Step 2. Select Azure Event Hub.
Step 3. Enter the Connection parameters.
Connection parameters
- Namespace: the Event Hubs namespace.
- Topic(s): a topic to read messages from or write messages to. For reading, wildcard topic names (for example, inbound.*) or comma-separated topic names (for example, topic1,topic2) are supported. Event Hub is a synonym for Topic.
- Access Key: the Event Hub access key, which can be found in the Azure console under Event Hub / Shared access policies / SAS Policy / Connection string-primary key.
- Properties: additional properties for the Kafka consumer, Kafka producer, and Kafka security. The properties must be in the format key1=value1;key2=value2 (see the example after this list).
- Auto Commit: if enabled, the Kafka consumer periodically commits the offset while reading messages from the queue. It is recommended to keep it disabled so the system can commit the offset right after the messages have been processed.
- Starting Offset: the starting offset at which to begin the fetch.
- Key Deserializer: the deserializer for the key.
- Value Deserializer: the deserializer for the value. When the value is a document in Avro Format, use either Avro (when processing messages enqueued by Etlworks) or Avro Record (when processing messages enqueued by a third-party application). The latter requires an Avro Schema.
- Max number of records to read: the total maximum number of records to read in one micro-batch. The default limit is 1000000.
- Poll duration: how long (in milliseconds) the consumer should wait while fetching data from the queue. The default is 1000 milliseconds.
- Max number of records to poll: the maximum number of records that can be fetched from the queue in a single poll call.
- Number of retries before stop polling: the number of retries before polling stops when a poll returns no records. The default is 5.
- Consumer Preprocessor: a program in JavaScript that can be used to modify the source message or the destination name. Read more.
- Integration with CDC providers: the CDC provider. Select either Etlworks or Debezium if you are planning to use this connection for processing CDC events created by ETL CDC connectors or Debezium.
- Key Serializer: the serializer for the key.
- Value Serializer: the serializer for the value. Use Avro when writing messages in Avro Format.
- Compression: the compression algorithm used when writing messages.
- Record headers: key-value pairs that allow you to add metadata about the Kafka record without adding extra information to the key/value pair of the record itself.
- Producer Preprocessor: a program in JavaScript that can be used to modify the message which will be added to the topic. Read more.
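For example, to tune the Kafka consumer, the Properties field could be set as below. This is a hypothetical sketch: max.poll.interval.ms and fetch.max.bytes are standard Kafka client property names, but the values and the exact set of keys you need depend on your workload.
max.poll.interval.ms=300000;fetch.max.bytes=52428800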
Consumer preprocessor
Use Consumer Preprocessor to modify the source message and/or the destination name.
Available scripting languages
- JavaScript
Available variables
- event: the message serialized as JsonNode.
- fields: the source fields serialized as JsonNode.
- topic: the Event Hubs topic.
- destination: the destination name.
- db: the source database name.
- schema: the source schema name.
- other: the reference to the array of record headers. Use the following code to access the headers:
var headers = other.toArray();
for (var i = 0; i < headers.length; i++) {
    var header = headers[i];
    var key = header.key();
    var value = new java.lang.String(header.value());
}
Get the value of a column
var val = event.get('column_name').asText();
Modify the value of a column
Using this technique, you can modify the values of existing columns and add new columns to the stream.
event.put('column_name', 'string');
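Here is a minimal sketch that combines both operations. The column names (status, processed_at) are hypothetical and used for illustration only:
// modify the value of an existing (hypothetical) column
event.put('status', event.get('status').asText().toUpperCase());
// add a new (hypothetical) column to the stream
event.put('processed_at', java.lang.System.currentTimeMillis());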
Possible return values
- java.lang.Boolean.TRUE/FALSE: if FALSE, the message will be skipped (see the example after this list).
- TypedKeyValue<String, Boolean>: a key/value pair, where the key contains the destination name and the value is java.lang.Boolean.TRUE/FALSE.
- Anything else is ignored.
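For example, a Consumer Preprocessor can use the boolean return value to filter messages. The sketch below assumes a hypothetical op field in the message and skips delete events:
// skip the message when the (hypothetical) 'op' field equals 'd'
var op = event.get('op') != null ? event.get('op').asText() : '';
value = (op == 'd') ? java.lang.Boolean.FALSE : java.lang.Boolean.TRUE;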
Set the destination name
By default, the destination table name or a filename is calculated using a template set in TO, where any part of the TO can contain any of the following [tokens]:
- [table]: the source table name.
- [db]: the source database name.
- [schema]: the source schema name.
- *: the source topic name.
The flow substitutes the [tokens] with their actual values.
This example demonstrates how to configure a transformation to load data into a destination table in the public schema with the same name as the source table: public.[table].
The developer can also set the destination name using JavaScript. Simply return (assign to the variable value in the last line of the Consumer Preprocessor script) an instance of the TypedKeyValue class, where the key is the new destination name.
Here is an example:
// assuming that the topic name includes database.schema.table and we only need the table name
var parts = topic.split('\\.', -1);
value = new TypedKeyValue('public.' + parts[2], java.lang.Boolean.TRUE);
Producer Preprocessor
Use Producer Preprocessor to modify the message which will be added to the topic.
Scripting languages
- JavaScript
Variables
- message: the original message, serialized as String or ByteArrayStream.
- producerPackage: the instance of BaseProducerPackage.
- topic: the topic name.
Return values
- value = modified_message: the modified message will be added to the topic.
- value = null: the message will not be added to the topic (see the sketch below).
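Here is a minimal sketch of a Producer Preprocessor that drops empty messages and otherwise passes the message through unchanged. It assumes the message arrives serialized as a String:
// assuming the message is serialized as a String
if (message == null || message == '') {
    value = null; // the message will not be added to the topic
} else {
    value = message; // pass the message through unchanged (modify here if needed)
}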