Workflow Engine
The workflow engine is responsible for executing workflows, verifying permissions and ensuring data integrity.
For each execution request, the engine receives:
- Workflow name - the unique name of the workflow to be executed
- Parameters - the raw JSON parameters to be processed and used by the workflow
- User Token - the user making the request
With these inputs, the engine:
- Checks that the workflow exists and is enabled
- Checks that the user has the required permissions
- Parses the parameters using the workflow's `parseParameters(...)` method (if any are needed)
- Retrieves the input data using the workflow's `loadData(...)` method (if any is needed)
- Creates a transaction log to note the start of the actual execution (see Transaction Logs)
- Calls the workflow's `executeAction(...)` method and retrieves its result
- If successful, processes the output data (adds, updates or removes containers)
- Creates a transaction log to note the end of the actual execution (see Transaction Logs)
- If requested, returns the output data along with the workflow result
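The sequence above can be sketched as a small, self-contained pipeline. This is only an illustration of the order of the steps, not the engine's actual code: `UserToken`, `Result`, `Workflow`, `Engine` and the `Echo` workflow are hypothetical stand-ins rather than core3 types, the simplified method signatures are assumptions, and the output data processing is reduced to a comment.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.{Success, Try}

object EngineFlowSketch {
  // Hypothetical stand-ins; none of these are core3 types.
  final case class UserToken(userId: String, permissions: Set[String])
  final case class Result(wasSuccessful: Boolean, message: Option[String] = None)

  trait Workflow {
    def name: String
    def enabled: Boolean = true
    def parseParameters(raw: String): Try[Map[String, String]]
    def loadData(params: Map[String, String]): Try[Vector[String]]
    def executeAction(params: Map[String, String], data: Vector[String]): Result
  }

  final class Engine(workflows: Vector[Workflow]) {
    // Stands in for the transaction logs, so the order of the steps is visible.
    val log: ArrayBuffer[String] = ArrayBuffer.empty[String]

    def execute(name: String, rawParams: String, user: UserToken): Result = {
      val outcome: Either[String, Result] = for {
        // Step 1: the workflow must exist and be enabled
        wf <- workflows.find(w => w.name == name && w.enabled).toRight(s"Workflow [$name] not found or disabled")
        // Step 2: the user must be allowed to execute it
        _ <- Either.cond(user.permissions.contains(name), (), s"User [${user.userId}] cannot execute [$name]")
        // Steps 3 and 4: parse the raw parameters, then load the input data
        params <- wf.parseParameters(rawParams).toEither.left.map(_.getMessage)
        data <- wf.loadData(params).toEither.left.map(_.getMessage)
      } yield {
        log += s"start:$name" // Step 5: log the start of the execution
        val result = wf.executeAction(params, data) // Step 6
        // Step 7 (omitted): a real engine would process the output data here
        log += s"end:$name:${result.wasSuccessful}" // Step 8: log the end
        result // Step 9: return the result
      }

      outcome match {
        case Right(result) => result
        case Left(error)   => Result(wasSuccessful = false, Some(error))
      }
    }
  }

  // A trivial workflow that needs no input data and always succeeds.
  object Echo extends Workflow {
    val name: String = "Echo"
    def parseParameters(raw: String): Try[Map[String, String]] = Try(Map("message" -> raw))
    def loadData(params: Map[String, String]): Try[Vector[String]] = Success(Vector.empty)
    def executeAction(params: Map[String, String], data: Vector[String]): Result =
      Result(wasSuccessful = true, params.get("message"))
  }
}
```

In this sketch, a request that fails the existence or permission checks never reaches the logging steps, so nothing is recorded for it. Whether the real engine behaves the same way for rejected requests is not stated here.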
A user token is created for each user outside of the workflow engine (how this is done depends on the way requests are received). The token should contain the list of workflows that the user is allowed to execute.
Whenever changes are made to data/containers by a workflow, the engine checks that the appropriate revision IDs and numbers are used (for mutable containers). This is done to avoid having multiple requests making changes to the same data at the same time. In this scenario, the first request to be received and processed will succeed; the remaining ones will be working with (now) invalid revisions and will fail.
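The revision check described above is a form of optimistic concurrency control. A minimal sketch of the idea follows, assuming a hypothetical in-memory `Store` and a simplified `Doc` container. The field names `revision` and `revisionNumber` are illustrative and the real container and database layer types will differ.

```scala
import java.util.UUID

object RevisionCheckSketch {
  // Hypothetical mutable container; only the revision fields matter here.
  final case class Doc(id: String, value: String, revision: UUID, revisionNumber: Int)

  final class Store {
    private var docs = Map.empty[String, Doc]

    def add(doc: Doc): Unit = synchronized { docs += (doc.id -> doc) }
    def get(id: String): Option[Doc] = synchronized { docs.get(id) }

    // Accepts the update only if it was based on the currently stored revision.
    def update(basedOn: Doc, newValue: String): Either[String, Doc] = synchronized {
      docs.get(basedOn.id) match {
        case Some(current)
            if current.revision == basedOn.revision && current.revisionNumber == basedOn.revisionNumber =>
          // The revision matches: store the change under a new revision
          val next = current.copy(
            value = newValue,
            revision = UUID.randomUUID(),
            revisionNumber = current.revisionNumber + 1
          )
          docs += (next.id -> next)
          Right(next)
        case Some(current) =>
          Left(s"Stale revision [${basedOn.revisionNumber}]; current is [${current.revisionNumber}]")
        case None =>
          Left(s"Container [${basedOn.id}] not found")
      }
    }
  }
}
```

If two requests both read the same `Doc` and then call `update`, the first call succeeds and replaces the stored revision, so the second call no longer matches and is rejected.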
The output data from a workflow's execution is essentially a description of what changes to make to the DB(s):
```scala
/**
  * Container for handling outgoing workflow data.
  *
  * @param add a list of containers to be added
  * @param update a list of containers to be updated
  * @param delete a list of containers to be removed
  */
case class OutputData(
  add: Vector[Container] = Vector.empty,
  update: Vector[MutableContainer] = Vector.empty,
  delete: Vector[MutableContainer] = Vector.empty
)
```

For the `update` and `delete` lists, additional revision checks are performed (see above). All containers in the `add` list are created directly.
The output data is then processed as follows:
- Validate revisions for `update` and `delete` containers
- Add all containers in `add`
- Delete all containers in `delete`
- Update all containers in `update`
Note: If a workflow marked as `readOnly` provides any output data (i.e. it's trying to make changes), the execution fails and no data is committed.
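The processing rules above, including the `readOnly` restriction, can be sketched against an in-memory map. This is a sketch under stated assumptions, not the engine's implementation: `Item`, `Output` and `commit` are hypothetical names, a single `revisionNumber` stands in for the full revision information, and the sketch assumes that an updated item carries the revision it was based on.

```scala
object OutputProcessingSketch {
  // Hypothetical simplified container and output data types.
  final case class Item(id: String, value: String, revisionNumber: Int = 1)

  final case class Output(
    add: Vector[Item] = Vector.empty,
    update: Vector[Item] = Vector.empty,
    delete: Vector[Item] = Vector.empty
  ) {
    def isEmpty: Boolean = add.isEmpty && update.isEmpty && delete.isEmpty
  }

  // Returns the new state of the store, or an error if nothing may be committed.
  def commit(db: Map[String, Item], readOnly: Boolean, out: Output): Either[String, Map[String, Item]] = {
    // Items in update/delete must match the revision that is currently stored
    val stale = (out.update ++ out.delete).filterNot { item =>
      db.get(item.id).exists(_.revisionNumber == item.revisionNumber)
    }

    if (readOnly && !out.isEmpty) Left("Read-only workflow produced output data")
    else if (stale.nonEmpty) Left(s"Invalid revisions for: ${stale.map(_.id).mkString(", ")}")
    else {
      // Apply the changes in the listed order: add, delete, update
      val added = db ++ out.add.map(item => item.id -> item)
      val deleted = added -- out.delete.map(_.id)
      val updated = deleted ++ out.update.map { item =>
        item.id -> item.copy(revisionNumber = item.revisionNumber + 1)
      }
      Right(updated)
    }
  }
}
```

Because all checks run before any change is applied, a failed validation leaves the original map untouched, which mirrors the "no data is committed" behaviour described in the note.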
```scala
import akka.actor.ActorSystem
import akka.util.Timeout
import com.google.inject.{AbstractModule, Provides, Singleton}
import core3.config.StaticConfig
import core3.database.dals.DatabaseAbstractionLayer
import core3.workflows._
import net.codingwell.scalaguice.ScalaModule
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class Module extends AbstractModule with ScalaModule {
  ...

  type DefsMap = Map[ContainerType, BasicContainerDefinition with JsonContainerDefinition]

  // Container definitions, keyed by container type
  @Provides
  @Singleton
  def provideContainerDefinitions(): DefsMap = {
    val groupDefinitions =
      new core.Group.BasicDefinition
        with core.Group.JsonDefinition

    val transactionLogDefinitions =
      new core.TransactionLog.BasicDefinition
        with core.TransactionLog.JsonDefinition

    val localUserDefinitions =
      new core.LocalUser.BasicDefinition
        with core.LocalUser.JsonDefinition

    Map(
      "Group" -> groupDefinitions,
      "TransactionLog" -> transactionLogDefinitions,
      "LocalUser" -> localUserDefinitions
    )
  }

  // Workflows made available to the engine
  @Provides
  @Singleton
  def provideWorkflows(): Vector[WorkflowBase] = {
    Vector(
      definitions.SystemCreateGroup,
      definitions.SystemCreateLocalUser,
      definitions.SystemDeleteGroup,
      definitions.SystemDeleteLocalUser,
      definitions.SystemQueryGroups,
      definitions.SystemQueryLocalUsers,
      definitions.SystemQueryTransactionLogs,
      definitions.SystemUpdateGroup,
      definitions.SystemUpdateLocalUserMetadata,
      definitions.SystemUpdateLocalUserPassword,
      definitions.SystemUpdateLocalUserPermissions
    )
  }

  @Provides
  @Singleton
  def provideEngine(
    system: ActorSystem,
    workflows: Vector[WorkflowBase],
    db: DatabaseAbstractionLayer,
    definitions: DefsMap
  )(implicit ec: ExecutionContext): WorkflowEngine = {
    // Settings are read from the 'engine' section of the static config
    val engineConfig = StaticConfig.get.getConfig("engine")
    implicit val timeout: Timeout = Timeout(engineConfig.getInt("requestTimeout").seconds)

    new WorkflowEngine(
      system.actorOf(
        WorkflowEngineComponent.props(
          workflows,
          db,
          StoreTransactionLogs.fromString(engineConfig.getString("storeLogs")),
          TransactionLogContent.WithParamsOnly,
          TransactionLogContent.WithDataAndParams
        )(ec, definitions)
      )
    )
  }

  ...
}
```

```
server.static {
  ...
  //core3.workflows.WorkflowEngineComponent
  engine {
    //Available options (not case sensitive):
    //- No transactions logged -> 'Never' or 'Off'
    //- All transactions logged -> 'Always' or 'RW'
    //- Only Read transactions logged -> 'OnReadOnly' or 'RO' or 'Read'
    //- Only Write transactions logged -> 'OnWriteOnly' or 'WO' or 'Write'
    storeLogs = "OnWriteOnly" // [Never (Off), OnReadOnly (RO, Read), OnWriteOnly (WO, Write), Always (RW)]
    requestTimeout = 5 //in seconds
  }
  ...
}
```
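To illustrate how the `storeLogs` values documented in the configuration comments could be interpreted, here is a small stand-alone sketch. It does not reproduce core3's `StoreTransactionLogs`: the `StoreLogs` type, its `fromString` and the `shouldStore` helper are hypothetical, and treating a "Read" transaction as one produced by a `readOnly` workflow is an assumption.

```scala
object StoreLogsSketch {
  // Hypothetical modes mirroring the options listed in the config comments.
  sealed trait StoreLogs
  case object Never extends StoreLogs
  case object OnReadOnly extends StoreLogs
  case object OnWriteOnly extends StoreLogs
  case object Always extends StoreLogs

  // Maps a config value to a mode, ignoring case and surrounding whitespace.
  def fromString(value: String): StoreLogs = value.trim.toLowerCase match {
    case "never" | "off"                => Never
    case "onreadonly" | "ro" | "read"   => OnReadOnly
    case "onwriteonly" | "wo" | "write" => OnWriteOnly
    case "always" | "rw"                => Always
    case other =>
      throw new IllegalArgumentException(s"Unsupported storeLogs value [$other]")
  }

  // Decides whether a transaction should be logged
  // (assumption: read transactions come from readOnly workflows).
  def shouldStore(mode: StoreLogs, readOnlyWorkflow: Boolean): Boolean = mode match {
    case Never       => false
    case Always      => true
    case OnReadOnly  => readOnlyWorkflow
    case OnWriteOnly => !readOnlyWorkflow
  }
}
```

With `storeLogs = "OnWriteOnly"`, as in the configuration shown, this sketch would log transactions for workflows that change data and skip those for read-only workflows.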