Overview
An eLearning company needs to load data from 1600+ MySQL databases into the Snowflake data warehouse.
Requirements
- Setup data pipeline to load incremental updates from 1600+ MySQL databases into the Snowflake data warehouse.
- The data pipeline must support INSERTS, UPDATES, and DELETES.
- The data pipeline must be able to automatically pick up new databases and adjust the destination schema if new columns are added to the source tables.
- The expected volume of data: hundreds of gigabytes and billions of records on the initial load, plus tens of millions of updates every day.
- The expected number of tables across all MySQL databases: 55000.
- The number of tables in Snowflake: 35.
- The data pipeline must be extremely resilient to extract and load errors.
- The data pipeline is expected to work in a fully automated mode.
Solution
Set up MySQL read replica instance in RDS
It is recommended to stream CDC events from a read replica, so our customer set up a MySQL read replica instance in RDS and configured native MySQL replication from the production instance to the replica.
Creating a read replica is an optional step, but we highly recommend it because it significantly reduces the replication load on your MySQL production instance.
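Before pointing the CDC pipeline at the replica, it is a good idea to confirm that replication from the production instance is healthy, for example:

-- Run on the read replica; use SHOW SLAVE STATUS on MySQL versions older than 8.0.22
SHOW REPLICA STATUS;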
Set up Permissions
The following permissions need to be configured for the MySQL user in the read replica MySQL instance.
Permission/item | Description |
---|---|
SELECT | Enables the connector to select rows from tables in databases. This is only used when performing a snapshot. |
RELOAD | Enables the connector the use of the FLUSH statement to clear or reload internal caches, flush tables, or acquire locks. This is only used when performing a snapshot. |
SHOW DATABASES | Enables the connector to see database names by issuing the SHOW DATABASE statement. This is only used when performing a snapshot. |
REPLICATION SLAVE | Enables the connector to connect to and read the MySQL server binlog. |
REPLICATION CLIENT | Enables the connector the use of the SHOW MASTER STATUS, SHOW SLAVE STATUS, and SHOW BINARY LOGS statements. This is always required for the connector. |
LOCK TABLES | On Amazon RDS or Amazon Aurora, which do not allow a global read lock, table-level locks are used to create a consistent snapshot. |
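For reference, a minimal sketch of the grants covering the privileges above might look like this (the user name etl_cdc and the password are placeholders; adjust them to your environment):

-- Hypothetical CDC user; REPLICATION SLAVE/CLIENT are global privileges, hence ON *.*
CREATE USER 'etl_cdc'@'%' IDENTIFIED BY '<strong-password>';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES
    ON *.* TO 'etl_cdc'@'%';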
Enable binlog replication for the MySQL read replica instance
Step 1. Create a new RDS parameter group.
Step 2. Set the following parameters as below:
binlog_format: ROW
log_bin_use_v1_row_events: 1
net_read_timeout: 3600
net_write_timeout: 3600
wait_timeout: 86400
Step 3. Assign this parameter group to the MySQL read replica instance.
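A quick way to verify that the new settings took effect is to run the following queries against the read replica:

-- Expected values: ROW, ON, 3600, 3600, 86400
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'log_bin_use_v1_row_events';
SHOW VARIABLES LIKE 'net_read_timeout';
SHOW VARIABLES LIKE 'net_write_timeout';
SHOW VARIABLES LIKE 'wait_timeout';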
Choose the right approach
There are two options for extracting data using CDC and loading it into Snowflake:
- Create one Flow to extract data from the database and create CSV files in local storage, and another Flow to load the CSV files into Snowflake.
- Create one Flow to extract data from the database and ingest it into a message queue, such as Kafka, and another Flow to load data from the queue into Snowflake.
For this project, we decided to use option 1. Option 2 requires a complicated setup with a separate message queue such as Kafka or Azure Event Hubs.
Read about the pros and cons of each approach.
Set up Flow to extract data from MySQL using CDC (change data capture)
Design considerations for extract
A typical CDC Flow can extract data from multiple tables in multiple databases, but having a single Flow pulling data from 55000+ tables would be a major bottleneck as it would be limited to a single blocking queue with a limited capacity. It would also create a single point of failure.
A better approach would be to create multiple parallel extract Flows, each pulling data from all 35 source tables in a single database. However, since data must be extracted from 1600+ MySQL databases, this would be impractical and very resource-demanding: we would have to run 1600+ parallel extract Flows.
It was decided to create 35 extract Flows, each pulling data from a single table in all 1600 databases.
There are other possible topologies to consider as well, for example:
- Group tables and databases alphabetically.
- Create separate extract Flows for the large and high-traffic tables and for the rest.
- Split Flows in chunks, each extracting data from a significant number of tables across multiple databases (for example, 1000).
We recommend selecting the topology that works the best for you, keeping the performance and maintenance overhead in mind.
Considering the design choice (35 Flows, each pulling data from a single table in all 1600 databases), it was decided to create a table in Snowflake that holds the list of databases to extract data from. This table is used to automatically populate the list of included tables in the format database1.table_abc,database2.table_abc,...database1600.table_abc and the list of included databases in the format database1,database2,...database1600.
The basic idea is that we can use a single MySQL CDC Connection where the Included Database(s) and Included Table(s) are set as {tokens}, populated at run time by JavaScript.
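For illustration, assuming the configuration table described in step 5 below, a query like this would produce the Included Table(s) value for the table_abc example above (at run time, the equivalent lists are built by the JavaScript Flow in step 6):

-- Builds the comma-separated database1.table_abc,...,database1600.table_abc list
SELECT LISTAGG(database_name || '.table_abc', ',') AS included_tables
FROM configuration;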
Step-by-step tutorial for creating the extract Flow
Step 1. Create a MySQL CDC Connection. When setting the parameters for the Connection, use {tokens} for Name, Included Database(s), Included Table(s), Server, Offset File Name, and DDL History File Name.
Step 2. Create Extract Flow using Flow type Stream CDC events, create files.
Step 3. Add a single transformation where the source Connection is the MySQL CDC Connection created in step 1 and the FROM is *.
Step 4. Create a Snowflake Connection.
Steps 5 to 10 are optional and only needed because of the requirement to pull data from 1600+ databases. In most cases, all you need is the single extract Flow created in step 2.
Step 5. Create a table configuration (database_name text) in Snowflake and add the databases to extract data from to that table.
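For example, the configuration table can be created and populated like this (the database names are placeholders; the table and column names match the lookup query used in step 6):

-- Table that drives which MySQL databases are replicated
CREATE TABLE IF NOT EXISTS configuration (database_name TEXT);

-- Register the databases to extract data from
INSERT INTO configuration (database_name) VALUES
    ('database1'),
    ('database2');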
Step 6. Create JavaScript Flow and add the code below. This Flow sets global variables whitelisted_databases and whitelisted_tables that are used as {tokens} in the MySQL CDC Connection. It also sets the stop flag if there are no databases configured.
// Import Etlworks (Toolsverse) Java packages used below
var javaImports = new JavaImporter(com.toolsverse.etl.core.engine,
    com.toolsverse.config);
with (javaImports) {
    // The table name is passed into the Flow as the TABLE_NAME Flow variable
    var table = scenario.getVariable('TABLE_NAME') != null ?
        scenario.getVariable('TABLE_NAME').getValue() : "";
    var stop = false;
    if (Utils.isNothing(table)) {
        etlConfig.log("No tables configured");
        stop = true;
    } else {
        // Read the list of databases from the configuration table in Snowflake
        var databases = Extractor.lookup(etlConfig, scenario, "Snowflake",
            "databases", "select database_name from configuration");
        var props = SystemConfig.instance().getProperties();
        if (databases == null || databases.isEmpty()) {
            etlConfig.log("No databases configured");
            stop = true;
        } else {
            // Build comma-separated lists of databases and database.table pairs
            var whitelistedDatabases = "";
            var whitelistedTables = "";
            for (var row = 0; row < databases.getRecordCount(); row++) {
                var dbName = databases.getFieldValue(databases.getRecord(row),
                    "database_name");
                whitelistedDatabases = whitelistedDatabases + (row == 0 ? "" : ",")
                    + dbName;
                whitelistedTables = whitelistedTables +
                    (row == 0 ? "" : ",") + dbName + "." + table;
            }
            // Expose the lists as global variables used as {tokens} in the CDC Connection
            props.put("table_name", table);
            props.put("whitelisted_databases", whitelistedDatabases);
            etlConfig.log("Whitelisted databases: " + whitelistedDatabases);
            props.put("whitelisted_tables", whitelistedTables);
            etlConfig.log("Whitelisted tables: " + whitelistedTables);
        }
    }
    // Signal the nested Flow to skip the extract if nothing is configured
    if (stop) {
        etlConfig.setValue("stop", "true");
    }
}
Step 7. Create a nested Flow. Add the JavaScript Flow created in step 6.
Step 8. Add the Extract Flow created in step 2.
Step 9. Click the edit (pencil) icon in front of Flow 2 and modify the condition as below. Flow 2 will not be executed if the stop flag is set.
value = Utils.isNothing(etlConfig.getValue("stop"));
Step 10. Select the Parameters tab and add a Flow variable TABLE_NAME. Do not set the value yet.
Step 11. Duplicate the main nested extract Flow 35 times and set the actual value of the TABLE_NAME variable for each Flow. In the end, there should be 35 extract Flows, one per table.
Set up Flows to load data into Snowflake
Design considerations for load
This tutorial explains how to set up a Flow to load the CSV files created by the extract Flows into Snowflake.
Once again, there are multiple options to consider:
Option 1. The tutorial above suggests using the CDC MERGE action, which applies INSERTS / UPDATES / DELETES in the order in which the CDC events originated in the source database.
For each CSV file, the Flow does the following:
- Creates a temporary table in Snowflake.
- Executes the COPY INTO command to load the file as-is into the temp table.
- Uses the Snowflake MERGE command to merge data in the temp table with the data in the actual table.
This approach guarantees the lowest latency, but it is more resource-intensive and consumes more Snowflake credits, which makes it more expensive.
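For reference, a simplified sketch of what the Flow executes for each CSV file is shown below. The table, stage, and column names (orders, orders_tmp, @cdc_stage, id, status, updated_at) are hypothetical, and the op column is assumed to carry the CDC operation code:

-- 1. Temporary table that matches the structure of the CSV file
CREATE TEMPORARY TABLE orders_tmp (
    id NUMBER,
    status STRING,
    updated_at TIMESTAMP_NTZ,
    op STRING
);

-- 2. Load the CSV file as-is into the temporary table
COPY INTO orders_tmp
FROM @cdc_stage/orders.csv
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);

-- 3. Merge INSERTS / UPDATES / DELETES into the actual table
MERGE INTO orders t
USING orders_tmp s ON t.id = s.id
WHEN MATCHED AND s.op = 'd' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.status = s.status, t.updated_at = s.updated_at
WHEN NOT MATCHED AND s.op <> 'd' THEN
    INSERT (id, status, updated_at) VALUES (s.id, s.status, s.updated_at);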
Option 2. Skip DELETE events and only apply INSERTS and UPDATES. It can be a bit faster compared to option 1.
Option 3. Always INSERT data into staging tables in Snowflake, then periodically execute an SQL script that populates the actual tables by de-duplicating the staging tables and removing d (delete) records.
This option is very popular when customers are OK with a longer delay between data being updated in the source and becoming available in the data warehouse for consumption by the BI tools.
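A minimal sketch of such a periodic script is shown below. The staging and target table names (orders_staging, orders) and the columns id, op, and cdc_timestamp are hypothetical; the idea is to keep only the latest event per primary key and drop keys whose latest event is a delete:

-- Rebuild the actual table from the de-duplicated staging table
INSERT OVERWRITE INTO orders
SELECT id, status, updated_at
FROM (
    SELECT s.*,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY cdc_timestamp DESC) AS rn
    FROM orders_staging s
)
WHERE rn = 1 AND op <> 'd';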
After extensive testing, our customer decided to use option 3. The main reason was that the (almost) real-time approach (option 1) relies on Snowflake cloud services and consumes extra Snowflake credits.
Schedule extract and load Flows
Schedule extract Flows
In Etlworks, it is possible to schedule a Flow to run continuously until there is nothing to do, then stop for a configurable number of seconds and restart. We recommended this Schedule type for the 35 extract Flows. The extract Flows run until they are automatically stopped to let the system add new databases. The customer set the delay between restarts to 2 hours.
Schedule load Flow
The load Flow loads files into Snowflake in batches. We recommended running it every few minutes so it can clear the queue as often as possible. The customer set it to run every 5 minutes.
Success metrics
On average, the pipelines load tens of millions of records into Snowflake daily, but there are days when the number of records jumps to hundreds of millions. As configured, the pipelines can easily handle extracting and loading billions of records a day.
Adding new databases is fully automated and handled via the Flow management API: the pipelines are automatically stopped at midnight and new databases are added to the configuration table. When the Scheduler restarts the pipelines, the new databases are automatically snapshotted before the pipelines switch to CDC mode.