When to use this Flow type
If you already have CSV or Parquet files in Azure Storage and don't need to transform the data, the most efficient way to load these files into Synapse Analytics is to use the Flow type Bulk load files in Azure Storage into Synapse Analytics described in this article.
Use this Flow type:
- When you simply want to load CSV or Parquet files that already exist in Azure Storage into Synapse Analytics, without applying any transformations or transferring files over the network.
- When you want to load CSV files created by the CDC Flow.
Flows optimized for Synapse Analytics
| Flow type | When to use |
| --- | --- |
|  | When you need to extract data from any source, transform it, and load it into Synapse Analytics. |
| Bulk load files in Azure Blob into Synapse Analytics (you are here) | When you need to bulk-load files that already exist in Azure Blob without applying any transformations. The flow automatically generates the COPY command and MERGEs data into the destination. |
| Stream CDC events into Synapse Analytics | When you need to stream updates from a database that supports Change Data Capture (CDC) into Synapse Analytics in real time. |
| Stream messages from a queue into Synapse Analytics | When you need to stream messages from a message queue that supports streaming into Synapse Analytics in real time. |
How it works
- The Flow reads the names of all files matching the wildcard in the specific Azure Storage blob. It can traverse the subfolders as well.
- The Flow calculates the destination table names based on the source file names.
- The Flow creates multiple threads, one per destination table, but not more than the configurable threshold.
- The Flow generates and executes the COPY INTO command to load files into a temporary table, one temporary table per actual destination table.
- The Flow merges data in the temporary tables with data in the actual tables.
Alternatively, the Flow can load files directly into the actual destination table by generating and executing the COPY INTO command for files matching the wildcard.
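For illustration, the sketch below shows the kind of SQL the Flow generates for a single destination table. It is a minimal, hypothetical example: the storage account, container, table, and column names are made up, and the actual statements produced by the Flow may differ.

```sql
-- Hypothetical example only; the Flow generates the real statements at run time.

-- 1. Bulk-load the matching files into a temporary (staging) table.
COPY INTO dbo.orders_stage
FROM 'https://myaccount.blob.core.windows.net/mycontainer/orders*.csv'
WITH (
    FILE_TYPE = 'CSV',
    FIRSTROW  = 2
);

-- 2. Merge the staging table into the actual destination table
--    (the default DELETE/INSERT strategy is shown; see How to MERGE below).
DELETE FROM dbo.orders
WHERE order_id IN (SELECT order_id FROM dbo.orders_stage);

INSERT INTO dbo.orders (order_id, customer_id, amount)
SELECT order_id, customer_id, amount
FROM dbo.orders_stage;
```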
Features
The Flow supports COPY INTO (INSERT), MERGE (UPSERT), and CDC MERGE. With CDC MERGE, the Flow updates the actual destination table by applying the INSERT/UPDATE/DELETE events in the same order in which they originated in the source database, and it does so in the most efficient way possible.
| Other Features | Description |
| --- | --- |
| Monitor source schema changes | The Flow is able to CREATE and ALTER destination tables to keep up with schema changes in the source. |
| Load CSV and Parquet files | The Flow can load CSV and Parquet files in Azure Storage into Synapse Analytics. |
| Delete loaded source files | The Flow can be configured to delete successfully loaded source files. |
| Load data in parallel | The Flow is able to load data into multiple destination tables in parallel. |
| Process files in subfolders | The Flow is able to process files in multiple nested subfolders. |
Prerequisites
- The Azure Synapse Analytics dedicated SQL pool is up and running.
- The Azure Storage blob is created.
- Files to load exist in the blob.
Process
Step 1. Create a new Azure Storage Connection
This Connection will be used as a source to access files stored in the Azure Storage blob. Read more about the Azure Storage connector.
Step 2. Create a new Azure Synapse Analytics Connection
This Azure Synapse Analytics Connection will be used as a destination.
Step 3. Create a new CSV Format or Parquet Format
This Format will be used to generate a valid COPY INTO command.
Create a CSV Format to load CSV files and a Parquet Format to load Parquet files in Azure Storage.
When loading Parquet files, dates and timestamps must be serialized using the following Parquet schema:
"type": ["null", {"type": "long", "logicalType": "timestamp-millis"}]
Step 4. Create a new Flow
This Flow will be used to load files in Azure Storage into Azure Synapse Analytics.
In Flows, click [+], type in bulk load files in azure storage, and select the Flow Bulk load files in Azure Storage into Synapse Analytics.
Step 5. Configure load transformation
Select or enter the following attributes of the transformation (left to right):
- Azure Storage Connection created in step 1.
- CSV or Parquet Format created in step 3.
- A wildcard filename that matches the filenames in the cloud storage. Use the .csv extension for uncompressed files and .gz for compressed files.
- Synapse Analytics Connection created in step 2.
- The wildcard destination table name in the following format: database.schema.prefix_*_suffix, where database.schema is the Synapse Analytics database and schema to load data into. Example: test.dbo.*. You don't need to use wildcards when loading into a specific table.
Step 6. Set optional and required parameters
Click MAPPING.
Select the Parameters tab.
Select or enter the following optional and required parameters:
Source Files and Destination Tables
- Include files in subfolders: if this option is enabled, the Flow will load all files that match the wildcard filename (set in FROM) in all subfolders under the main bucket or blob.
- Exclude files and Include files: the optional comma-separated lists of files to exclude and/or include. You can use wildcard file names. These options are used together with the wildcard filename set in FROM:
  - The Flow populates the list of files matching the wildcard filename in FROM.
  - The Flow excludes files based on the list of exclusions set in Exclude files.
  - The Flow includes files based on the list of inclusions set in Include files.
- Calculate Synapse Analytics table name: an optional parameter used to calculate (using JavaScript) the destination table name based on the source file name. The original file name, without the extension, is stored in the variable name, and the actual table name must be assigned to the variable value. For example, let's assume that the source file name is db_schema-table_cdc_stream_uuid.csv. The JavaScript code to calculate the destination table name would be value = name.substring(name.indexOf('-') + 1, name.indexOf('_cdc_stream'));. IMPORTANT: the Flow will automatically recognize and transform filenames matching the following template: *_cdc_stream_*.*. If filenames match this template, there is no need to provide a transformation script.
- Maximum Number of Files to Process: an optional parameter that controls how many files will be loaded in one batch (one run of the Flow). If you expect that at any point in time there will be hundreds of thousands or millions (10^6 - 10^7) of files in the source folder(s), it is recommended to limit the number of files to process to a more manageable number: hundreds to thousands (10^3 - 10^5).
- Maximum Number of Parallel Loads: if the value of this optional parameter is greater than 1, the Flow will create multiple threads to load data in parallel, one thread per table but not more than the threshold set in this parameter.
- Parallel: enable this flag if you have more than one source-to-destination transformation that you want to execute in parallel.
- Purge File if Success: if this option is enabled (it is enabled by default), the source files will be automatically deleted after the COPY INTO command is successfully executed.
Debug
- Log each executed SQL statement: enable this flag if you want to log each automatically generated and executed SQL statement, including before/after Load SQL.
Error recovery
- Continue loading data into other tables if Error: if this parameter is enabled (it is disabled by default) and there is an error when loading data into one of the tables, the Flow will continue loading data into the other tables. If configured, a notification will be sent to the webhook.
Load parameters
- Direct load: if this option is enabled (it is disabled by default), the Flow loads data into the Synapse Analytics table directly, without an intermediate temporary table. Enabling Direct load is the fastest way to load data into Synapse Analytics, but it does not support MERGE and CDC MERGE.
- Use COPY INTO with a wildcard pattern: if this option is enabled (it is disabled by default), the Flow loads all files into a specific destination table in one call using COPY INTO with a wildcard pattern. The advantage of this approach is the speed of the load. The disadvantage is the inability to handle files with a different number of columns.
- Action: can be COPY INTO, which inserts records from the file(s) into the Synapse Analytics table; MERGE, which merges records from the file with the records in the Synapse Analytics table; or CDC MERGE, which loads CDC events (INSERT/UPDATE/DELETE) in the same order as they originated in the source database. MERGE and CDC MERGE require configuring the Lookup Fields and/or enabling the option described in the last bullet of this list.
- How to MERGE: determines how to perform the MERGE into the Synapse Analytics table. The default is DELETE/INSERT: DELETE all records in the actual table that also exist in the temp table, then INSERT all records from the temp table into the actual table. If this parameter is set to Native MERGE, the Flow will execute the native Synapse Analytics MERGE SQL. Note that native MERGE requires the table to be created with DISTRIBUTION = HASH. See the sketch after this list.
- Create tables with DISTRIBUTION = HASH: if this option is enabled, the actual table is created as CREATE TABLE (columns) WITH (DISTRIBUTION = HASH(PK)). A hash-distributed table has a distribution column that is the hash key. IMPORTANT: Native MERGE requires enabling this option.
- Handle explicit CDC updates: the typical stream produced by the CDC Flow includes all columns in the monitored table. In some cases, the CDC Flow captures changes in the explicitly changed columns only, for example, when the supplemental logging for the Oracle table is configured as 'LOG DATA (PRIMARY KEY) COLUMNS' or 'LOG DATA (UNIQUE) COLUMNS'. Enable this option so the CDC MERGE statement can preserve the original values of the columns NOT captured by the CDC Flow.
- Lookup Fields: the comma-separated list of fields that uniquely identify the record in the target Synapse Analytics table.
- If this option is enabled and Lookup Fields is empty, the system will try to predict the columns that uniquely identify the record in the target Synapse Analytics table.
COPY INTO Parameters
- Line Separator: the default is \n on Windows and 0x0a on Linux. You can override it using this property. It is useful if CSV files are generated in one operating system but will be loaded in another.
- Max # of Errors to Ignore: specifies the maximum number of reject rows allowed in the load before the COPY operation is canceled. Each row that cannot be imported by the COPY operation is ignored and counted as one error. If max_errors is not specified, the default is 0.
- Error File Directory: specifies the directory, relative to the container within the COPY statement, where the rejected rows and the corresponding error file should be written. If the specified path doesn't exist, one will be created on your behalf. A child directory is created with the name rejectedrows.
- Identity Insert: specifies whether the identity value or values in the imported data file are to be used for the identity column. If IDENTITY_INSERT is OFF (default), the identity values for this column are verified but not imported; Azure Synapse Analytics will automatically assign unique values based on the seed and increment values specified during table creation. Note the following behavior with the COPY command: if IDENTITY_INSERT is OFF and the table has an identity column, a column list must be specified that does not map an input field to the identity column. If IDENTITY_INSERT is ON and the table has an identity column, and a column list is passed, it must map an input field to the identity column. The default value is not supported for the identity column in the column list. IDENTITY_INSERT can only be set for one table at a time.
- Date Format: possible values: mdy | dmy | ymd | ydm | myd | dym. Specifies the date format for mapping dates to SQL Server date formats. See the sketch after this list for how these parameters appear in a generated COPY INTO statement.
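For illustration only, the sketch below shows how some of these parameters map to options of a generated COPY INTO statement; the table name, storage path, and values are hypothetical, and the Flow builds the actual statement for you.

```sql
-- Hypothetical example only; the Flow generates the actual statement.
COPY INTO dbo.orders
FROM 'https://myaccount.blob.core.windows.net/mycontainer/in/orders*.csv'
WITH (
    FILE_TYPE       = 'CSV',
    FIELDTERMINATOR = ',',
    ROWTERMINATOR   = '0x0A',      -- Line Separator
    MAXERRORS       = 10,          -- Max # of Errors to Ignore
    ERRORFILE       = '/errors',   -- Error File Directory (relative to the container)
    DATEFORMAT      = 'ymd',       -- Date Format
    IDENTITY_INSERT = 'OFF'        -- Identity Insert
);
```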
COPY INTO parameters
- Truncate Columns: if enabled (disabled by default), the COPY INTO command will be generated with a TRUNCATECOLUMNS = true option. Strings are automatically truncated to the target column length.
- On Error: what to do in case of any error. Available options are: ABORT_STATEMENT (default), CONTINUE, SKIP_FILE.
- Change wildcard pattern for COPY INTO: by default, the Flow sets the wildcard pattern to the longest common string for the files that will be loaded into a specific table. When Use COPY INTO with a wildcard pattern is enabled, it is possible to change the wildcard pattern using JavaScript. Available variables:
  - etlConfig: the config.
  - table: the table name.
  - commonFileName: the default wildcard pattern.
  - files: the list of files.
  - Example: value = table + '_cdc_stream_';
ELT
- Set variables for Before and After SQL: code in JavaScript to set the variables that can be referenced as {varname} in the Before and/or After SQL. To set a variable, add it as a key-value pair to the script variable 'vars'.
  - Example: vars.put('schema', 'abc');
  - Example of the SQL: insert into test (schema_name) values ('{schema}')
- Before COPY SQL: this SQL will be executed before running the COPY INTO SQL.
- Ignore errors when executing Before COPY SQL: if this option is enabled, and there is an error when Before COPY SQL is executed, the error will be ignored.
- Before COPY SQL is a script: if this option is enabled, Before COPY SQL will be executed as a TSQL script.
- After COPY SQL: this SQL will be executed after running the COPY INTO SQL. See the sketch after this list.
- Ignore errors when executing After COPY SQL: if this option is enabled, and there is an error when After COPY SQL is executed, the error will be ignored.
- After COPY SQL is a script: if this option is enabled, After COPY SQL will be executed as a TSQL script.
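As an example of how a variable set in the JavaScript above can be referenced in After COPY SQL, the following minimal sketch inserts a row into a hypothetical audit table; the table name and variable are assumptions for illustration only.

```sql
-- Hypothetical After COPY SQL; {schema} is set by the "Set variables" script, e.g. vars.put('schema', 'abc');
INSERT INTO dbo.load_audit (schema_name, loaded_at)
VALUES ('{schema}', GETDATE());
```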
Handling source schema changes
- Alter target table if the source has columns that the target table doesn't have: if this option is enabled (it is disabled by default) and the source has fields that the target table doesn't have, the system will add the extra fields to the target table, as sketched below.
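For example, if the source gains a column that the target table doesn't have, the statement generated by the Flow is conceptually similar to this minimal sketch (the table and column names are hypothetical):

```sql
-- Hypothetical example of the ALTER generated when a new source column appears.
ALTER TABLE dbo.orders ADD discount_code varchar(50) NULL;
```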
Create a new Synapse connection for each table
Enable this option if you want to execute ELT scripts as a part of the load. Synapse Analytics allows only one stored procedure or TSQL code fragment to be executed from a single connection at any given time, so when the same connection is shared by all loaders (each loading into its own Synapse table) and ELT scripts are used, the queries get queued, which creates a performance bottleneck.
To enable it, select Create new destination connection for each table.
Step 7. Optionally, add a mapping.
The mapping can be used to:
- Rename the column.
- Exclude the column.
- Change the column data type for tables automatically created by the flow.
Configure mapping
Select the transformation and click MAPPING.
Add a mapping for the column CURRENT_DATE.
Renaming or excluding a column in the mapping will work for all tables that have this column. If a table does not have the column, the mapping will be ignored.
Step 8. Optionally, add more source-to-destination transformations
Click [+] and repeat steps 5 to 6 for more locations and/or file name templates.