Overview
Azure Synapse Analytics is built on top of Microsoft SQL Server. Therefore, you can use the same techniques you would normally use to work with relational databases in Etlworks Integrator. It is, however, important to understand that inserting data into Synapse Analytics row by row can be painfully slow.
It is recommended that you use flows optimized to ingest data into Synapse Analytics.
Using Synapse Analytics-optimized flows you can extract data from any of the supported sources and load it directly into Synapse Analytics.
A typical flow optimized for Synapse Analytics performs the following operations:
- Extracts data from the source.
- Creates CSV files.
- Optionally compresses files using the gzip algorithm.
- Copies the files into an Azure Storage container.
- Checks whether the destination table exists and, if it does not, creates the table using metadata from the source.
- Dynamically generates and executes the COPY INTO command, which bulk-loads the CSV files from Azure Storage into the Synapse Analytics database (see the example after this list).
- Cleans up the remaining files, if needed.
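For reference, the dynamically generated COPY INTO command typically looks similar to the sketch below. The table name, container URL, file name, and credential are hypothetical placeholders; the actual command is built by Integrator from the transformation parameters and the Azure Storage connection (FIRSTROW = 2 assumes the CSV file has a header row):
COPY INTO dbo.orders
FROM 'https://mystorageaccount.blob.core.windows.net/stage/orders.csv.gz'
WITH (
    FILE_TYPE = 'CSV',
    COMPRESSION = 'GZIP',
    FIELDTERMINATOR = ',',
    ROWTERMINATOR = '0x0A',
    FIRSTROW = 2,
    MAXERRORS = 0,
    CREDENTIAL = (IDENTITY = 'Storage Account Key', SECRET = '<storage-account-key>')
);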
Prerequisites
- The Synapse Analytics instance must be available from the Internet.
- The user executing the COPY INTO command must have the permissions required to run it (typically INSERT and ADMINISTER DATABASE BULK OPERATIONS on the target database).
- The Azure Storage account must exist, and the user must have read/write permissions for the specific container.
Step-by-step instruction
Step 1. Create all required connections.
You will need a source connection, an Azure Storage connection used as a stage for the file to load, and an Azure Synapse Analytics connection.
When configuring a connection for Azure Storage, which will be used as a stage for the flows, it is recommended that you select GZip as the value for the Archive file before copying field.
Step 2. Create a data exchange format
Synapse Analytics can load CSV and Parquet files but Etlworks only supports loading from CSV, so you will need to create a CSV format.
Step 3. Create a flow to load data in Synapse Analytics
Start creating a flow by opening the Flows window, clicking the + button, and typing synapse into the search field.
Continue by selecting the flow type, adding source-to-destination transformations, and entering the transformation parameters.
Step 4. Set the Synapse Analytics connection
To configure the final destination, click the Connections tab, and select the available Synapse Analytics connection.
Step 5. Set the source and the destination
Depending upon the flow type, you can select one of the following sources (FROM) for the Synapse Analytics flow:
- Database - use the table name as the source (FROM) name
- API - use any appropriate string as the source (FROM) name
- Web Service - use any appropriate string as the source (FROM) name
- File - use the source file name or a wildcard filename as the source (FROM) name
- Queue - use the queue topic name as the source (FROM) name
For all Synapse Analytics flows, the destination connection is going to be an Azure storage connection. Integrator uses the destination connection as a stage.
Select or enter the fully-qualified or wildcard Synapse Analytics table name as the destination (TO).
Step 6. Set the Data Exchange Format configured in Step 2
Step 7. Set the optional parameters
Click the MAPPING button in the transformation row and select the Parameters tab.
Optional parameters and transformations
- Column List - you can specify a comma-separated list of column names to load source data fields into specific target columns. The columns can be in any order in the COPY INTO statement, but when loading from flat files, such as in an Azure Storage container, their order must match the order of the source data.
- Line Separator - the default is the operating-system-specific line separator: '\r\n' on Windows and '0x0a' ('\n') on Linux. You can override it using this property. It is useful if CSV files are generated on one operating system but loaded on another.
- Max # of Errors to Ignore - specifies the maximum number of reject rows allowed in the load before the COPY operation is canceled. Each row that cannot be imported by the COPY operation is ignored and counted as one error. If max_errors is not specified, the default is 0.
- Error File Directory - specifies the directory relative to the container within the COPY statement where the rejected rows and the corresponding error file should be written. If the specified path doesn't exist, one will be created on your behalf. A child directory is created with the name 'rejectedrows'.
- Identity Insert - specifies whether the identity value or values in the imported data file are to be used for the identity column. If IDENTITY_INSERT is OFF (default), the identity values for this column are verified but not imported; Azure Synapse Analytics automatically assigns unique values based on the seed and increment values specified during table creation. Note the following behavior of the COPY command: if IDENTITY_INSERT is OFF and the table has an identity column, a column list must be specified that does not map an input field to the identity column; if IDENTITY_INSERT is ON and the table has an identity column, any column list that is passed must map an input field to the identity column. A default value is not supported for the identity column in the column list. IDENTITY_INSERT can only be set for one table at a time.
- Date Format - possible values: mdy | dmy | ymd | ydm | myd | dym. Specifies the date format of the date mapping to SQL Server date formats.
- Action - can be either COPY INTO - inserts records from the file(s) into the Synapse Analytics table, or MERGE - merges records from the source with the records in the Synapse Analytics table, or CDC MERGE - merges records from the CDC-enabled source with the records in the Synapse Analytics table. MERGE and CDC MERGE require configuring Lookup Fields.
- Lookup Fields - the comma-separated list of fields that uniquely identify the record in the Synapse Analytics table. This field is required if the action is set to MERGE or CDC MERGE.
- COPY INTO SQL - this is a user-defined COPY SQL. By default, Integrator creates COPY SQL automatically, based on the input and output parameters. You can override it by using this field. Read more about the Synapse Analytics COPY INTO command.
- MERGE SQL - this is a user-defined SQL that will be used instead of the default when the action is set to MERGE (a sketch of a custom MERGE SQL is shown after this list). If nothing is entered in this field, the default MERGE SQL will be used. The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
  - {TABLE} - the table to MERGE data into,
  - {TEMP_TABLE} - the table to merge data from,
  - {KEY_FIELDS} - the fields uniquely identifying the record in both tables,
  - {FIELDS} - the fields to INSERT/UPDATE in the table to MERGE data into.
- CDC MERGE SQL - this is a user-defined SQL that will be used instead of the default when the action is set to CDC MERGE. If nothing is entered in this field, the default CDC MERGE SQL will be used. The following parameters are automatically populated and can be referenced as {TOKEN} in the SQL:
  - {TABLE} - the table to MERGE data into,
  - {TEMP_TABLE} - the table to merge data from,
  - {KEY_FIELDS} - the fields uniquely identifying the record in both tables,
  - {FIELDS} - the fields to INSERT,
  - {UPDATE_FIELDS} - the fields and values to UPDATE in the format field=value,field=value,
  - {UPDATE_CONDITIONS} - the WHERE clause to update existing records.
- Purge File if Success - if this option is enabled (it is by default) and a user-defined SQL is not used, the staging file will be automatically deleted after the COPY INTO command is successfully executed.
- Purge File(s) if Error - if this option is enabled (it is by default), the staging files will be automatically deleted if the COPY INTO command fails with an error.
- Before COPY SQL - this SQL will be executed on the Synapse Analytics connection before running the COPY SQL.
- Ignore errors when executing Before COPY SQL - if this option is enabled, any error that occurs while executing the Before COPY SQL will be ignored.
- After COPY SQL - this SQL will be executed on the Synapse Analytics connection after running the COPY SQL.
- Ignore errors when executing After COPY SQL - if this option is enabled, any error that occurs while executing the After COPY SQL will be ignored.
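For illustration, a user-defined SQL entered in the MERGE SQL field could look like the sketch below. The {TABLE} and {TEMP_TABLE} tokens are populated by Integrator at run time; the key column (order_id) and data columns (status, amount) are hypothetical and are normally derived from the {KEY_FIELDS} and {FIELDS} tokens by the default MERGE SQL, so a custom statement is only needed for non-standard merge logic:
MERGE INTO {TABLE} AS target
USING {TEMP_TABLE} AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN
    UPDATE SET target.status = source.status, target.amount = source.amount
WHEN NOT MATCHED THEN
    INSERT (order_id, status, amount)
    VALUES (source.order_id, source.status, source.amount);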
Parameters specific for different source types
Depending on the flow type, other flow parameters can be added, as explained below:
- The source is a database
- The source is a file or web service
  - if this parameter is enabled, the system will copy files directly into the Synapse Analytics stage before executing the COPY INTO command. This greatly improves the performance of the load but automatically disables any transformations.
  - if this parameter is enabled, the system will automatically delete loaded files from the source.
  - if this parameter is enabled and there is an error during the load, the system will automatically delete loaded files from the source.
- The source is a well-known API
Step 8. Optionally configure MERGE (UPSERT)
To merge (upsert) existing data in the Synapse Analytics table with new data:
- Set the Action to MERGE.
- Define the Lookup Fields - the comma-separated list of fields that uniquely identify the record in the Synapse Analytics table.
Alternatively, you can enable the Predict Lookup Fields option, which will force the flow to use various algorithms to automatically predict the fields that uniquely identify the record. Note that it is not always possible to correctly detect the unique fields.
When enabling the Predict Lookup Fields option (which is not always accurate) is not an option, you can specify the list of table=fields pairs in the Lookup Fields instead. Use fully-qualified table names and ';' as a separator between table=fields pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
Step 9. Configure how to handle source and destination schema changes
It is quite common for the source (for example, a table in the OLTP database) and the destination (the Synapse Analytics table) to have a different number of columns. It is also entirely possible that the order of columns in the source is different from the order of columns in the destination. In either of these cases, by default, the flow will fail, since the Synapse Analytics COPY INTO command cannot load files that have more or fewer columns than the target table or whose column order is different.
When setting the source-to-destination transformation, it is possible to configure it to automatically handle schema changes. The following configuration options are available:
- Reorder columns to load to match order of columns in the target table. Also update column types to match the target - if this option is enabled (it is disabled by default), the system will reorder the columns in the file being loaded to match the order of columns in the target Synapse Analytics table. When this option is enabled, the system also updates the column data types to match the existing columns in the target table. Enable it to fix the "Column type 'VALUE' is not recognized" error. This option is ignored if the target table does not exist.
- Ignore fields in the source that do not exist in the target table - if this option is enabled (it is disabled by default) and the source has more fields than the target (Synapse Analytics) table, the system will automatically exclude extra fields. Options 2, 3 and 4 are mutually exclusive.
- Alter target table if the source has columns that the target table doesn't have - if this option is enabled (it is disabled by default) and the source has different fields than the target (Synapse Analytics) table, the system will add extra fields to the target table. Options 2, 3 and 4 are mutually exclusive.
- Recreate target table if the source has columns that the target table doesn't have - if this option is enabled (it is disabled by default) and the source has different fields than the target (Synapse Analytics) table, the system will automatically drop and recreate the target table. Options 2, 3 and 4 are mutually exclusive.
- Insert null into the fields in the target table that do not exist in the source - if this option is enabled (it is disabled by default) and the target (Synapse Analytics) table has different fields than the source, the system will automatically insert NULL values into these fields in the target.
Configuring notifications if source and destination have different columns
It is possible to send an email notification if the source and destination have different columns. To configure flow to send a notification when either source has more columns than the destination or the destination has more columns than the source, use the technique explained in this article.
Step 10. Optionally configure Mapping
If necessary, you can create a mapping between the source and destination (Synapse Analytics) fields.
You create the mapping between the source and destination just like you usually do for any other flow type.
Set up change replication using high watermark
As in any other flow type, it is possible to configure a change replication using a high watermark.
When change replication is enabled, only the changed records will be loaded into Synapse Analytics.
Basically, all you need to do is set the high watermark field and enable change replication for the transformation. Additionally, for better reliability, you can set the calculated high watermark field value.
Set up change replication using change data capture (CDC)
Change data capture (CDC) is an approach to data integration that is based on the identification, capture, and delivery of the changes made to the source database and stored in the database redo log (also called transaction log).
Etlworks supports replicating data using CDC from MySQL, SQL Server, Oracle, Postgres, and MongoDB.
Read about configuring CDC for the source databases.
Once CDC is configured for the source database, you can create a CDC pipeline where the source is one of these databases and the destination is Synapse Analytics.
When creating a point-to-point CDC pipeline for extracting data from a CDC-enabled database and loading it into Synapse Analytics, you have two options:
- Stream CDC events, create CSV files + load data from CSV files into Synapse Analytics.
- Stream CDC events into a message queue + load data from a message queue into Synapse Analytics.
Loading multiple tables by a wildcard name
To load multiple database objects (tables and views) by a wildcard name (without creating individual source-to-destination transformations) follow the same steps as above, except:
FROM
Create a single source-to-destination transformation and set the FROM to a fully qualified source database object name with wildcard characters (* and ?), for example public.*
By default, all tables and all views that match the wildcard name in FROM will be included. If you want to exclude specific database objects, enter a comma-separated list of the database objects to exclude in the Exclude objects field. If you want to include only specific database objects, enter a comma-separated list of the database objects to include in the Include objects field.
TO
Set the TO to db.schema.*, for example dbo.history.*.
Read how to programmatically change the destination (TO) name.
EXCLUDE
Optionally configure the list of the database objects to exclude.
To exclude all views, enter all views in the Exclude objects field.
To exclude all tables, enter all tables in the Exclude objects field.
INCLUDE
Optionally configure the list of the database objects to include.
SOURCE QUERY
Optionally configure the Source query. You can use the token {TABLE} in the source query.
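For example, a source query similar to the one below (the filter column is hypothetical) will be executed for each matched table, with {TABLE} replaced by the actual table name at run time:
SELECT * FROM {TABLE} WHERE status = 'ACTIVE'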
BEFORE/AFTER SQL
If you want to execute any SQL before and/or after the COPY, use the token {TABLE} in the Before COPY SQL and After COPY SQL.
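For example, to remove previously loaded rows from each matched target table before the load, you could enter something like the following in the Before COPY SQL (whether a full reload fits your use case is an assumption):
DELETE FROM {TABLE}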
MERGE
If you are configuring the MERGE action, do not enter anything in the Lookup Fields. Enable Predict Lookup Fields instead.
As an alternative to enabling the Predict Lookup Fields option (which is not always accurate), you can specify the list of table=fields pairs in the Lookup Fields. Use fully-qualified table names and ';' as a separator between table=fields pairs.
Example:
test1.inventory=inventory_id,database_name;
test1.payment=payment_id,database_name;
test1.rental=rental_id,database_name;
HWM CHANGE REPLICATION
If you are setting up high watermark change replication with a calculated HWM field, use the token {TABLE} in the High Watermark Field Value.
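For example, assuming the high watermark column is named updated_at (a hypothetical name), the calculated High Watermark Field Value could be set to something like:
SELECT MAX(updated_at) FROM {TABLE}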