When to use loops
You can create a pipeline (nested Flow) where some of the steps (inner Flows) are configured to be executed in a loop.
Typical use cases for executing Flows in a loop:
- Extract data from HTTP endpoints with parameters. The parameters can be provided at runtime by the loop.
- Implement API pagination.
- Configure transformations and Connections by reading configuration parameters from a database.
- Configure the source query dynamically.
- Dynamically set credentials for Connections.
- Process files in a folder by a wildcard.
- Execute a Flow a fixed number of times.
Also, read about executing Flows conditionally.
Loop types
Script Loop. Execute the Flow while a JavaScript or Python expression returns a not-null value
Configure Script loop
Step 1. Create a nested Flow and add steps (inner Flows).
Step 2. Select the Flow to be executed in a loop and go to Flow parameters.
Step 3. Specify the Loop Type as Script.
Step 4. Add the JavaScript or Python code in the Loop Script field.
Loop Script
The Flow continues to run in a loop while the JavaScript or Python expression returns a not-null value.
Example:
// this flow will be executed while the value of the "NEXT_MARKER" property is not null
var nextCursorMark = com.toolsverse.config.SystemConfig.instance().getProperties().get("NEXT_MARKER");
value = !com.toolsverse.util.Utils.isNothing(nextCursorMark) ? nextCursorMark : null;
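The NEXT_MARKER property is expected to be set by another script inside the looped Flow, typically right after the HTTP call that returns the next page cursor. Here is a minimal sketch, assuming Utils.isNothing() treats an empty string the same as null; the cursor value is illustrative:
// hypothetical: 'marker' holds the next-page cursor extracted from the API response
var marker = 'abc123';
// store the cursor so the loop script above picks it up on the next iteration;
// store an empty string on the last page to stop the loop
com.toolsverse.config.SystemConfig.instance().getProperties().put('NEXT_MARKER', marker);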
Available variables
The following variables can be referenced by name from JavaScript and Python code:
Name | Class name / JavaDoc | Package |
---|---|---|
etlConfig | com.toolsverse.etl.core.config.EtlConfig | com.toolsverse.etl.core.config |
scenario | com.toolsverse.etl.core.engine.Scenario | com.toolsverse.etl.core.engine |
evals | the 0-based number of the loop iteration | |
loopKeyValues | Map&lt;String, String&gt; - global and Flow variables set for each loop iteration | |
Programmatically set global and flow variables
Use loopKeyValues to programmatically set global and Flow variables as key=value pairs where both key and value are strings. Example:
loopKeyValues.put('VAR_NAME', 'var_value');
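For example, here is a minimal sketch (the LOOP_MONTH variable name is illustrative) that runs the looped Flow once per month for the last 12 months and exposes the month as a {LOOP_MONTH} variable:
importPackage(java.time);
// run the Flow 12 times; evals is the 0-based iteration number
if (evals < 12) {
    // e.g. '2025-06' for the current month when evals = 0
    var month = LocalDate.now().minusMonths(evals).toString().substring(0, 7);
    loopKeyValues.put('LOOP_MONTH', month);
    value = month;
} else {
    // stop the loop
    value = null;
}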
Execute a Flow a fixed number of times
This variation of the Script loop executes a Flow a fixed number of times.
Step 1. Configure a Script loop.
Step 2. Enter the following Loop Script
(example):
// this Flow will be executed 40 times (evals is 0-based)
value = evals < 40 ? evals : null;
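Because evals is 0-based, the condition evals < 40 allows iterations 0 through 39, i.e. exactly 40 executions.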
Database Loop. Read records from a database and execute the Flow for each record
The idea is that you can write a SELECT SQL statement which, when executed, creates a cursor driving the loop. The Flow in a database loop is executed as many times as the number of records returned by the cursor. For every column in each record of the cursor, the loop automatically sets global and Flow variables.
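For example, a driving query such as select roleid, rolename from role (a hypothetical table) executes the looped Flow once per role and sets the {roleid} and {rolename} variables on each iteration.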
Configure database loop
Step 1. Create a nested Flow and add steps (inner Flows).
Step 2. Select the Flow to be executed in a loop and go to Flow parameters.
Step 3. Select SQL in Loop Type.
Step 4. Select the database Connection used for the SQL loop.
Step 5. Enter the SELECT SQL statement to drive the loop in the Loop Script field. Optionally, enable Execute as Script if the loop SQL is in fact a script, such as PL/SQL, T-SQL, etc. Note that the script must return a cursor.
Example of the database loop
Let's assume that there is a role table in the database that contains information about roles and permissions. In this example, we are going to create a CSV file for each role in the database.
Step 1. Create a database Connection that will be used to execute the driving (loop) SQL.
Step 2. Create any Flow, for example, a Flow that reads data from a database and creates files, or a Flow that executes a web service. We're assuming that you want to create a loop that executes this Flow for each record in the result set.
Step 3. Create a nested Flow and add the Flow created in Step 2.
Step 4. Click the pen icon next to the Flow name. The Parameters pop-up window will open.
Step 5. Select SQL for Loop Type.
Step 6. Select the Connection created in Step 1 for the Connection.
Step 7. Enter the driving SQL, for example select roleid from role, as the Loop Script.
Step 8. Modify the Flow created in Step 2 to use fields from the result set. Enclose the field name in curly brackets: {field name}. For example: {roleid}.
You can use the {field name} in:
- the FROM and TO fields for a transformation
- an HTTP Connection URL (or any other Connection property)
- anywhere in the source or destination query
When referencing {parameters}, make sure you are using the same CaSe for the field name as returned by the database. For example, some databases enforce UPPERCASE for column names returned by executing the SQL query: select roleid from role --> ROLEID, while other databases keep the original: select roleid from role --> roleid.
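For example, with the driving query select roleid from role, the looped Flow could write to the destination (TO) file role_{roleid}.csv and use select * from userrole where roleid = {roleid} as the source query (the userrole table is illustrative).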
Debug database loop
Read how to debug a database loop.
File Loop. Read file names matching a wildcard and execute the Flow for each file
There is an easy way to ETL files matching a wildcard.
Use a file loop if you want to execute scenarios that are more complex than a simple extract-transform-load.
Configure file loop
Step 1. Create a nested Flow and add steps (inner Flows).
Step 2. Select the Flow to be executed in a loop and go to Flow parameters.
Step 3. Set the Loop Type to Files by wildcard.
Step 4. Select the file Connection used for the loop.
Step 5. Enter the wildcard filename to drive the loop in the File path field.
File path modifiers
The File path parameter can include modifiers.
- ;-file1,file2,filen: a comma-separated list of files to exclude. Filenames in the exclude list can be wildcards. Example: *.csv;-test1.csv,test2.csv - process all files which match the wildcard *.csv, except files test1.csv and test2.csv.
- ;+file1,file2,filen: a comma-separated list of files to include. Filenames in the include list can be wildcards. Example: *.csv;+test1.csv,test2.csv - process files test1.csv and test2.csv, ignore all other files which match the wildcard *.csv.
- <folder>: only include folders. Example: <*test> - process folders which match the wildcard *test.
- ;oldest: sort files from oldest to newest.
- ;newest: sort files from newest to oldest.
- ;ascending: sort files alphabetically in ascending order.
- ;descending: sort files alphabetically in descending order.
- ;largest: sort files from largest to smallest.
- ;smallest: sort files from smallest to largest.
Sort modifiers override the sort order set at the Connection level.
Global and Flow variables set by the file loop
The following global and Flow variables are automatically set in each iteration of the file loop:
- loop_file_name: the name of the file processed by the loop. The name includes the extension but does not include the path.
- loop_base_file_name: the base name of the file processed by the loop. The name does not include the extension or the path.
- loop_file_id: the id of the file. Currently, the file id is only set by the Google Drive connector. If there is no file id, loop_file_id = loop_file_name.
- evals: the 0-based number of the loop iteration.
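For example, setting the destination (TO) of the looped Flow to {loop_base_file_name}.json creates a .json file with the same base name as the source file processed in the current iteration.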
Example of the file loop
Step 1. Create a Connection for the source files. Make sure the Connection supports wildcard filenames, and note the name of the Connection.
Step 2. Create a Flow where the source Connection is the Connection created in Step 1 and the destination is whatever you like.
Step 3. Create a nested Flow and add the Flow created in Step 2.
Step 4. Click the pen icon next to the Flow name. The Parameters pop-up window will open.
Step 5. Select Files by wildcard for Loop Type.
Step 6. Select the Connection created in Step 1 for Connection.
Step 7. Enter a wildcard filename, such as *.csv, for File path.
Files will remain in the source folder after processing, so you might want to delete or move them to a different folder.
File Loop with filter
This variation of the file loop gets the list of files to process after applying a programmatic filter.
Step 1. Create a new script Flow.
Step 2. Add a named source connection to the Flow. Use the connection that contains the source files. The name that you assign to the connection will be used in the next step. Assign any format; it doesn't matter which one, but the format is a required field.
Step 3. Add the JavaScript or Python code to filter the files or split them into buckets.
Available file attributes:
- file.getName(): the name of the file with the extension but without the folder.
- file.getSize(): the size of the file in bytes.
- file.getPath(): the full path.
- file.getLastModified(): the timestamp, in Unix epoch milliseconds, when the file was last modified.
Read about commonly used packages and classes (with examples).
Filter examples
Here are several examples:
Files created or updated today
importPackage(java.time);
// Get the list of the files in the folder matching the wildcard
// For the connection, use the same name as you assigned
// to the named connection in Step 2
var list = com.toolsverse.etl.core.task.common.FileManagerTask.list(etlConfig,
'source', '*.*');
// create buckets
var newFiles = new java.util.ArrayList();
// split files in buckets
if (list != null) {
for each (var file in list) {
if (LocalDate.now().equals(Instant.ofEpochMilli(file.getLastModified()).atZone(ZoneId.systemDefault()).toLocalDate())) {
newFiles.add(file);
}
}
}
// add named buckets to the common object storage
etlConfig.setValue('new_files', newFiles);
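The bucket stored with etlConfig.setValue('new_files', newFiles) lives in the common object storage, so the loop script (see the template below) can retrieve it with etlConfig.getValue('new_files').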
Large files
// Get the list of the files in the folder matching the wildcard
// For the connection, use the same name as you assigned
// to the named connection in Step 2
var list = com.toolsverse.etl.core.task.common.FileManagerTask.list(etlConfig,
'source', '*.*');
// create buckets
var largeFiles = new java.util.ArrayList();
// split files in buckets
if (list != null) {
for each (var file in list) {
if (file.getSize() >= 10000) {
largeFiles.add(file);
}
}
}
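// add the named bucket to the common object storage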
etlConfig.setValue('large_files', largeFiles);
Step 4. Create a new nested Flow.
Step 5. Add the Flow created in Step 1 as the first step.
Step 6. Add the Flow which will be processing the files as the second step.
Step 7. Configure the scripting loop for the Flow which will be processing the files in a bucket: click the editor icon and configure the Loop Script.
Loop script for the File Loop with Filter
Here is a template. If you have one bucket, the only parameter that you need to modify is the bucket name:
importPackage(com.toolsverse.util);
// get the files in the bucket created after applying the filter
var files = etlConfig.getValue('same bucket name as in step 3');
if (files != null && !files.isEmpty()) {
// get the file name and id
var fileName = FilenameUtils.getName(files.get(0).getName());
var baseName = FilenameUtils.getBaseName(fileName);
var fileId = files.get(0).getId();
fileId = fileId == null ? fileName : fileId;
// remove the file from the bucket
files.remove(0);
// set loop variables
loopKeyValues.put('loop_file_name', fileName);
loopKeyValues.put('loop_base_file_name', baseName);
loopKeyValues.put('loop_file_id', fileId);
loopKeyValues.put('evals', Utils.makeString(evals));
// continue the loop; it will stop when the bucket is empty
value = 'true';
} else {
// stop the loop
value = null;
}
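The template removes the first file from the bucket on each iteration and sets the same loop_file_name, loop_base_file_name, loop_file_id, and evals variables as a regular file loop, so each iteration of the looped Flow processes exactly one file from the bucket.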
Global and Flow variables set by the file loop with filter
The following global and Flow variables are automatically set in each iteration of the file loop with filter:
- loop_file_name: the name of the file processed by the loop. The name includes the extension but does not include the path.
- loop_base_file_name: the base name of the file processed by the loop. The name does not include the extension or the path.
- loop_file_id: the id of the file. Currently, the file id is only set by the Google Drive connector. If there is no file id, loop_file_id = loop_file_name.
- evals: the 0-based number of the loop iteration.
Example of the flow executed by File Loop with Filter
This Flow converts all .csv files in a source folder, after applying a filter, into .json files with the same name.
Parallel loop. Execute loop iterations in parallel threads
Loop iterations can be executed in parallel threads. For example, if the loop is configured to execute a flow that calls the HTTP endpoint with different parameters (e.g. start and end dates) you can configure the loop to make multiple HTTP calls in parallel, which can greatly improve the performance.
Not all flow types are thread-safe.
To enable a parallel loop, set the maximum number of Loop Threads to use when executing the Flow in a loop. If the number is greater than one, each iteration of the loop will be executed in its own thread.
Retry loop iteration in case of error
When configuring a loop, you can set parameters for retrying a loop iteration in case of any error:
- Loop retries: the number of retries for each loop iteration in case of an error.
- Retries delay: the number of milliseconds to wait between retries.
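For example, setting Loop retries to 3 and Retries delay to 60000 retries each failed iteration up to three times, waiting one minute between attempts.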
Stop the loop after a fixed number of iterations
You can configure any loop to stop after a fixed number of iterations by setting the Maximum number of iterations parameter. The default is 100000.
Stop the loop if it exceeds a certain time limit
You can set a loop to stop running if it exceeds a certain time limit by using the Loop timeout parameter (in milliseconds). It is possible to use a {global variable} to set the value of this parameter.
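For example, you can set the Loop timeout to {LOOP_TIMEOUT} and define the LOOP_TIMEOUT global variable elsewhere (the variable name is illustrative).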
You can choose to stop the loop or raise an exception in case of a timeout by using the On loop timeout parameter. The default setting is Stop loop.
If the loop is configured to run iterations in parallel and the Loop timeout is set, the Flow will always raise an exception if the loop runs longer than expected, regardless of the On loop timeout setting.
If the maximum number of iterations and the loop timeout are both set, then the maximum number of iterations is checked first, followed by the loop timeout.
Stop the loop programmatically
Problem
The loop runs while there are records in the result set returned by the loop query, while there are files to process, or while the JavaScript or Python condition returns a not-null value. In some cases, you may want to stop the loop early based on a condition.
Solution
Anywhere in the JavaScript or Python code executed within the loop (for example, in the After Extract transformation), add the following line of code. This call is thread-safe.
etlConfig.setRequestStop(true);
If etlConfig is available
if (condition) {
etlConfig.setRequestStop(true);
}
If etlConfig is not available
if (condition) {
com.toolsverse.config.SystemConfig.instance().getEtlThreadContext().getData().setRequestStop(true);
}