- Startup
- Business
- Enterprise
- On-Premise
- Add-on
About MQTT
MQTT (Message Queuing Telemetry Transport) is a lightweight, publish-subscribe messaging protocol designed for resource-constrained environments. It’s widely used in IoT (Internet of Things) applications due to its efficient data exchange with low bandwidth requirements. MQTT operates on top of TCP/IP and ensures reliable communication even with unstable network conditions. Its architecture includes a client-server model, where clients (publishers and subscribers) interact with a central broker to send and receive messages on specific topics. This makes it ideal for scenarios requiring minimal data overhead and reliable message delivery, such as sensor data collection, real-time monitoring, and remote device control.
Read more about MQTT.
When to use this connector
- to read messages from MQTT topics and write messages to MQTT topics.
- to stream messages from the queue to various destinations.
- to implement a log-based CDC with a message queue.
Create a Connection
Step 1. In the Connections window, click +, and type in mqtt.
Step 2. Select MQTT.
Step 3. Enter the Connection parameters.
Connection parameters
Connection Settings
- URL: Enter the MQTT broker’s URL. This is a required field. Make sure the URL includes a host name and port number in the format host:port. You can optionally add a protocol such as mqtt://. The connector supports mqtt://, mqtts://, ssl://, and tcp://.
- Topic(s): Specify one or more MQTT topics to subscribe to or publish messages to.
- Client ID: The unique identifier for your client on the MQTT broker. Required.
- Quality of Service (QoS): Define the QoS level for message delivery: “0 - At Most Once”, “1 - At Least Once”, or “2 - Exactly Once.”
- MQTT Version: Select the MQTT version (e.g., 3, 5).
- Will Topic: Specify the last will topic. The broker publishes the client's last will message to this topic if the client disconnects unexpectedly.
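The URL rules above can be sketched as a small check. This is only an illustration under stated assumptions: `parseBrokerUrl` is a hypothetical helper, not part of Etlworks, and it does not handle IPv6 literals or paths after the port.

```javascript
// Hypothetical helper, not part of Etlworks: checks a broker URL against the
// rules described above (host:port required, protocol optional).
var SUPPORTED_PROTOCOLS = ['mqtt', 'mqtts', 'ssl', 'tcp'];

function parseBrokerUrl(url) {
  // Optional "protocol://", then a host without ':' or '/', then ":port".
  var match = /^(?:([a-z]+):\/\/)?([^:\/\s]+):(\d+)$/i.exec(String(url).trim());
  if (!match) {
    throw new Error('Expected [protocol://]host:port, got: ' + url);
  }
  var protocol = match[1] ? match[1].toLowerCase() : null;
  if (protocol !== null && SUPPORTED_PROTOCOLS.indexOf(protocol) === -1) {
    throw new Error('Unsupported protocol: ' + protocol);
  }
  var port = Number(match[3]);
  if (port < 1 || port > 65535) {
    throw new Error('Port out of range: ' + port);
  }
  return { protocol: protocol, host: match[2], port: port };
}
```

For example, `parseBrokerUrl('localhost:1883')` returns an object with a null protocol, while a value without a port, such as `broker.example.com`, is rejected.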
Authentication parameters
- Authentication Type: Choose between “Username and Password” or “Certificates and Private Key.” When certificates are selected, provide the relevant certificate and private key files.
- Enable Encryption (TLS): Enable TLS encryption for secure communication.
- Trust all hostnames: If enabled, all hostnames will be trusted, even if they don’t match the certificate’s subject.
Timeouts and Keep Alive
- Connection Timeout (ms): Set the time before the connection times out.
- Keep-Alive Timeout/Interval (seconds): Define the keep-alive interval, ensuring the client is alive.
- Max Failed Probes: Number of failed pings before the connection is considered lost.
Consumer Parameters
- Max number of records to read: Maximum records to read per poll.
- Number of retries before stop polling: Number of attempts to retry polling after failure.
- Retry N minutes before stop polling: Time to wait before retrying polling.
- Wait to get a next message (ms): Time to wait between polling attempts.
- Process remaining messages: If enabled, processes all remaining messages in the queue before stopping the connector.
- Consumer Preprocessor: Script to preprocess messages before consumption.
Producer Parameters
- Value Serializer: The format for serializing outgoing messages (e.g., String, Binary array).
- Producer Preprocessor: Script to preprocess messages before production.
CDC (Change Data Capture)
- Enable integration with a CDC provider for real-time data processing.
Encoding and Decoding
- Compression: Choose the message compression format (e.g., none, gzip).
- Encoding: Specify how messages should be encoded before publishing (none, Base64, Sparkplug B, etc.).
Authentication
The MQTT connector provides two authentication methods:
Username and Password Authentication:
Enter your MQTT broker’s username and password. This is a simple, widely-used method for securing your connection.
Certificates and Private Key Authentication:
Use certificates for enhanced security. Provide a Root Certificate, Client Certificate, and Private Key. This method is more secure and typically used for environments requiring TLS encryption.
Additional security options
- Enable Encryption (TLS): Encrypt data transmission using Transport Layer Security.
- Trust All Hostnames: Bypass hostname verification when enabled.
How to use MQTT connector
The MQTT connector operates similarly to other message queue connectors available in Etlworks. It facilitates sending and receiving messages to and from MQTT topics, offering reliable message exchange with flexible configuration options. While the underlying protocol differs, the functionality—such as topic subscriptions, message polling, and acknowledgments—works seamlessly within Etlworks’ flow-based design, just like other supported queue connectors (e.g., Kafka, ActiveMQ). The connector supports secure authentication, message processing, and flow integration, making it versatile for various messaging needs.
Read messages from MQTT topics
Step 1. Create a source Connection for a specific MQTT broker. Enter a unique Client ID.
Most MQTT brokers only allow one client to connect with a specific client ID at any given time. If another client using the same ID connects to the broker and subscribes to topics, the broker may terminate the session of the previously connected client. To avoid connection issues, ensure that each client uses a unique ID, especially when running multiple clients simultaneously.
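One way to reduce the chance of client ID collisions is to append a random suffix to a readable prefix. The sketch below is an assumption-laden illustration: `makeClientId` is a hypothetical helper, and a random suffix makes a clash unlikely rather than impossible. It stays within the 23-character alphanumeric range that MQTT 3.1.1 requires every broker to accept.

```javascript
// Hypothetical helper: builds a client ID from a prefix plus a random suffix.
// MQTT 3.1.1 only guarantees that brokers accept IDs of 1 to 23 characters
// drawn from 0-9, a-z, A-Z, so the sketch stays within those limits.
var ID_CHARS = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
var MAX_ID_LENGTH = 23;

function makeClientId(prefix) {
  // Drop unsupported characters and leave room for at least 8 random ones.
  var id = String(prefix).replace(/[^0-9a-zA-Z]/g, '').slice(0, MAX_ID_LENGTH - 8);
  while (id.length < MAX_ID_LENGTH) {
    id += ID_CHARS.charAt(Math.floor(Math.random() * ID_CHARS.length));
  }
  return id;
}
```

The generated value can then be pasted into the Client ID field of each Connection, one value per Connection.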
Step 3. If messages are stored in the text format, set Value Serializer to String. Otherwise, set it to Array of Bytes.
Step 4. Optionally configure encoding and compression.
Step 5. Create a source Format. The following Formats are supported when reading messages from the queue:
- JSON
- CSV
- Byte Array (read how to send unmodified messages from one MQTT broker to another)
Step 6. Create a destination Connection, for example, a Connection to the relational database.
Step 7. Optionally, create a destination Format.
Step 8. Create a Flow where the source is a message queue by typing in queue to in the Flow Selector popup.
Step 9. Continue by adding source-to-destination transformations where the source is MQTT Connection created in step 1, source Format created in step 5, and the destination Connection and (optionally) Format created in steps 6 and 7.
Step 10. When configuring the source-to-destination transformation, enter the MQTT topic name in the FROM field. Etlworks supports wildcard topic names, such as test/device/#. You can enter multiple source topics (including wildcard topics) as a comma-separated list: topic1,topic2,etc.
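To make the wildcard and comma-separated rules concrete, here is a minimal sketch of how a FROM value could be split into topic filters and how a filter is matched against a topic under the MQTT specification. Both functions are hypothetical illustrations, not the connector's code. The `+` single-level wildcard is standard MQTT and is included for completeness; the special handling of topics that start with `$` is omitted.

```javascript
// Splits a comma-separated FROM value into individual topic filters.
function parseTopicList(from) {
  return String(from).split(',')
    .map(function (t) { return t.trim(); })
    .filter(function (t) { return t.length > 0; });
}

// Returns true when `topic` matches the MQTT topic filter `filter`.
// '#' matches its parent level and everything below; '+' matches exactly one level.
function topicMatches(filter, topic) {
  var f = filter.split('/');
  var t = topic.split('/');
  for (var i = 0; i < f.length; i++) {
    if (f[i] === '#') {
      return i === f.length - 1; // '#' is only valid as the last level
    }
    if (i >= t.length) {
      return false; // the filter has more levels than the topic
    }
    if (f[i] !== '+' && f[i] !== t[i]) {
      return false; // literal level does not match
    }
  }
  return f.length === t.length;
}
```

With these rules, `test/device/#` matches both `test/device` and `test/device/1/temp`, but not `test/other/1`.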
Step 11. Schedule the Flow to stream data in real-time or to be executed periodically.
When scheduling the Flow to be executed periodically, it is recommended to use short intervals for micro-batching.
Write messages to MQTT topics
In this scenario, the data is extracted from any source, transformed, and loaded into MQTT topics using a configurable data exchange format.
Steps to create an ETL flow to write messages to MQTT:
Step 1. Create a source Connection.
Step 2. Optionally create a source Format.
Step 3. Create a destination Connection for the message queue to write the messages to MQTT.
Step 4. Create a destination Format. The following Formats are supported when writing messages to the queue:
- JSON
- Avro
- Byte Array (read how to send unmodified messages from one MQTT broker to another)
Step 5. Create a Flow where the destination is a message queue by typing in to queue in the Flow Selector popup.
Step 6. Continue by adding source-to-destination transformations where the source Connection and Format are Connection and Format created in steps 1 and 2, and the destination Connection and Format are Connection and Format created in steps 3 and 4.
Enter the destination topic name in the “TO” field. You can configure multiple source-to-destination transformations within the same flow, each with a different MQTT topic as the destination. This allows you to send transformed data to various MQTT topics as part of a single ETL flow, streamlining message distribution across different topics.
Step 7. Schedule the Flow to be executed periodically.
Send unmodified messages from one MQTT broker to another
In some scenarios, an organization may want to transfer messages directly from one MQTT broker to another without any changes. This use case is common when:
- A company is migrating from one broker to another and needs to maintain message continuity.
- A system requires message distribution to different networks or regions, where each network has its own broker.
- Integrating two separate applications that rely on different brokers but need to share data.
This setup ensures seamless message forwarding without modification.
Step 1. Create a connection to the source MQTT broker. Enter a unique Client ID.
Step 2. To stream data without stopping the flow, clear these three parameters.
Step 3. Set Value Serializer to Array of Bytes. Make sure there is no compression and encoding.
Step 4. Create a connection to the destination MQTT broker. Set Value Serializer to Array of Bytes. Make sure there is no compression and encoding.
Step 5. Create Byte Array format.
Step 6. Create a new Queue to Queue flow.
Step 7. Add a source-to-destination transformation where the source connection is the one configured in steps 1-3, the destination connection is the one created in step 4, and both the source and destination formats are the Byte Array format created in step 5.
When configuring the source-to-destination transformation, enter the MQTT topic name in the FROM field. Etlworks supports wildcard topic names, such as test/device/#. To load data into the same topics as in the source, enter * in TO.
Subscriber preprocessor
Use Subscriber Preprocessor to modify the source message and/or destination name.
Available scripting languages
- JavaScript
Available variables
- event: the message serialized as JsonNode.
- fields: the source fields serialized as JsonNode.
- topic: the MQTT topic.
- destination: the destination name.
- db: the source database name.
- schema: the source schema name.
Get the values of the column
var val = event.get('columns_name').asText();
Modify the value of the column
Using this technique you can modify the values of the existing columns and add new columns to the stream.
event.put('columns_name', 'string');
Possible return values
- java.lang.Boolean.TRUE/FALSE: if FALSE, the message will be skipped.
- TypedKeyValue<String, Boolean>: the key/value pair, where the key contains the destination name and the value is java.lang.Boolean.TRUE/FALSE.
- Anything else is ignored.
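The following sketch shows how a preprocessor might read a column, add one, and decide whether to keep the message. It makes several assumptions: the column names `device_id` and `source` are invented for illustration, `makeEvent` is only a stand-in that imitates the `has`, `get`, `asText`, and `put` methods of a JsonNode so the code runs outside Etlworks, and the plain `true`/`false` result stands for java.lang.Boolean.TRUE/FALSE.

```javascript
// Stand-in for the JsonNode that Etlworks passes as `event`, so the sketch can
// run outside the connector. Only the methods used below are imitated.
function makeEvent(fields) {
  return {
    has: function (name) { return Object.prototype.hasOwnProperty.call(fields, name); },
    get: function (name) {
      return { asText: function () { return String(fields[name]); } };
    },
    put: function (name, v) { fields[name] = v; }
  };
}

// Hypothetical rule: skip readings without a device_id, tag the rest.
// Returns true to keep the message and false to skip it.
function keepMessage(event) {
  if (!event.has('device_id') || event.get('device_id').asText() === '') {
    return false;
  }
  event.put('source', 'mqtt'); // adds a new column to the stream
  return true;
}
```

Inside an actual preprocessor script, the last line would assign the result to the `value` variable, for example `value = keepMessage(event) ? java.lang.Boolean.TRUE : java.lang.Boolean.FALSE;`.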
Set the destination name
By default, the destination table name or a filename is calculated using a template set in TO where any part of the TO can contain any of the following [tokens]:
- [table] - the source table name.
- [db] - the source database name.
- [schema] - the source schema.
- * - the source topic name.
The flow replaces each [token] with its value.
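The token substitution can be pictured with a short sketch. It only mimics the behavior described above and is not Etlworks' implementation: `resolveDestination` is a hypothetical function, and replacing a missing value with an empty string is an assumption.

```javascript
// Illustrative only: replaces [table], [db], [schema] and * in a TO template
// with values taken from the `source` object.
function resolveDestination(template, source) {
  var values = {
    '[table]': source.table,
    '[db]': source.db,
    '[schema]': source.schema,
    '*': source.topic
  };
  // A single pass over the template, so substituted values are never re-scanned.
  return String(template).replace(/\[table\]|\[db\]|\[schema\]|\*/g, function (token) {
    var v = values[token];
    return v === undefined || v === null ? '' : String(v);
  });
}
```

For instance, the template `[schema].[table]` with schema `public` and table `orders` resolves to `public.orders`, and the template `*` resolves to the source topic name.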
The developer can also set the destination name using JavaScript. Simply return (assign it to the variable value in the last line of the Consumer Preprocessor script) an instance of the TypedKeyValue class, where the key is the new destination name.
Here is an example:
var parts = topic.split('\\.', -1);
// assuming that the topic name includes database.schema.table and we only need the table name
value = new TypedKeyValue('public.' + parts[2], java.lang.Boolean.TRUE);
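MQTT topic levels are usually separated by `/` rather than `.`, so a variation of the same idea is to derive the destination name from the last topic level. The sketch below is a hypothetical helper, not an Etlworks function; the `public.` prefix and the rule of replacing unusual characters with `_` are assumptions for illustration.

```javascript
// Hypothetical helper: uses the last level of an MQTT topic as the destination name.
function destinationFromTopic(topic, prefix) {
  var levels = String(topic).split('/').filter(function (l) { return l.length > 0; });
  var last = levels.length > 0 ? levels[levels.length - 1] : 'unknown';
  // Replace characters that are awkward in table or file names.
  return (prefix || '') + last.replace(/[^0-9a-zA-Z_]/g, '_');
}
```

In a preprocessor script the result could be returned as `value = new TypedKeyValue(destinationFromTopic(topic, 'public.'), java.lang.Boolean.TRUE);`, so a message from `test/device/sensor-1` would go to `public.sensor_1`.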
Publisher Preprocessor
Use Publisher Preprocessor to modify the message and, optionally, to set the order key which will be added to the topic.
Scripting languages
- JavaScript
Variables
- message: the original message, serialized as String or ByteArrayStream.
- producerPackage: the instance of BaseProducerPackage.
- topic: the topic name.
Return values
- value = modified_message: the modified message.
- value = null: the message will not be added to the topic.
- TypedKeyValue<String, String>: the key/value pair, where the key contains the order key and the value is a message.
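As an illustration of the first two return values, the sketch below modifies a JSON message or returns null to keep it out of the topic. It assumes the message arrives as a String containing JSON; the field names `device_id` and `source` and the function `preprocessOutgoing` are invented for the example.

```javascript
// Hypothetical rule set for illustration; field names are assumptions.
// Returns the modified message, or null to keep the message out of the topic.
function preprocessOutgoing(message) {
  var payload;
  try {
    payload = JSON.parse(String(message));
  } catch (e) {
    return null; // not valid JSON: do not publish
  }
  if (payload === null || typeof payload !== 'object' || payload.device_id === undefined) {
    return null; // missing the required field: do not publish
  }
  payload.source = 'etl'; // add a field to the outgoing message
  return JSON.stringify(payload);
}
```

In the preprocessor script itself, the last line would be `value = preprocessOutgoing(message);`.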