Problem
A DevOps team at an e-commerce company needs to save all email notifications emitted by Amazon SNS into a database for future analysis. Specifically, they want to know the rate of failures, which notifications have bounced, and so on.
Requirements
- SNS can forward all notifications to a specific HTTP endpoint (which must support basic authentication), so it is possible to create an ETL Flow that is exposed as an HTTP endpoint with basic authentication.
- Once a notification is received (as a nested JSON payload; see the abbreviated sample after this list), it must be parsed, flattened, and loaded into the PostgreSQL database in Amazon RDS.
- Some fields must be calculated using the values of other fields.
- Notifications can be emitted as often as every second, so the Flow must support a very high frequency of executions (near real-time).
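The field names used later in this article (notificationType, headers, delivery, bounce, complaint) match the Amazon SES notification format delivered through SNS. An abbreviated, illustrative sample of a Bounce notification (all values are made up):

{
  "notificationType": "Bounce",
  "mail": {
    "timestamp": "2020-01-01T00:00:00.000Z",
    "messageId": "EXAMPLE-MESSAGE-ID",
    "source": "sender@example.com",
    "destination": ["recipient@example.com"],
    "headers": [
      { "name": "From", "value": "sender@example.com" },
      { "name": "Subject", "value": "Order confirmation" }
    ]
  },
  "bounce": {
    "bounceType": "Permanent",
    "bounceSubType": "General",
    "bouncedRecipients": [{ "emailAddress": "recipient@example.com" }],
    "timestamp": "2020-01-01T00:00:01.000Z"
  }
}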
Prerequisites
The SNS must be configured to send HTTP notifications to the subscribers.
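Note that when an HTTP(S) endpoint is subscribed to a topic, SNS first POSTs a SubscriptionConfirmation message, and the subscription must be confirmed by visiting its SubscribeURL before regular notifications are delivered. An abbreviated, illustrative sample of that message:

{
  "Type": "SubscriptionConfirmation",
  "MessageId": "EXAMPLE-MESSAGE-ID",
  "TopicArn": "arn:aws:sns:us-east-1:123456789012:example-topic",
  "Token": "EXAMPLE-TOKEN",
  "SubscribeURL": "https://sns.us-east-1.amazonaws.com/?Action=ConfirmSubscription&TopicArn=arn:aws:sns:us-east-1:123456789012:example-topic&Token=EXAMPLE-TOKEN",
  "Timestamp": "2020-01-01T00:00:00.000Z"
}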
Solution
The Flow was built using a feature called the user-defined PUSH API. The PUSH API with a custom endpoint can receive a payload from an HTTP request, transform it, and load it into the destination.
Step 1. Create a database Connection to the PostgreSQL database.
Step 2. Create a JSON Format with all default settings.
Step 3. Create an HTTP Listener with the following parameters:
- URL Pattern: /mail
- Method: POST
- Auth Type: Basic
Step 4. Configure SNS to send notifications to https://etlworks-base-url/plugins/schedules/rest/v1/httplistener/mail (the path ends with the URL Pattern configured in Step 3) with basic auth, using the credentials of any valid Etlworks Integrator user who can run Flows.
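Before wiring up SNS, the endpoint can be smoke-tested directly. A minimal sketch, assuming Node.js 18+ (built-in fetch) run as an ES module so top-level await is available; the URL, credentials, and payload below are placeholders, not values from this article:

// Hypothetical smoke test for the HTTP Listener endpoint.
const url = "https://etlworks-base-url/plugins/schedules/rest/v1/httplistener/mail";
const auth = Buffer.from("integrator-user:password").toString("base64");

const response = await fetch(url, {
  method: "POST",
  headers: {
    "Authorization": "Basic " + auth,
    "Content-Type": "application/json"
  },
  // Just enough of a Delivery-style payload to exercise the transformation
  body: JSON.stringify({
    notificationType: "Delivery",
    mail: { messageId: "EXAMPLE-MESSAGE-ID" },
    delivery: { recipients: ["recipient@example.com"] }
  })
});
console.log(response.status); // expect a 2xx status when the Listener accepts the payload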
Step 5. Create a new Flow by selecting the Flow type File to database from the gallery.
Step 6. Add a new source-to-destination transformation and set the following parameters:
- Connection (FROM): the HTTP Listener Connection created in Step 3.
- Format (FROM): the JSON Format created in Step 2.
- TO: the name of the table in the PostgreSQL database.
- Connection (TO): the PostgreSQL Connection created in Step 1.
Step 7. Click MAPPING and configure a per-field Mapping between the source JSON fields and the columns of the target table.
Step 8. Values for the fields headers, response, and error must be calculated on the fly (a sample of the resulting row is shown after the three scripts below):
- headers: the value of this field is a stringified JSON with the email headers:

var javaImports = new JavaImporter(com.toolsverse.etl.common,
    com.toolsverse.etl.connector, com.toolsverse.util.log);
with (javaImports) {
    // {headers} is the nested data set parsed from the headers array
    var ds = {headers};
    var field = "";
    try {
        // Serialize the nested data set back into a JSON string
        field = ConnectorUtils.dataSet2Str(ds,
            "com.toolsverse.etl.connector.json.JsonConnector", null);
    } catch (e) {
        Logger.log(Logger.INFO, null, "headers:" + e.toString());
    }
    value = field;
}
- response: the value is calculated using the values of other fields, such as notificationType, delivery, bounce, etc.:

var javaImports = new JavaImporter(com.toolsverse.etl.common,
    com.toolsverse.etl.connector, com.toolsverse.util.log);
with (javaImports) {
    var type = {notificationType};
    var ds = null;
    // Pick the nested data set that matches the notification type
    switch (type) {
        case "Delivery":
            ds = {delivery};
            break;
        case "Bounce":
            ds = {bounce};
            break;
        case "Complaint":
            ds = {complaint};
            break;
    }
    var field = "";
    try {
        // Serialize the selected data set to a JSON string
        field = ConnectorUtils.dataSet2Str(ds,
            "com.toolsverse.etl.connector.json.JsonConnector", null);
    } catch (e) {
        Logger.log(Logger.INFO, null, "delivery:" + e.toString());
    }
    value = field;
}
- error: the value is calculated using the values of other fields, such as notificationType, bounce, etc.:

var javaImports = new JavaImporter(com.toolsverse.etl.common);
with (javaImports) {
    var type = {notificationType};
    var ds = null;
    var field = "";
    switch (type) {
        case "Bounce":
            ds = {bounce};
            if (ds != null && ds.getRecordCount() > 0) {
                // e.g. "Permanent:General"
                field = ds.getFieldValue(ds.getRecord(0), "bounceType") + ':' +
                        ds.getFieldValue(ds.getRecord(0), "bounceSubType");
            }
            break;
        case "Complaint":
            ds = {complaint};
            if (ds != null && ds.getRecordCount() > 0) {
                field = ds.getFieldValue(ds.getRecord(0), "complaintFeedbackType");
            }
            break;
    }
    value = field;
}
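For illustration, here is what a single flattened row might look like after Step 8 for a bounce notification, assuming hypothetical target columns named after the fields above (all values are made up):

{
  "notificationType": "Bounce",
  "headers": "[{\"name\":\"From\",\"value\":\"sender@example.com\"},{\"name\":\"Subject\",\"value\":\"Order confirmation\"}]",
  "response": "{\"bounceType\":\"Permanent\",\"bounceSubType\":\"General\"}",
  "error": "Permanent:General"
}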
Step 9. For all three calculated fields, set the data type to VARCHAR.
Step 10. Save the Flow and Schedule it as an event-driven Flow.