Problem
An eLearning company needs to load data from 1600+ MySQL databases into the Snowflake data warehouse.
Requirements
- Set up a data pipeline to load incremental updates from 1600+ MySQL databases into the Snowflake data warehouse.
- The data pipeline must support INSERTs, UPDATEs, and DELETEs.
- The data pipeline must be able to automatically pick up new databases and adjust the destination schema if new columns are added to the source tables.
- The expected volume of data: hundreds of gigabytes and billions of records on the initial load, plus tens of millions of updates every day.
- The expected number of tables across all MySQL databases: 55000.
- The number of tables in Snowflake: 35
- The data pipeline must be extremely resilient to extract and load errors.
- The data pipeline is expected to work in a fully automated mode.
Solution
Setting up MySQL read replica instance in RDS
It is recommended to stream CDC events from a read replica, so our customer set up a MySQL read replica instance in RDS and configured native MySQL replication from the production instance to the replica.
Creating a read replica is an optional step, but we highly recommend it because it significantly reduces the replication load on your MySQL production instance.
Setting up Permissions
The following permissions need to be configured for the MySQL user in the read replica MySQL instance.
Permission | Description
---|---
SELECT | Enables the connector to select rows from tables in databases.
RELOAD | Enables the connector to use the FLUSH statement to clear or reload internal caches, flush tables, or acquire locks.
SHOW DATABASES | Enables the connector to see database names by issuing the SHOW DATABASE statement.
REPLICATION SLAVE | Enables the connector to connect to and read the MySQL server binlog.
REPLICATION CLIENT | Enables the connector to use the following statements: SHOW MASTER STATUS, SHOW SLAVE STATUS, and SHOW BINARY LOGS.
LOCK TABLES | On Amazon RDS or Amazon Aurora, which do not allow a global read lock, table-level locks are used to create a consistent snapshot.
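Assuming a dedicated MySQL user for the CDC connector (the name cdc_user below is just a placeholder), the grants could be applied with something along these lines:

-- Hypothetical CDC user; choose your own name, host, and password.
CREATE USER 'cdc_user'@'%' IDENTIFIED BY '<strong-password>';

-- REPLICATION SLAVE and REPLICATION CLIENT are global privileges, so they are granted ON *.*
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES
  ON *.* TO 'cdc_user'@'%';

FLUSH PRIVILEGES;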
Enabling binlog replication for MySQL read replica instance
Step 1. Create a new RDS parameter group
Step 2. Set the following parameters:
binlog_format: ROW
log_bin_use_v1_row_events: 1
net_read_timeout: 3600
net_write_timeout: 3600
wait_timeout: 86400
Step 3. Assign this parameter group to the MySQL read replica instance.
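To confirm the parameter group took effect after the replica was rebooted, you can check the values from any MySQL client (a quick sanity check, not part of the pipeline itself):

-- Run against the read replica; binlog_format must report ROW.
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'log_bin_use_v1_row_events';
SHOW VARIABLES LIKE 'net_read_timeout';
SHOW VARIABLES LIKE 'net_write_timeout';
SHOW VARIABLES LIKE 'wait_timeout';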
Choosing the right approach
There are 2 options for extracting data using CDC and loading it into Snowflake:
- Create one flow to extract data from the database and create CSV files in local storage, and another flow to load the CSV files into Snowflake.
- Create one flow to extract data from the database and ingest it into a message queue, such as Kafka, and another flow to load data from the queue into Snowflake.
For this project, we decided to use option 1. Option 2 requires a more complicated setup with a separate message queue such as Kafka or Azure Event Hubs.
Read about the pros and cons of each approach.
Setting up flows to extract data from MySQL using CDC (change data capture)
Design considerations for extract
A typical CDC flow can extract data from multiple tables in multiple databases, but having a single flow pulling data from 55000+ tables would be a major bottleneck: it would be limited to a single blocking queue with limited capacity, and it would also create a single point of failure.
A better approach would be to create multiple parallel extract flows, each pulling data from all 35 tables in a single database. However, considering that data must be extracted from 1600+ MySQL databases, this would be impractical and very resource-demanding: we would have to run 1600+ parallel extract flows.
Instead, it was decided to create 35 extract flows, each pulling data from a single table across all 1600 databases.
There are other possible topologies to consider as well, for example:
- group tables and databases alphabetically
- create separate extract flows for the large and high-traffic tables and for the rest
- split flows in chunks, each extracting data from a significant number of tables across multiple databases (for example 1000)
We recommend selecting the topology that works best for you, keeping in mind performance and maintenance overhead.
Given the chosen design (35 flows, each pulling data from a single table in all 1600 databases), it was decided to create a table in Snowflake that holds the list of databases to extract data from. This table is used to automatically populate the list of included tables in the format database1.table_abc,database2.table_abc,...database1600.table_abc and the list of included databases in the format database1,database2,...database1600.
The basic idea is that we can use a single MySQL CDC connection where the Included Databases and Included Tables are set as {tokens}, populated at run-time by JavaScript.
Step-by-step tutorial for creating the extract flow
Step 1. Create MySQL CDC connection. When setting the parameters for the connection use {tokens} for Name, Included Databases, Included Tables, Server, Offset File Name, and DDL History File Name.
Step 2. Create Extract flow using flow type Stream CDC events, create files.
Step 3. Add a single transformation where the source connection is a MySQL CDC connection created in step 1 and the FROM is *
.
Step 4. Create a Snowflake connection.
Steps 5-10 are optional and are only needed because of the requirement to pull data from 1600 databases. In most cases, all you need is the single extract flow created in step 2.
Step 5. Create a table configuration (database_name text)
in Snowflake and add databases to extract data from to that table.
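A minimal sketch of that configuration table and how it could be populated (the database names below are placeholders; the real table contains 1600+ rows):

-- Drives which MySQL databases are included in the extract flows.
CREATE TABLE configuration (database_name text);

-- Placeholder names; add one row per source database.
INSERT INTO configuration (database_name) VALUES
  ('database1'),
  ('database2'),
  ('database3');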
Step 6. Create a JavaScript flow and add the code below. This flow sets the global variables whitelisted_databases
and whitelisted_tables
that are used as {tokens} in the MySQL CDC connection. It also sets the "stop" flag if there are no databases configured.
importPackage(com.toolsverse.etl.core.engine);
importPackage(com.toolsverse.config);

// Read the TABLE_NAME flow variable set on the nested flow.
var table = scenario.getVariable('TABLE_NAME') != null ?
    scenario.getVariable('TABLE_NAME').getValue() : "";

var stop = false;

if (Utils.isNothing(table)) {
    etlConfig.log("No tables configured");
    stop = true;
} else {
    // Look up the list of databases from the configuration table in Snowflake.
    var databases = Extractor.lookup(etlConfig, scenario, "Snowflake",
        "databases", "select database_name from configuration");
    var props = SystemConfig.instance().getProperties();

    if (databases == null || databases.isEmpty()) {
        etlConfig.log("No databases configured");
        stop = true;
    } else {
        var whitelistedDatabases = "";
        var whitelistedTables = "";

        // Build comma-separated lists in the formats
        // database1,database2,... and database1.table_abc,database2.table_abc,...
        for (var row = 0; row < databases.getRecordCount(); row++) {
            var dbName = databases.getFieldValue(databases.getRecord(row), "database_name");
            whitelistedDatabases = whitelistedDatabases + (row == 0 ? "" : ",") + dbName;
            whitelistedTables = whitelistedTables +
                (row == 0 ? "" : ",") + dbName + "." + table;
        }

        // Expose the lists as global properties used as {tokens} in the CDC connection.
        props.put("table_name", table);
        props.put("whitelisted_databases", whitelistedDatabases);
        etlConfig.log("Whitelisted databases: " + whitelistedDatabases);
        props.put("whitelisted_tables", whitelistedTables);
        etlConfig.log("Whitelisted tables: " + whitelistedTables);
    }
}

// Signal the nested flow to skip the extract step when there is nothing to do.
if (stop) {
    etlConfig.setValue("stop", "true");
}
Step 7. Create a nested flow. Add a JavaScript flow created in step 6.
Step 8. Add Extract flow created in step 2.
Step 9. Click the edit (pencil) icon in front of flow 2 and modify the condition as shown below. Flow 2 will not be executed if the stop flag is set.
value = Utils.isNothing(etlConfig.getValue("stop"));
Step 10. Select the Parameters tab and add a Flow variable TABLE_NAME. Do not set the value yet.
Step 11. Duplicate the main nested extract flow 35 times and set the actual value of the TABLE_NAME variable for each flow. In the end, there should be 35 extract flows - one flow per table.
Setting up flows to load data in Snowflake
Design considerations for load
This tutorial explains how to set up a flow to load the CSV files created by the extract flow into Snowflake.
Once again, there are multiple options to consider.
Option 1. The tutorial above suggests using the CDC MERGE action, which applies INSERTs/UPDATEs/DELETEs in the order in which the CDC events originated in the source database.
For each CSV file the flow does the following:
- Creates a temporary table in Snowflake.
- Executes COPY INTO command to load the file "as is" into the temp table.
- Uses Snowflake MERGE command to merge data in the temp table with the data in the actual table.
This approach guarantees the lowest latency but is more resource-intensive and requires more Snowflake credits (it can be more expensive from the Snowflake standpoint).
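For illustration only, here is a rough sketch of what this sequence could look like in Snowflake SQL; the stage, table, and column names (including the cdc_op column that holds the event type) are hypothetical, not the customer's actual schema:

-- 1. Temporary table that mirrors the CSV layout (hypothetical columns).
CREATE TEMPORARY TABLE orders_temp (id NUMBER, customer_id NUMBER, amount NUMBER, cdc_op STRING);

-- 2. Load the CSV file produced by the extract flow "as is".
COPY INTO orders_temp
  FROM @cdc_stage/orders/
  FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);

-- 3. Merge the CDC events into the actual table, honoring deletes.
MERGE INTO orders t
USING orders_temp s
  ON t.id = s.id
WHEN MATCHED AND s.cdc_op = 'd' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.customer_id = s.customer_id, t.amount = s.amount
WHEN NOT MATCHED AND s.cdc_op <> 'd' THEN
  INSERT (id, customer_id, amount) VALUES (s.id, s.customer_id, s.amount);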
Option 2. Skip DELETE events and only apply INSERTs and UPDATEs. This can be a bit faster than option 1.
Option 3. Always INSERT data into staging tables in Snowflake, then periodically execute a SQL script that populates the actual tables by de-duplicating the staging tables and removing 'd' (delete) records.
This option is very popular when customers are OK with a longer delay between data being updated in the source and becoming available in the data warehouse for consumption by BI tools.
After extensive testing, our customer decided to use option 3. The main reason was that the near-real-time approach (option 1) uses Snowflake cloud services and consumes extra Snowflake credits.
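Since option 3 is the one the customer went with, here is a hedged sketch of the periodic de-duplication script; the staging and target tables, the key column id, and the cdc_op/cdc_timestamp columns are assumptions for illustration:

-- Keep only the latest CDC event per key, apply it to the actual table, and honor deletes.
MERGE INTO orders t
USING (
    SELECT *
    FROM orders_staging
    QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY cdc_timestamp DESC) = 1
) s
  ON t.id = s.id
WHEN MATCHED AND s.cdc_op = 'd' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.customer_id = s.customer_id, t.amount = s.amount
WHEN NOT MATCHED AND s.cdc_op <> 'd' THEN
  INSERT (id, customer_id, amount) VALUES (s.id, s.customer_id, s.amount);

-- Clear the applied rows (a production script would only remove rows
-- that existed before the MERGE started).
DELETE FROM orders_staging;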
Scheduling extract and load flows
Scheduling Extract Flows
In Etlworks, it is possible to schedule a flow to run continuously until there is nothing left to do, then stop for a configurable number of seconds and restart. We recommended this schedule type for the 35 extract flows. The extract flows run until they are automatically stopped to let the system pick up new databases. The customer set the delay between restarts to 2 hours.
Scheduling Load Flow
The load flow loads files into Snowflake in batches. We recommended running it every few minutes so it can clear the queue as often as possible. The customer set it to run every 5 minutes.
Success metrics
On average, the pipelines load tens of millions of records into Snowflake daily, but there are days when the number of records jumps to hundreds of millions. As configured, the pipelines can easily handle extracting and loading billions of records a day.
Adding new databases is fully automated and handled by the flow management API, which automatically stops the pipelines at midnight and adds new databases to the configuration table. When the scheduler restarts the pipelines, the new databases are automatically snapshotted before the pipelines switch to CDC mode.