
Workflow Engine

Angel Sanadinov edited this page Jun 27, 2017 · 5 revisions


Overview

The workflow engine is responsible for executing workflows, verifying permissions and ensuring data integrity.

Required input

  • Workflow name - the unique name of the workflow to be executed
  • Parameters - the raw JSON parameters to be processed and used by the workflow
  • User token - the token identifying the user making the request

Execution steps

  1. Checks that the workflow exists and is enabled
  2. Checks that the user has the required permissions
  3. Parses the parameters using the workflow's parseParameters(...) method (if any are needed)
  4. Retrieves the input data using the workflow's loadData(...) method (if any is needed)
  5. Creates a transaction log to note the start of the actual execution (see Transaction Logs)
  6. Calls the workflow's executeAction(...) method and retrieves its result
  7. If successful, processes the output data (adds, updates or removes containers)
  8. Creates a transaction log to note the end of the actual execution (see Transaction Logs)
  9. If requested, returns the output data along with the workflow result
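The early steps above can be sketched as a chain of checks that stops at the first failure. This is a minimal illustration, not the core3 implementation: `UserToken`, `Workflow` and `execute` are hypothetical stand-ins, and the sketch leaves out data loading, transaction logs and output processing.

```scala
object EngineStepsSketch {
  // Hypothetical stand-in for a user token: the workflows the user may run.
  final case class UserToken(userId: String, permissions: Set[String])

  // Hypothetical stand-in for a workflow; the real ones are defined in core3.
  final case class Workflow(
    name: String,
    enabled: Boolean,
    parseParameters: String => Either[String, Map[String, String]],
    executeAction: Map[String, String] => Either[String, String]
  )

  // Runs the checks in order; the first Left short-circuits the rest.
  def execute(
    workflows: Map[String, Workflow],
    name: String,
    rawParams: String,
    user: UserToken
  ): Either[String, String] =
    for {
      // Step 1: the workflow must exist and be enabled
      workflow <- workflows.get(name).filter(_.enabled)
        .toRight(s"Workflow [$name] not found or disabled")
      // Step 2: the user must be allowed to execute it
      _ <- Either.cond(
        user.permissions.contains(name),
        (),
        s"User [${user.userId}] is not allowed to execute [$name]"
      )
      // Step 3: parse the raw parameters
      params <- workflow.parseParameters(rawParams)
      // Step 6: run the action and collect its result
      result <- workflow.executeAction(params)
    } yield result
}
```

Using `Either` here keeps each failure reason attached to the step that produced it; an asynchronous engine would more likely chain `Future`s in the same order.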

Permissions

A user token is created for each user outside of the workflow engine; how it is created depends on how the requests are received. The token should contain the list of workflows that the user is allowed to execute.

Data integrity

Whenever a workflow makes changes to data/containers, the engine checks that the appropriate revision IDs and numbers are used (for mutable containers). This prevents multiple requests from changing the same data at the same time: the first request to be received and processed succeeds; the remaining ones are then working with invalid revisions and fail.
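The revision check described above is a form of optimistic concurrency control. The sketch below shows the idea with an in-memory store; `Stored`, `Store` and their fields are hypothetical names chosen for illustration and do not reflect the actual core3 container or database layer types.

```scala
import java.util.UUID

object RevisionCheckSketch {
  // Hypothetical mutable container: `revision` identifies the exact version,
  // `revisionNumber` counts how many times the container has been updated.
  final case class Stored(id: String, revision: UUID, revisionNumber: Int, value: String)

  // Hypothetical in-memory store standing in for the database.
  final class Store {
    private var items = Map.empty[String, Stored]

    def put(item: Stored): Unit = synchronized { items += (item.id -> item) }

    def get(id: String): Option[Stored] = synchronized { items.get(id) }

    // Applies the update only if the caller still holds the current revision.
    def update(incoming: Stored, newValue: String): Either[String, Stored] = synchronized {
      items.get(incoming.id) match {
        case Some(current)
            if current.revision == incoming.revision &&
              current.revisionNumber == incoming.revisionNumber =>
          val next = current.copy(
            revision = UUID.randomUUID(),
            revisionNumber = current.revisionNumber + 1,
            value = newValue
          )
          items += (next.id -> next)
          Right(next)
        case Some(_) => Left(s"Stale revision for container [${incoming.id}]")
        case None    => Left(s"Container [${incoming.id}] not found")
      }
    }
  }
}
```

If two requests load the same container and both try to update it, the first call to `update` succeeds and bumps the revision; the second still carries the old revision and is rejected.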

Output data processing

The output data from a workflow's execution is essentially a description of what changes to make to the DB(s):

/**
  * Container for handling outgoing workflow data.
  *
  * @param add    a list of containers to be added
  * @param update a list of containers to be updated
  * @param delete a list of containers to be removed
  */
case class OutputData(
  add: Vector[Container] = Vector.empty,
  update: Vector[MutableContainer] = Vector.empty,
  delete: Vector[MutableContainer] = Vector.empty
)

For the update and delete lists, additional revision checks are performed (see Data integrity). Containers in the add list are created directly, without revision checks.

Operations order

  1. Validate revisions for update and delete containers
  2. Add all containers in add
  3. Delete all containers in delete
  4. Update all containers in update

Note: If a workflow marked as readOnly provides any output data (i.e. it's trying to make changes), the execution fails and no data is committed.
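The operation order and the readOnly rule can be sketched against an in-memory map. This is an illustration under simplifying assumptions: `Item`, `Output` and `process` are hypothetical, a single `Item` type replaces both `Container` and `MutableContainer`, and only a revision number is compared.

```scala
object OutputProcessingSketch {
  // Hypothetical container with just an ID and a revision number.
  final case class Item(id: String, revisionNumber: Int)

  // Simplified counterpart of OutputData, using one container type throughout.
  final case class Output(
    add: Vector[Item] = Vector.empty,
    update: Vector[Item] = Vector.empty,
    delete: Vector[Item] = Vector.empty
  )

  def process(
    db: Map[String, Item],
    output: Output,
    readOnly: Boolean
  ): Either[String, Map[String, Item]] = {
    val hasChanges =
      output.add.nonEmpty || output.update.nonEmpty || output.delete.nonEmpty

    // Containers whose revision no longer matches what is stored
    val stale = (output.update ++ output.delete).filterNot { item =>
      db.get(item.id).exists(_.revisionNumber == item.revisionNumber)
    }

    if (readOnly && hasChanges) {
      Left("Read-only workflow produced output data; nothing committed")
    } else if (stale.nonEmpty) {
      // 1. Validate revisions before touching any data
      Left(s"Invalid revisions for: ${stale.map(_.id).mkString(", ")}")
    } else {
      // 2. Add, 3. Delete, 4. Update
      val added = db ++ output.add.map(item => item.id -> item)
      val deleted = added -- output.delete.map(_.id)
      val updated = deleted ++ output.update.map { item =>
        item.id -> item.copy(revisionNumber = item.revisionNumber + 1)
      }
      Right(updated)
    }
  }
}
```

Validating all revisions before the first write means a stale container in either list rejects the whole output rather than leaving it half applied.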

Example engine provider:

import akka.actor.ActorSystem
import akka.util.Timeout
import com.google.inject.{AbstractModule, Provides, Singleton}
import core3.config.StaticConfig
import core3.database.dals.DatabaseAbstractionLayer
import core3.workflows._
import net.codingwell.scalaguice.ScalaModule

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class Module extends AbstractModule with ScalaModule {

  ...
  type DefsMap = Map[ContainerType, BasicContainerDefinition with JsonContainerDefinition]

  @Provides
  @Singleton
  def provideContainerDefinitions(): DefsMap = {
    val groupDefinitions = 
      new core.Group.BasicDefinition 
        with core.Group.JsonDefinition
    
    val transactionLogDefinitions = 
      new core.TransactionLog.BasicDefinition 
        with core.TransactionLog.JsonDefinition
    
    val localUserDefinitions = 
      new core.LocalUser.BasicDefinition 
        with core.LocalUser.JsonDefinition
  
    Map(
      "Group" -> groupDefinitions,
      "TransactionLog" -> transactionLogDefinitions,
      "LocalUser" -> localUserDefinitions
    )
  }

  @Provides
  @Singleton
  def provideWorkflows(): Vector[WorkflowBase] = {
    Vector(
      definitions.SystemCreateGroup,
      definitions.SystemCreateLocalUser,
      definitions.SystemDeleteGroup,
      definitions.SystemDeleteLocalUser,
      definitions.SystemQueryGroups,
      definitions.SystemQueryLocalUsers,
      definitions.SystemQueryTransactionLogs,
      definitions.SystemUpdateGroup,
      definitions.SystemUpdateLocalUserMetadata,
      definitions.SystemUpdateLocalUserPassword,
      definitions.SystemUpdateLocalUserPermissions
    )
  }
  
  @Provides
  @Singleton
  def provideEngine(
    system: ActorSystem,
    workflows: Vector[WorkflowBase],
    db: DatabaseAbstractionLayer,
    definitions: DefsMap
  )(implicit ec: ExecutionContext): WorkflowEngine = {
    val engineConfig = StaticConfig.get.getConfig("engine")
    implicit val timeout: Timeout = Timeout(engineConfig.getInt("requestTimeout").seconds)
  
    new WorkflowEngine(
      system.actorOf(
        WorkflowEngineComponent.props(
          workflows,
          db,
          StoreTransactionLogs.fromString(engineConfig.getString("storeLogs")),
          TransactionLogContent.WithParamsOnly,
          TransactionLogContent.WithDataAndParams
        )(ec, definitions)
      )
    )
  }

  ...
}

Static configuration:

server.static {
  
  ...

  //core3.workflows.WorkflowEngineComponent
  engine {
    //Available options (not case sensitive):
    //- No transactions logged -> 'Never' or 'Off'
    //- All transactions logged -> 'Always' or 'RW'
    //- Only Read transactions logged -> 'OnReadOnly' or 'RO' or 'Read'
    //- Only Write transactions logged -> 'OnWriteOnly' or 'WO' or 'Write'
    storeLogs = "OnWriteOnly" // [Never (Off), OnReadOnly (RO, Read), OnWriteOnly (WO, Write), Always (RW)]
    requestTimeout = 5 //in seconds
  }

  ...

}
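To show how the storeLogs options listed above could be interpreted, here is a small sketch of a case-insensitive parser and the resulting logging decision. `LogMode`, `fromString` and `shouldLog` are hypothetical names; this is not the code behind `StoreTransactionLogs.fromString`, only an illustration of the mapping given in the configuration comments.

```scala
import java.util.Locale

object StoreLogsSketch {
  // Hypothetical counterpart of the four documented logging modes.
  sealed trait LogMode
  case object Never extends LogMode
  case object OnReadOnly extends LogMode
  case object OnWriteOnly extends LogMode
  case object Always extends LogMode

  // Maps the documented option names (not case sensitive) to a mode.
  def fromString(value: String): Option[LogMode] =
    value.toLowerCase(Locale.ROOT) match {
      case "never" | "off"                => Some(Never)
      case "always" | "rw"                => Some(Always)
      case "onreadonly" | "ro" | "read"   => Some(OnReadOnly)
      case "onwriteonly" | "wo" | "write" => Some(OnWriteOnly)
      case _                              => None
    }

  // Decides whether a transaction should be logged for a given workflow.
  def shouldLog(mode: LogMode, readOnlyWorkflow: Boolean): Boolean =
    mode match {
      case Never       => false
      case Always      => true
      case OnReadOnly  => readOnlyWorkflow
      case OnWriteOnly => !readOnlyWorkflow
    }
}
```

With storeLogs = "OnWriteOnly", as in the configuration above, this sketch would log transactions for workflows that write data and skip those marked as readOnly.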
