Problem
An eLearning company needs to load data from 1500+ MySQL databases into the Snowflake data warehouse.
Requirements
- Set up a pipeline to load incremental updates from 1500+ MySQL databases into the Snowflake data warehouse.
- The pipeline must support INSERTs, UPDATEs, and DELETEs.
- The pipeline must automatically adjust the destination schema when new tables or new fields are added to the source.
- The expected volume of data: terabytes, billions of records on the initial load, hundreds of millions of updates every day.
- The expected number of tables across all MySQL databases: 50000.
- The number of tables in Snowflake: 35.
- The pipeline must be extremely resilient to extract and load errors. It is expected to work in a fully automated mode.
- The pipeline will be running in real time.
Solution
Setting up MySQL read replica instance in RDS
It is recommended to stream CDC events from a read replica, so our customer set up a MySQL instance in RDS and configured native MySQL replication from the production instance to the replica.
Creating a read replica is an optional step, but we highly recommend it, as it significantly reduces the replication load on your MySQL production instance.
Permissions
The following permissions need to be granted to the MySQL user on the read replica instance.

Permission/item | Description
---|---
SELECT | Enables the connector to select rows from tables in databases.
RELOAD | Enables the connector to use the FLUSH statement to clear or reload internal caches, flush tables, or acquire locks.
SHOW DATABASES | Enables the connector to see database names by issuing the SHOW DATABASES statement.
REPLICATION SLAVE | Enables the connector to connect to and read the MySQL server binlog.
REPLICATION CLIENT | Enables the connector to use the following statements: SHOW MASTER STATUS, SHOW SLAVE STATUS, SHOW BINARY LOGS.
LOCK TABLES | Optional. On Amazon RDS or Amazon Aurora, which do not allow a global read lock, table-level locks are used to create a consistent snapshot.
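For reference, here is a minimal sketch of granting these permissions to a dedicated CDC user; the user name, host mask, and password are placeholders, not values from the original setup.

```sql
-- Hypothetical CDC user; adjust the name, host mask, and password to your environment
CREATE USER 'cdc_user'@'%' IDENTIFIED BY '<strong password>';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES
    ON *.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;
```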
Enable binlog replication for MySQL read replica instance
Step 1. Create a new RDS parameter group
Step 2. Set the following parameters:
- binlog_format: ROW
- log_bin_use_v1_row_events: 1
- net_read_timeout: 3600
- net_write_timeout: 3600
- wait_timeout: 86400
Step 3. Assign this parameter group to the MySQL read replica instance.
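After the parameter group has been applied to the replica, it is worth confirming that the settings are in effect. A quick check, run directly on the read replica, might look like the sketch below.

```sql
-- Verify the binlog and timeout settings on the read replica
SHOW VARIABLES
WHERE Variable_name IN ('binlog_format', 'log_bin_use_v1_row_events',
                        'net_read_timeout', 'net_write_timeout', 'wait_timeout');
```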
Design considerations
There are three options for extracting data using CDC and loading it into Snowflake:
- Create one flow to extract data from the database and create CSV files in local storage, and another flow to load the CSV files into Snowflake.
- Create one flow to extract data from the database and ingest it into a message queue, such as Kafka, and another flow to load data from the queue into Snowflake.
- Create a single point-to-point flow to extract and load data.
Option 2 requires a complicated setup with a separate message queue (Kafka or Azure Event Hubs). Option 3 is not practical for a parallel extract from 50000 tables and is only recommended for quick testing.
Read about the pros and cons of each approach.
For this project it was decided to use option 1: create one flow to extract data from the database and create CSV files in local storage, and another flow to load the CSV files into Snowflake.
Setting up flows to extract data from MySQL using CDC (change data capture)
Design considerations for extract
A typical CDC flow can extract data from multiple tables in multiple databases, but having a single flow pull data from 50000+ tables would be a major bottleneck, as it would be limited to a single blocking queue with limited capacity.
A better approach would be to create multiple parallel extracts, each pulling data from all 35 tables in a single database. However, since the flow must extract data from 1500 MySQL databases, running 1500 parallel extract flows would be impractical and very resource-demanding.
So, it was decided to create 35 extract flows, each pulling data from a single table in all 1500 databases. There are other possible topologies to consider as well, for example:
- group tables and databases alphabetically
- create separate extract flows for the large and high-traffic tables and for the rest
- split flows into chunks, each extracting data from a significant number of tables across multiple databases (for example, 1000)
We recommend selecting the topology that works best for you, keeping in mind the performance and the maintenance overhead.
Considering the design choice - 35 flows, each pulling data from a single table in all 1500 databases - it was decided to create a table in Snowflake that holds the list of databases to extract data from. This table is used to automatically populate the list of included tables in the format database1.table_abc,database2.table_abc,...database1500.table_abc and the list of included databases in the format database1,database2,...database1500.
The basic idea is that we can use a single MySQL CDC connection where the Included Databases and Included Tables are set as {tokens}, populated at run time by JavaScript.
Step-by-step tutorial for creating the extract flows
Step 1. Create a MySQL CDC connection. When setting the parameters for the connection, use {tokens} for Name, Included Databases, Included Tables, Server, Offset File Name, and DDL History File Name.
Other parameters:
- Enable Serialize CDC events as CSV files into internal storage
- CDC Key - the CDC key is used as the name of the CSV file that contains the CDC events. For example, if the CDC key is configured as [db]_[table]_cdc_stream, the flow will create a file for each database.table with the suffix _cdc_stream and will stream the CDC events to this file.
- Enable Treat 'null' as NULL
- Maximum Queue Size and Maximum Batch Size - for a real production setup, consider increasing both by at least 10x.
- Enable Always generate INSERT - when this option is enabled the flow will add two extra columns to the stream:
- debezium_cdc_op VARCHAR - the actual CRUD operation (c - create, u - update, d - delete)
- debezium_cdc_timestamp LONG - the timestamp of the event in milliseconds
- Extra columns - DATABASE_NAME=db. This adds the source database name as a field to the CSV file.
- Number of retries before giving up - 1000
- Retry N minutes before giving up - 100
The last two parameters control how long the extract flow keeps running before automatically shutting down. The bigger the numbers, the longer it stays alive. Keeping the default values could cause the flow to shut down prematurely, before it has had a chance to extract all events from the transaction log.
Step 2. Create an Extract flow using the flow type Stream CDC events, create files.
Step 3. Add a single transformation where the source connection is the MySQL CDC connection created in step 1 and the FROM is *.
Step 4. Create a Snowflake connection.
Steps 5-10 are optional and only needed because of the requirement to pull data from 1500 databases. In most cases, all you need is a single extract flow, created in step 2.
Step 5. Create a table configuration (database_name text) in Snowflake and add the databases to extract data from to that table.
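A minimal sketch of this table and how it could be populated is below; the actual database names are placeholders.

```sql
-- Snowflake: list of source databases the extract flows should read from
CREATE TABLE configuration (database_name TEXT);

INSERT INTO configuration (database_name)
VALUES ('database1'), ('database2'), ('database1500');
```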
Step 6. Create a JavaScript flow and add the code below. This flow sets the global variables whitelisted_databases and whitelisted_tables that are used as {tokens} in the MySQL CDC connection. It also sets the "stop" flag if there are no databases configured.
importPackage(com.toolsverse.etl.core.engine);
importPackage(com.toolsverse.config);

// The table to extract, passed to this flow as the TABLE_NAME flow variable
var table = scenario.getVariable('TABLE_NAME') != null
    ? scenario.getVariable('TABLE_NAME').getValue() : "";

var stop = false;

if (Utils.isNothing(table)) {
    etlConfig.log("No tables configured");
    stop = true;
} else {
    // Read the list of source databases from the configuration table in Snowflake
    var databases = Extractor.lookup(etlConfig, scenario, "Snowflake",
        "databases", "select database_name from configuration");

    var props = SystemConfig.instance().getProperties();

    if (databases == null || databases.isEmpty()) {
        etlConfig.log("No databases configured");
        stop = true;
    } else {
        var whitelistedDatabases = "";
        var whitelistedTables = "";

        // Build comma-separated lists: database1,database2,... and database1.table,database2.table,...
        for (var row = 0; row < databases.getRecordCount(); row++) {
            var dbName = databases.getFieldValue(databases.getRecord(row), "database_name");

            whitelistedDatabases = whitelistedDatabases + (row == 0 ? "" : ",") + dbName;
            whitelistedTables = whitelistedTables + (row == 0 ? "" : ",") + dbName + "." + table;
        }

        // Expose the values as global properties, used as {tokens} in the MySQL CDC connection
        props.put("table_name", table);
        props.put("whitelisted_databases", whitelistedDatabases);
        etlConfig.log("Whitelisted databases: " + whitelistedDatabases);

        props.put("whitelisted_tables", whitelistedTables);
        etlConfig.log("Whitelisted tables: " + whitelistedTables);
    }
}

// Signal the nested flow to skip the extract if nothing is configured
if (stop) {
    etlConfig.setValue("stop", "true");
}
Step 7. Create a nested flow. Add the JavaScript flow created in step 6.
Step 8. Add the Extract flow created in step 2.
Step 9. Click the edit (pencil) icon in front of flow 2 and modify the condition as below. Flow 2 will not be executed if the stop flag is set.
value = Utils.isNothing(etlConfig.getValue("stop"));
Step 10. Select the Parameters tab and add a Flow variable TABLE_NAME. Do not set the value yet.
Step 11. Test the extract by executing the main nested flow and providing the actual value of the TABLE_NAME variable in the Additional Parameters.
Step 12. Duplicate the main nested extract flow 35 times and set the actual value of the TABLE_NAME variable for each flow. In the end, there should be 35 extract flows - one flow per table.
Setting up flows to load data in Snowflake
This part of the tutorial explains how to set up a flow to load the CSV files created by the extract flows into Snowflake.
The tutorial suggests using the CDC MERGE action, which applies INSERTs/UPDATEs/DELETEs in the order in which the CDC events were emitted by the source database.
For each file the flow does the following:
- Creates a temporary table in Snowflake
- Executes COPY INTO command to load the file "as is" into the temp table
- Uses Snowflake MERGE command to merge data in the temp table with the data in the actual table
This approach guarantees the lowest latency but is more resource-consuming and requires more Snowflake credits (it can be more expensive from the Snowflake standpoint).
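To illustrate the mechanics, here is a rough sketch of the SQL pattern described above. It is not the exact SQL generated by the load flow; the table, stage, file, and column names are assumptions.

```sql
-- 1. Temporary table matching the CSV layout (target columns plus the CDC metadata columns)
CREATE TEMPORARY TABLE table_abc_tmp (
    id NUMBER, col1 VARCHAR, database_name VARCHAR,
    debezium_cdc_op VARCHAR, debezium_cdc_timestamp NUMBER
);

-- 2. Load the CSV file "as is" into the temporary table (stage and file names are hypothetical)
COPY INTO table_abc_tmp
FROM @cdc_stage/database1_table_abc_cdc_stream.csv
FILE_FORMAT = (TYPE = CSV);

-- 3. Merge the CDC events into the actual table
MERGE INTO table_abc t
USING table_abc_tmp s
    ON t.id = s.id AND t.database_name = s.database_name
WHEN MATCHED AND s.debezium_cdc_op = 'd' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.col1 = s.col1
WHEN NOT MATCHED AND s.debezium_cdc_op <> 'd' THEN
    INSERT (id, col1, database_name) VALUES (s.id, s.col1, s.database_name);
```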
Other options:
2. Always INSERT data into the staging tables in Snowflake, then periodically execute a SQL script to populate the actual tables by de-duplicating the staging tables and removing 'd' (delete) records.
3. Skip Delete events and only apply INSERTs and UPDATEs. It can be a bit faster.
Option 2 (INSERT -> de-duplicate) is very popular when customers are OK with a longer delay between data being updated in the source and becoming available in the data warehouse for consumption by the BI tools.
After extensive testing, our customer decided to use option 2. The main reason was that the [almost] real-time approach (option 1) uses Snowflake cloud services and consumes extra Snowflake credits. Basically, option 2 is cheaper.
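A possible shape of the periodic de-duplication script for option 2 is sketched below; the staging and target table names, the key columns, and the column list are assumptions.

```sql
-- Keep only the latest event per key, drop 'd' (delete) records, and rebuild the actual table
INSERT OVERWRITE INTO table_abc
SELECT id, col1, database_name
FROM (
    SELECT s.*,
           ROW_NUMBER() OVER (PARTITION BY database_name, id
                              ORDER BY debezium_cdc_timestamp DESC) AS rn
    FROM table_abc_staging s
)
WHERE rn = 1 AND debezium_cdc_op <> 'd';
```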
Scheduling extract and load flows
Step 1. Schedule the 35 extract flows. We recommend using the Seconds with delay schedule type. Most of the extract flows will be running non-stop until there are no new INSERT/UPDATE/DELETE events for the included tables. You can set the delay to a few minutes or a few hours, depending on the requirements. The customer set the delay to 3 hours.
Step 2. Schedule the load flow. The load flow loads files in batches. We recommend running it every few minutes so it can clear the file queue as often as possible. The customer set it to every 10 minutes.