When to use this Flow type
If you already have CSV or Parquet files in Azure Storage and don't need to transform the data, the most efficient way to load these files into Synapse Analytics is to use the `Bulk load files in Azure Storage into Synapse Analytics` Flow type described in this article.
Use this flow type
- When you simply want to load CSV or Parquet files that already exist in Azure Storage into Synapse Analytics, without applying any transformations or transferring files over the network.
- When you want to load CSV files created by the CDC Flow.
Flows optimized for Synapse Analytics
|Flow type|When to use|
|---|---|
|ETL Flow|When you need to extract data from any source, transform it, and load it into Synapse Analytics.|
|Bulk load files in Azure Blob into Synapse Analytics (you are here)|When you need to bulk-load files that already exist in Azure Blob without applying any transformations. The Flow automatically generates the `COPY` command and merges data into the destination.|
|Stream CDC events into Synapse Analytics|When you need to stream updates from a database that supports Change Data Capture (CDC) into Synapse Analytics in real time.|
How it works
- The Flow reads the names of all files matching the wildcard in the specified Azure Storage blob. It can traverse subfolders as well.
- The Flow calculates the destination table names based on the source file names.
- The Flow creates multiple threads, one per destination table but not more than the configurable threshold.
- The Flow generates and executes the `COPY INTO` command to load files into a temporary table, one temporary table per actual destination table.
- The Flow merges data in the temporary tables with data in the actual tables.
Alternatively, the Flow can load files directly into the actual destination table by generating and executing the `COPY INTO` command for files matching the wildcard.
The Flow supports `COPY INTO` (INSERT), `MERGE` (UPSERT), and `CDC MERGE`. With `CDC MERGE`, the Flow updates the actual destination table by applying CDC events (including `DELETE`) in the same order as they originated in the source database, in the most efficient way possible.
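The generated SQL for the default temp-table path can be sketched roughly as follows. The storage URL, table, and column names are illustrative assumptions, not literal output of the Flow:

```sql
-- 1. Bulk-load the matching files into a temporary table
--    (one temporary table per actual destination table)
COPY INTO dbo.invoice_temp
FROM 'https://<account>.blob.core.windows.net/<container>/invoice_cdc_stream_*.csv'
WITH (
    FILE_TYPE  = 'CSV',
    CREDENTIAL = (IDENTITY = 'Storage Account Key', SECRET = '<key>')
);

-- 2. Merge the temporary table into the actual destination table
--    (here, using the default DELETE/INSERT strategy)
DELETE FROM dbo.invoice
WHERE invoice_id IN (SELECT invoice_id FROM dbo.invoice_temp);

INSERT INTO dbo.invoice
SELECT * FROM dbo.invoice_temp;
```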
|Feature|Description|
|---|---|
|Monitor source schema changes|The Flow can monitor source schema changes and alter the target table accordingly.|
|Load CSV and Parquet files|The Flow can load CSV and Parquet files in Azure Storage into Synapse Analytics.|
|Delete loaded source files|The Flow can be configured to delete successfully loaded source files.|
|Load data in parallel|The Flow can load data into multiple destination tables in parallel.|
|Process files in subfolders|The Flow can process files in multiple nested subfolders.|
Prerequisites
- The Azure Synapse Analytics dedicated SQL pool is up and running.
- The Azure Storage blob is created.
- Files to load exist in the blob.
Step 1. Create a new Azure Storage Connection
This Connection will be used as a source to access files stored in the Azure Storage blob. Read more about Azure Storage connector.
Step 2. Create a new Azure Synapse Analytics Connection
Azure Synapse Analytics Connection will be used as a destination.
Step 3. Create a new CSV Format or Parquet Format
This Format will be used to generate a valid `COPY INTO` command.
Step 4. Create a new Flow
This Flow will be used to load files in Azure Storage into Azure Synapse Analytics.
Click `[+]`, type in `bulk load files in azure storage`, and select the Flow `Bulk load files in Azure Storage into Synapse Analytics`.
Step 5. Configure load transformation
Select or enter the following attributes of the transformation (left to right):
- Azure Storage Connection created in step 1.
- CSV or Parquet Format created in step 3.
- A wildcard filename that matches the filenames in the cloud storage. Use the `.csv` extension for uncompressed files and the appropriate extension (e.g., `.gz`) for compressed files.
- Synapse Analytics Connection created in step 2.
- The wildcard destination table name in the following format: `database.schema.*`, where `database.schema` is the Synapse Analytics database and schema to load data into. Example: `test.dbo.*`. You don't need to use wildcards when loading into a specific table.
Step 6. Set optional and required parameters
Select or enter the following optional and required parameters:
Source Files and Destination Tables
Include files in subfolders: if this option is enabled, the Flow will load all files that match the wildcard filename (set in `FROM`) in all subfolders under the main bucket or blob.
Exclude files and Include files: the optional comma-separated lists of files to exclude and/or include. You can use wildcard file names. These options are used together with the wildcard filename set in `FROM`, in the following order:
- The Flow populates the list of files matching the wildcard filename in `FROM`.
- The Flow excludes files based on the list of exclusions set in `Exclude files`.
- The Flow includes files based on the list of inclusions set in `Include files`.
A transformation script can be used to calculate the destination table name from the source file name: the source file name is available in the variable `name`, and the actual table name must be assigned to the variable `value`. For example, assuming source file names follow the pattern `prefix-table_cdc_stream_*`: `value = name.substring(name.indexOf('-') + 1, name.indexOf('_cdc_stream'));`. IMPORTANT: the Flow will automatically recognize and transform filenames matching the following template: `*_cdc_stream_*.*`. If filenames match this template, there is no need to provide a transformation script.
Maximum Number of Files to Process: the optional parameter that controls how many files will be loaded in one batch (one run of the Flow). If you expect that at any point in time there will be hundreds of thousands or millions (10^6-10^7) of files in the source folder(s), it is recommended to limit the number of files processed per run to a more manageable number (10^3-10^5).
Maximum Number of Parallel Loads: if the value of this optional parameter is greater than 1, the Flow will create multiple threads to load data in parallel –– one thread per table but not more than the threshold set in this parameter.
Parallel: enable this flag if you have more than one source-to-destination transformation that you want to execute in parallel.
Purge File if Success: if this option is enabled (it is by default), the source files will be automatically deleted after the `COPY INTO` command is successfully executed.
Log each executed SQL statement: enable this flag if you want to log each automatically generated and executed SQL statement, including before/after Load SQL.
Continue loading data into other tables if Error: if this parameter is enabled (it is disabled by default) and there is an error when loading data into one of the tables, the Flow will continue loading data into the other tables. If configured, a notification will be sent to the webhook.
Direct load: if this option is enabled (it is disabled by default), the Flow loads data into the Synapse Analytics table directly, without an intermediate temporary table. Enabling `Direct load` is the fastest way to load data into Synapse Analytics, but it does not support `MERGE` and `CDC MERGE`.
Use COPY INTO with a wildcard pattern: If this option is enabled (it is disabled by default) the flow loads all files into a specific destination table in one call using COPY INTO with a wildcard pattern. The advantage of this approach is the speed of load. The disadvantage is an inability to handle files with a different number of columns.
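With both `Direct load` and `Use COPY INTO with a wildcard pattern` enabled, the whole load for a table collapses into a single statement along these lines (the URL and table name are illustrative assumptions):

```sql
-- All files matching the wildcard are loaded into the destination table
-- in one COPY INTO call; no temporary table, no MERGE
COPY INTO test.dbo.invoice
FROM 'https://<account>.blob.core.windows.net/<container>/invoice_*.csv'
WITH (FILE_TYPE = 'CSV');
```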
Action: can be `COPY INTO`, which inserts records from the file(s) into the Synapse Analytics table; `MERGE`, which merges records from the file with the records in the Synapse Analytics table; or `CDC MERGE`, which loads CDC events (`INSERT`, `UPDATE`, `DELETE`) in the same order as they originated in the source database. `MERGE` and `CDC MERGE` require configuring the `Lookup Fields` and/or enabling the option to predict lookup fields.
How to MERGE: determines how to perform the MERGE into the Synapse Analytics table. The default is DELETE/INSERT: DELETE all records in the actual table that also exist in the temp table, then INSERT all records from the temp table into the actual table. If this parameter is set to Native MERGE, the Flow will execute the native Synapse Analytics MERGE SQL. Note that native MERGE requires the table to be created with DISTRIBUTION = HASH.
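The two strategies can be sketched as follows, assuming a hypothetical `invoice` table with the lookup field `invoice_id`:

```sql
-- DELETE/INSERT (default): delete rows that also exist in the temp table,
-- then insert everything from the temp table
DELETE FROM dbo.invoice
WHERE invoice_id IN (SELECT invoice_id FROM dbo.invoice_temp);

INSERT INTO dbo.invoice
SELECT * FROM dbo.invoice_temp;

-- Native MERGE: a single statement; the target table must be hash-distributed
MERGE INTO dbo.invoice AS t
USING dbo.invoice_temp AS s
    ON t.invoice_id = s.invoice_id
WHEN MATCHED THEN
    UPDATE SET t.amount = s.amount
WHEN NOT MATCHED THEN
    INSERT (invoice_id, amount) VALUES (s.invoice_id, s.amount);
```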
Create tables with DISTRIBUTION = HASH: if this option is enabled, the actual table is created as CREATE TABLE (columns) WITH (DISTRIBUTION = HASH(PK)). A hash-distributed table has a distribution column that serves as the hash key. IMPORTANT: Native MERGE requires enabling this option.
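A table created with this option enabled looks roughly like the following (the table and column names are illustrative; `invoice_id` stands in for the PK):

```sql
CREATE TABLE test.dbo.invoice
(
    invoice_id INT NOT NULL,
    amount     DECIMAL(18, 2)
)
WITH (DISTRIBUTION = HASH(invoice_id));
```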
Handle explicit CDC updates: The typical stream produced by CDC flow includes all columns in the monitored table. In some cases, CDC flow captures changes in the explicitly changed columns only, for example, when the supplemental logging for the Oracle table is configured as 'LOG DATA (PRIMARY KEY) COLUMNS' or 'LOG DATA (UNIQUE) COLUMNS'. Enable this option so the CDC MERGE statement can preserve the original values of the columns NOT captured by CDC flow.
Lookup Fields: the comma-separated list of fields that uniquely identify the record in the target Synapse Analytics table.
Predict Lookup Fields: if this option is enabled and `Lookup Fields` is empty, the system will try to predict the columns that uniquely identify the record in the target Synapse Analytics table.
COPY INTO Parameters
Line Separator: the default is `\r\n` on Windows and `0x0a` on Linux. You can override it using this property. This is useful if CSV files are generated in one operating system but will be loaded in another.
Max # of Errors to Ignore: specifies the maximum number of rejected rows allowed in the load before the `COPY` operation is canceled. Each row that cannot be imported by the `COPY` operation is ignored and counted as one error. If this parameter is not specified, the default is 0.
Error File Directory: specifies the directory, relative to the container, within the `COPY` statement where the rejected rows and the corresponding error file should be written. If the specified path doesn't exist, one will be created on your behalf. A child directory is created with the name `_rejectedrows`.
Identity Insert: specifies whether the identity value or values in the imported data file are to be used for the identity column. If `OFF` (the default), the identity values for the column are verified but not imported; Azure Synapse Analytics automatically assigns unique values based on the seed and increment values specified during table creation. Note the following behavior: if `OFF` and the table has an identity column, a column list must be specified that does not map an input field to the identity column; if `ON` and the table has an identity column, any column list passed must map an input field to the identity column. The default value is not supported for the identity column in the column list, and `IDENTITY_INSERT` can only be set for one table at a time.
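Put together, the parameters above map onto the WITH clause of the generated `COPY INTO` statement, roughly as in this sketch (the URL, table name, and values are illustrative assumptions):

```sql
COPY INTO test.dbo.invoice
FROM 'https://<account>.blob.core.windows.net/<container>/invoice_*.csv'
WITH (
    FILE_TYPE       = 'CSV',
    ROWTERMINATOR   = '0x0a',      -- Line Separator
    MAXERRORS       = 10,          -- Max # of Errors to Ignore
    ERRORFILE       = '/errors/',  -- Error File Directory
    IDENTITY_INSERT = 'ON'         -- Identity Insert
);
```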
Date Format: specifies the date format of the date mapping to SQL Server date formats. Possible values: `mdy`, `dmy`, `ymd`, `ydm`, `myd`, `dym`.
Truncate Columns: if enabled (disabled by default), the `COPY INTO` command will be generated with the `TRUNCATECOLUMNS = true` option. Strings are automatically truncated to the target column length.
On Error: what to do in case of any error. Available options are:
Change wildcard pattern for COPY INTO
By default, the Flow sets the wildcard pattern to the longest common string for the files that will be loaded into a specific table. When a different pattern is needed, you can provide a script that calculates it. The following variables are available to the script:
- `etlConfig`: the config.
- `table`: the table name.
- `commonFileName`: the default wildcard pattern.
- `files`: the list of files.
The calculated pattern must be assigned to the variable `value`, for example: `value = table + '_cdc_stream_';`
Before COPY SQL: this SQL will be executed before running the `COPY INTO` SQL.
Ignore errors when executing Before COPY SQL: if this option is enabled and there is an error when `Before COPY SQL` is executed, the error will be ignored.
Before COPY SQL is a script: if this option is enabled, `Before COPY SQL` will be executed as a TSQL script.
After COPY SQL: this SQL will be executed after running the `COPY INTO` SQL.
Ignore errors when executing After COPY SQL: if this option is enabled and there is an error when `After COPY SQL` is executed, the error will be ignored.
After COPY SQL is a script: if this option is enabled, `After COPY SQL` will be executed as a TSQL script.
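As an illustration, `Before COPY SQL` and `After COPY SQL` might carry housekeeping statements such as the following (purely an example; the table names are assumptions):

```sql
-- Before COPY SQL: e.g. clear a staging table before the load
TRUNCATE TABLE dbo.invoice_stage;

-- After COPY SQL: e.g. refresh statistics on the loaded table
UPDATE STATISTICS dbo.invoice;
```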
Handling source schema changes
Alter target table if the source has columns that the target table doesn't have: if this option is enabled (it is disabled by default), and the source has different fields than the target table, the system will add extra fields to the target table.
Create a new Synapse connection for each table
Enable this option if you want to execute ETL scripts as a part of the load. Synapse Analytics allows only one stored procedure or TSQL code fragment to be executed from a single connection at any given time, so when the same connection is shared by all loaders (each loading into its own Synapse table) and ELT scripts are used, the queries get queued, which creates a performance bottleneck.
To enable it, select `Create new destination connection for each table`.
Step 7. Optionally, add a mapping.
The mapping can be used to:
- Rename the column.
- Exclude the column.
- Change the column data type for tables automatically created by the flow.
Select the transformation and click `Add mapping for column`.
Renaming or excluding a column in the mapping will work for all tables that have this column. If a table does not have the column, the mapping will be ignored.
Step 8. Optionally, add more source-to-destination transformations
Click `[+]` and repeat steps 5 to 6 for more locations and/or file name templates.