Workflows
Flux employs a traditional workflow model to describe how events should be detected and how tasks should be executed. Each workflow is built from a few basic components:
- actions, which perform some operation or task, like invoking a process or copying a file
- triggers, which wait for an event to occur, such as a scheduled time or the arrival of a file
- flows, which guide the execution of the workflow
Workflows can be arbitrarily complex, from a simple A - B - C progression to a looping workflow with scheduled components, multiple concurrent steps, and event triggers at points throughout the flow.
Creating Workflows
Workflows are created by arranging triggers, actions, and flows to instruct Flux on which tasks to execute and in which order. Workflows can be named and stored using “namespaces”, unique names that allow individual workflows to be identified throughout the system.
Actions
Actions perform an event or task on the system. These tasks might include running a script, updating a database, or moving a file. A complete list of available actions can be found at Triggers and Actions.
When the flow of control in a workflow arrives at an action, the action will immediately begin executing its specified task.
If the action is a start action, it will immediately execute when the workflow is started, and the workflow will then continue through the workflow once the action completes.
Once the action has completed, two things will occur:
- The flow of control moves on in the workflow, following any outgoing flows to the next trigger or action.
- The action may place a result into the flow context. The result is placed into a variable called “RESULT”. The result variable, and any fields it contains, are then accessible for the next trigger, action, or flow to use.
Triggers
Triggers wait for an event to occur in a workflow. Triggers can fire on time-based events, file events, database conditions, and even internal conditions in Flux, like other workflows reaching a particular milestone. A complete list of triggers in Flux is available in Triggers and Actions.
When the flow of control in a workflow arrives at a trigger, it will begin monitoring for its event condition. If the condition is already satisfied, the trigger will fire immediately; otherwise, it will wait for its event to occur.
If the trigger is a start action, the workflow will begin by waiting for the trigger’s event condition to occur and will continue through the workflow when the trigger fires.
When a trigger fires, two things occur:
- The flow of control moves on in the workflow, following any outgoing flows to the next trigger or action.
- The trigger may place a result into the flow context. As with actions, the result is placed into a variable called “RESULT”. The result variable, and any fields it contains, are then accessible for the next trigger, action, or flow to use.
Flows
Flows allow you to specify how actions and triggers connect to one another, guiding the flow of execution through a workflow.
When a trigger or action fires, the Flux engine will follow any available outgoing flows to determine which triggers or actions to execute next.
On its own, Flux will never make any assumptions about how the workflow should execute – if an action or triggers fires and there are no outgoing flows, the workflow will end. Take note of this when designing your workflows — if your workflow runs in a loop, for example, you’ll need to include a flow from the final action of the loop back to the beginning, to allow the next cycle of the loop to execute.
You can also use conditional flows to indicate branching paths that should be taken only if a particular condition is satisfied — for example, taking a flow when an error occurs, or when an action returns a particular result.
Conditional Flows
Conditions are evaluated using an expression language that evaluates variables. Variables can be the results returned from triggers and actions, or they can be values that you create and store, either from within your workflow or at design time.
If a trigger or action returns a result, it will be stored in a new flow context variable called “result”. You can access this result, as well as any of its available properties, in your conditional flows.
For an example of this, let’s look at the Timer Trigger. As you’ll see in the previous link, this trigger returns a result that contains a single property, called “result”, that indicates whether the trigger has expired. If you want to set a condition on a flow, so that the flow is followed only if the trigger has not expired (and, therefore, the property “result” on the trigger’s result object is ‘true’), you can set the following condition:
result.result = TRUE
In this condition, we first name the variable that we want to compare, “result”. This is followed by a ‘.’ and the name of the property to use — in this case, the property is also called “result”. Finally, we add the condition we want to use — follow this flow only if the result property of the result variable is true.
If you’re just checking whether a variable is true, you can also just name the variable in your condition, like so:
result.result
If the conditional flow only contains a variable and, optionally, a property name, Flux will automatically evaluate that to mean “only take this condition if the specified variable/property is set to true”.
Sometimes, results will have properties with other names, as well. For example, the Mail Trigger has a result property called “subject” — you could use this string property in a conditional flow like so:
result.subject LIKE '%urgent%'
You can see the complete list of Triggers and Actions to find which actions return which result properties.
Conditional Flow Syntax
You can create conditional expressions very similar to the WHERE clause of SQL queries. It is possible to write conditional expressions like result.salary >= 5000
, evaluate multiple variables in your flow context like (result.salary >=5000) AND (result.age != 20)
, evaluate data contained in a java.util.Map, and other types of complex expressions.
Conditional flows can be created using the syntax described in the following table. You can mix and match these operators and use multiple variables to create complex statements that accurately describe your requirements.
Conditional Expression / Message Selector Syntax
Operators | Description |
---|---|
= | Equal To |
> | Greater Than |
< | Less Than |
>= | Greater Than or Equal To |
<= | Less Than or Equal To |
<> | Not Equal To |
+ | Addition |
- | Subtraction |
* | Multiplication |
/ | Division |
% | Wildcard character for use with LIKE |
LIKE | Test whether a pattern appears in a string or variable (requires the wildcard character, ‘%’, to be used in the pattern) |
NOT | TRUE if the conditional expression evaluates to FALSE |
AND | TRUE if all conditional expressions evaluate to TRUE |
OR | TRUE if any conditional expression evaluates to TRUE |
IN | TRUE if your value exists within a collection of values |
mapname[‘keyname’] | Evaluates data stored under the name “keyname” in a map with the name “mapname” |
. | Evaluates public fields in the value. |
Strings are denoted using single ‘tick’ characters, such as ‘this is my string’. For example, to test if a result variable is equal to a particular string, you could write the following:
result.name = 'my name here'
Note: there is no way to include a single ‘tick’ character within the string itself. The ‘tick’ characters (single quotation marks) can appear only on the left and right edges of your string. For example, it is impossible to describe a string that contains an apostrophe, like so: my friend's name here
Else Flows
An else is followed if there are no unconditional flows and none of the conditional flows were satisfied. There can be at most one else flow branching out of a trigger or action.
Looping Flows
In Flux, a “loop” occurs any time a trigger or action (or a group of triggers or actions) runs more than once during the execution of a workflow. In other words, a loop allows you to run one or more triggers or actions repeatedly within a workflow.
In order to create a loop, you’ll need to design your workflow to allow your triggers and actions to run more than once. Because Flux uses the workflow model to define workflows, every trigger and action in Flux can only move on to the next step in the workflow if there is a flow for it to follow. If there are no outgoing flows from a trigger or action, the workflow (and the workflow) will end at that point, and the workflow is considered to have finished (and will then be automatically removed from the engine).
To create a loop, therefore, you just need to be sure that the last trigger or action in the loop has a flow going back to the trigger or action that should begin the loop. This will instruct Flux to go back and re-execute the first trigger or action in the loop once the final item has been completed, then follow the flows through the loop again as normal, looping back once more once the final trigger or action has run.
Keep in mind that if a trigger or action does have an outgoing flow, but the flow is a conditional flow that is not satisfied during a particular run, that flow will not be followed on the run. If there are no other flows that can be followed, the workflow will stop running at that point.
This same behavior can be used to “break” a loop at a particular time. You can simply create a flow context variable containing a flag that indicates whether the loop should keep running, then use conditional flows to check the value of the variable. If the variable is “true”, follow a conditional flow that keeps the loop running, and if it’s “false”, follow another conditional flow to a different set of actions (or end the workflow altogether).
In short: triggers and actions in Flux will follow any outgoing flows (as long as they are not conditional flows whose conditions aren’t satisfied). You can use this behavior to loop backward in your workflow and create a chain of triggers and actions that runs repeatedly, or until a condition that you define causes the loop to exit.
Error Flows
Error flows allow you to control how errors are handled on a per-action level. You can find more information about error flows in the Error Handling documentation.
Custom Error Handlers
Custom error handlers are specialized workflows that allow you to define how a workflow reacts to errors. If an action throws an error and does not have an error flow to follow, it will invoke the custom error handler is invoked.
The Error Handling documentation contains more information about creating and installing custom error handlers.
Workflow Properties
The following properties can be set on any workflow in Flux.
Deadline Date
The date when this workflow (or workflow run) is expected to be completed. A flux.audittrial.server.DeadlineExceededEvent is published to the audit trail if this workflow does not complete execution by the deadline.
If the workflow does not contain a run (that is, one action marked as a start of run and another marked as an end of run), the deadline will apply across the entire lifetime of the workflow. If the workflow does contain a run, the deadline will be re-evaluated for each run of the workflow.
A deadline date cannot be set if a deadline time expression is set.
Deadline Time Expression
A time expression specifying the time frame that the workflow (or workflow run) is expected to be completed in. For example, a deadline time expression of “+7m” means the workflow (or each run in the workflow) is expected to finish within seven minutes.
If the workflow does not contain a run (that is, one action marked as a start of run and another marked as an end of run), the actual deadline date is calculated when the workflow is first exported to the engine by applying the time expression to the date and time that the workflow was exported. If the workflow does contain a run, the deadline date for each run within the workflow is calculated at the start of that run.
A deadline time expression cannot be set if a deadline date is set.
Deadline Window
A relative time expression which specifies how soon before the deadline the engine will publish the event flux.audittrail.server.DeadlineApproachingEvent to the audit trail. This event allows you to use the audit trail to view any workflows that are approaching their deadline. The time expression should be relative to the date and time of the actual deadline (for example, a time expression of “-7m” would mean “seven minutes before the date and time of the deadline).
The deadline window defaults to “-1H” (one hour before the deadline) if this property is not defined.
Listener Class Path
If your workflow contains Java Actions or Dynamic Java Actions, the listener class path allows you to dynamically load your action listeners without requiring a JVM restart (and even allows you to dynamically install new or updated versions of your listeners without restarting the JVM).
The listener classpath can be set directly in the workflow using this property, or more broadly on the runtime configuration tree. If the listener is set using this property, it will take precedence over any classpath set in the runtime configuration.
The listener classpath can be used to load JAR files on the file system. For example, assume that you have a file database_listener.jar located on the directory C:\action_listeners on Windows (or /action_listeners on Unix). To allow the workflow to dynamically load any listener classes in the JAR, you could just set the listener class path to:
/action_listeners/database_listener.jar
The listener classpath will also accept a path to a directory rather than a particular JAR. If a directory is specified, any JAR files within that directory will be automatically included in the listener classpath.
You can even mix and match JAR file and directory paths in the listener classpath. If, for example, we have multiple JAR files in the directory /reports_listeners that we want to include in our listener classpath, we can modify the listener class path above to:
workflow.setListenerClasspath("/action_listeners/databaseListener.jar:/reports_listeners");
Each item in the list is separated by a “:” or “;” character, depending on the operating system on which the code runs (note that on Windows, a letter followed by either “:” or “;”, like “C:”, is interpreted as a drive letter). Paths to JAR files and directories can contain the “/” or “" path separator characters.
You can also specify an HTTP URL path to a JAR file in the listener classpath (note, though, that URL paths cannot be mixed and matched with local JAR or directory paths).
For example, if database_listener.jar was located at the URL http://localhost:8080/jars/database_listener.jar, you could use the following listener classpath setting:
workflow.setListenerClasspath("http://localhost:8080/jars/database_listener.jar");
We strongly recommend that you do not use the listener classpath setting if you are running Flux within an application server. The application you use (probably packaged as a WAR or EAR file) should be structurally sound within itself and should not require outside classes to run.
The listener classpath is only designed to load a few listener classes for use with Java Actions or Dynamic Java Actions. If you need to load hundreds or thousands of classes, or if you need to load dependent JARs for your listeners, you should use the standard JVM class loading techniques rather than the listener classpath.
The listener classpath also cannot be used to reload a JAR file if it also appeared on the JVM classpath. Take care to ensure that any classes that need dynamically reloaded are not included on the JVM classpath.
Priority
Every workflow is assigned a priority. Higher priority workflows generally run before lower priority workflows. workflow priorities make it possible to specify that important or time-critical workflows need to run before other, less important, workflows. workflow priorities allow you to specify that if two workflows could otherwise both execute at the same time but concurrency throttles will allow only one of those two workflows to run, the workflow with the higher priority runs first.
If a workflow has a priority explicitly set, it will override any priority set in the runtime configuration. If a null workflow priority is used, it will allow the runtime configuration priority to take precedence.
The highest priority is 1. Lower priorities have values greater than 1, such as 10, 25, 500, etc.
If two different workflows have different priorities, the workflow with the priority closer to 1 runs first. If two different workflows have the same priority, then the workflow with the oldest timestamp runs first. Each running workflow contains a timestamp that indicates the next time the workflow requires attention from the Flux engine.
It is possible that higher priority workflows will run so frequently as to prevent lower priority workflows from running at an acceptable rate. This behavior is called starvation, that is, lower priority workflows can be starved or prevented from running as frequently as you would like. By default, Flux permits starvation, because it often does not cause any serious problems, and it is sometimes desirable.
If starvation impacts you adversely, you can enable the Flux configuration property FAIRNESS_TIME_WINDOW. This property contains a time expression that indicates how frequently starved workflows should have their priorities increased by 1 (the effective priority). (To increase a workflow’s effective priority, its numerical value is decremented by 1.) Eventually, such workflows will reach an effective priority of 1 and be eligible for execution before all other workflows. After firing, such workflows revert to their original priorities and are again eligible to have their priorities increased if they starve. This anti-starvation behavior is called fairness.
Note that whenever a trigger fires internally, even if it does not fire out of the trigger and force execution of the workflow to continue to the next action, that workflow’s effective priority is reset to its original priority.
Also note that because starved workflows have their priority increased by one (their priority value decremented by one), appropriate priorities should be given at the start. If you have your highest priority workflow set to one, and all others set to 500, the benefits of fairness may not be significant or even noticeable.
workflow priorities are stored in an engine’s runtime configuration. Each branch in the runtime configuration tree can contain a PRIORITY property, which specifies the priorities of all workflows in that branch of the tree. If a PRIORITY property is not explicitly set, the engine searches higher branches in the runtime configuration until it finds a PRIORITY property. If no explicit PRIORITY property is found, even at the root of the tree, then the workflow defaults to a priority of 10.
Changes made to PRIORITY properties in the runtime configuration tree are propagated to running workflows in the engine.
Operations
Flux allows you to perform operations on a workflow (or group of workflows in a namespace) to influence the workflow’s behavior at runtime. For example, you might recover a failed workflow, expedite a workflow that is waiting to run, or remove a workflow from the engine entirely.
Operations are performed from the Operations Console. To operate on a workflow, navigate to the workflows page of the console, select the workflow or namespace you want to act on, then select the appropriate operation from the bottom of the grid.
The operations listed below can be applied to a single workflow (acting on all flow contexts in that workflow), or across an entire namespace (acting on all flow contexts for every workflow in the namespace).
Advanced Users (Java knowledge required) You can also perform these operations using APIs available from the Flux engine. For more information on using these APIs, refer to the Javadoc documentation for the flux.Engine and flux.Transactable interfaces.
Recover
For any flow contexts in the workflow that are in the ERROR or FAILED sub-state, the recover operation will roll back the flow context to the beginning of the transaction that caused the error condition.
This will cause the flow context to begin executing again from the last successful transaction break is reached. See Transactions for more information about transaction breaks.
The recover operation does not affect any flow contexts that are not in the ERROR or FAILED sub-state.
Expedite
For any flow contexts in the workflow that are waiting on a trigger that can be expedited, the expedite operation will cause those triggers to fire as soon as possible.
The following triggers can be expedited:
- Business process trigger
- Timer trigger
- Delay trigger
- Manual trigger
If a flow context is running an action when the expedite is performed, it will continue running as normal until it reaches a trigger that can be expedited. At that point, it will expedite the trigger and continue running.
If the flow context is waiting on a trigger that cannot be expedited, it will ignore the expedite operation.
Expediting a timer trigger or delay trigger will not decrease the count for that trigger, or cause it to be rescheduled. Expediting is a one-time operation that does not impact normally scheduled firings.
Interrupt
Rolls the workflow back to its last transaction break as soon as possible. If the workflow has already started running an action or a trigger is in the process of firing, the workflow must continue executing normally until it reaches the next transaction break, then roll back to the previous transaction break from there.
After the workflow rolls back, it will begin running again immediately from the point of the transaction break.
Interrupt is typically used with the pause and/or remove operations, to roll back the work done by a workflow before pausing or removing it. Typically, users will first interrupt, then pause or remove the workflow – once the workflow is able to roll back, it will do so, then apply the pause or remove operation (as opposed to the normal case, where the operation is simply performed after Flux commits the transaction at the next transaction break).
NOTE: Rolling back the workflow will undo any work that Flux has performed in the database (including workflow state changes), but note that it cannot undo work that was performed on the system (like file transfers or native process invocations). Take note of this when interrupting a workflow whose actions might impact the file system or other software systems.
Remove
Removes executing flow contexts from the engine. It is not possible to interrupt a running action once it has started, so if any actions have already begun running when the remove operation is performed, the Flux engine will wait until the flow reaches a transaction break, then remove the workflow at that point.
If you want Flux to roll back its work in the database before the workflow is removed, you can first interrupt the workflow, then remove it. This will roll back the work in the database before removing the workflow, rather than committing it as normal.
Advanced Users (Java knowledge required) Calling remove() using the current workflow ID from within a running workflow will fail, because at the next transaction break, the engine will see that the workflow was deleted and rollback the transaction. Rolling back the transaction will undo the work of the remove() method. A simple workaround is to fork a new thread from within the running workflow, have that thread call Engine.remove(), and wait for the thread to finish. This solution works because the new thread uses a new database connection to perform its work.
Removing “Stuck” workflows
** Advanced Topic: Database Administration**
The workflow removal method below requires direct access to (and manipulation of) the data in the Flux database tables. Some database administration knowledge may be required to successfully complete this operation.
In some extreme cases, a workflow can become “stuck” executing and unable to be removed from the Operations Console. Typically, this occurs if there is a corruption or invalid data in the database that prevents Flux from successfully removing the flow.
To remove such a workflow, you can delete it directly from the database. Every workflow has at least one entry in the FLUX_READY database table. You can identify which workflow a column belongs to using the NAMESPACE column in that table.
To delete a workflow, just remove any entries for the workflow from the FLUX_READY table. The Flux engine will then automatically clean up any associated data for the workflow in any other tables, so no further manual action is necessary to remove the workflow.
Note that if the workflow contains an action that is still executing (in-memory), the action will continue running until the workflow reaches a transaction break. Because Java does not provide a clean way to destroy a running action once it has started, Flux must wait for the action to complete before it is fully removed from the engine.
Pause
Pauses all flow contexts in the selected workflow to prevent them from executing further. The flow context will remain in the PAUSED state until it is resumed, either manually by a user or through an API call.
Due to the relationship between the Flux Engine and the database, it is not possible for the engine to put a flow context in the PAUSED state until it reaches the next transaction break. This means that if you pause a flow context while an action is executing, the engine must finish the currently executing action before putting the flow context into the PAUSED state. In addition, if you pause the flow context in the middle of a transaction (that is, the middle of a series of actions that are not marked as transaction break) the flow context will not enter the PAUSED state until it reaches the next transaction break.
Because of this, all of the actions that occur between the time you pause the flow context and the next transaction break (including any actions that are currently executing at the time of the pause) will be executed in their entirety before the flow context enters the PAUSED state.
If you want to roll back to the last transaction break (undoing any work Flux performs in the database) when the workflow is paused, you can first interrupt, then pause the workflow. If the workflow is interrupted before pausing, Flux will roll back the transaction to the last transaction break, then pause the workflow at that point and await a user’s input before resuming.
Resume
Resumes any flow contexts in the PAUSED state for the selected workflow. This will allow the flow contexts to resume normal execution.
If the workflow selected does not have any flow contexts in the PAUSED state, the resume operation is ignored.