
Workflows

Angel Sanadinov edited this page Jun 27, 2017 · 4 revisions

Workflows: Workflows | Engine | Transaction Logs

Overview

A workflow describes an action performed on one or more pieces of data.

The execution of a workflow has six distinct steps:

  1. Parse the supplied parameters (using the workflow's parseParameters(...) method)
  2. Load the input data (using the workflow's loadData(...) method)
  3. Create a STARTED transaction log
  4. Execute the action (using the workflow's executeAction(...) method)
  5. Commit data changes
  6. Create a COMPLETED_* transaction log

Note: The various implicit parameters have been removed for brevity.
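
The six steps can be pictured as a single chain of Futures. The sketch below is an illustration only and not the core3 engine: `SimpleWorkflow`, `Params`, `Input`, `Output` and `runWorkflow` are hypothetical stand-ins, the transaction log and the database are modelled as in-memory buffers, and the behaviour on failure (a single failure entry) is an assumption.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

// Hypothetical stand-ins for illustration; these are NOT the core3 types.
object WorkflowStepsSketch {
  final case class Params(value: Int)
  final case class Input(base: Int)
  final case class Output(total: Int)

  // Simplified workflow shape mirroring the three user-defined methods.
  trait SimpleWorkflow {
    def readOnly: Boolean
    def parseParameters(raw: String): Future[Params]
    def loadData(params: Params): Future[Input]
    def executeAction(params: Params, data: Input): Future[Output]
  }

  // Runs the six steps in order; `log` stands in for the transaction log
  // and `committed` for the database.
  def runWorkflow(
    workflow: SimpleWorkflow,
    raw: String,
    log: ListBuffer[String],
    committed: ListBuffer[Output]
  )(implicit ec: ExecutionContext): Future[Output] = {
    val steps = for {
      params <- workflow.parseParameters(raw)               // 1. parse parameters
      data   <- workflow.loadData(params)                   // 2. load input data
      _       = log += "STARTED"                            // 3. STARTED log
      output <- workflow.executeAction(params, data)        // 4. execute action
      _       = if (!workflow.readOnly) committed += output // 5. commit (skipped when read-only)
    } yield output

    // 6. record the outcome, whether the chain succeeded or failed
    steps.transform { (outcome: Try[Output]) =>
      log += (if (outcome.isSuccess) "COMPLETED_* (success)" else "COMPLETED_* (failure)")
      outcome
    }
  }
}
```

Because every step is chained on the previous Future, a failed Future from any of the three workflow methods short-circuits the remaining steps and still reaches the final logging step.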

Example workflows

All core workflows can be found here.

Workflow Base

trait WorkflowBase {
  def name: String
  def readOnly: Boolean
  def withSensitiveParams: Boolean = false
  def withSensitiveData: Boolean = false
  def parseParameters(rawParams: JsValue): Future[WorkflowParameters]
  def loadData(params: WorkflowParameters, queryHandlers: DataQueryHandlers): Future[InputData]
  def executeAction(requestID: RequestID, user: UserTokenBase, params: WorkflowParameters, data: InputData): Future[(WorkflowResult, OutputData)]
}

def name: String

A unique name for the workflow, for example: "SystemCreateLocalUser".

Warning: When Auth0 is used as a provider, permission names are limited to 50 characters, and the workflow name is used as a permission name. It is therefore best to keep workflow names within that limit.

def readOnly: Boolean

Specifies whether the workflow is read-only or makes changes. A workflow specified as read-only is not allowed to commit data.

def withSensitiveParams: Boolean = false

Specifies whether the parameters contain sensitive information that should not be stored in the transaction logs. Default is false.

def withSensitiveData: Boolean = false

Specifies whether the data contains sensitive information that should not be stored in the transaction logs. Default is false.
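
One way to picture the effect of the two flags is a log entry whose parameter and data fields are only filled in when the corresponding flag is false. This is a sketch under that assumption: `LogEntry` and `buildEntry` are hypothetical names, and the real transaction log container is not shown on this page.

```scala
// Hypothetical illustration of the sensitive flags; NOT the core3 transaction log.
object SensitiveLoggingSketch {
  final case class LogEntry(
    workflow: String,
    params: Option[String], // None when the parameters are marked as sensitive
    data: Option[String]    // None when the data is marked as sensitive
  )

  // Drops whichever payload the workflow has marked as sensitive.
  def buildEntry(
    workflow: String,
    withSensitiveParams: Boolean,
    withSensitiveData: Boolean,
    paramsJson: String,
    dataJson: String
  ): LogEntry =
    LogEntry(
      workflow,
      params = if (withSensitiveParams) None else Some(paramsJson),
      data = if (withSensitiveData) None else Some(dataJson)
    )
}
```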

def parseParameters(...): Future[WorkflowParameters]

Verifies and converts the supplied raw JSON parameters into a format that can be used by the workflow.

Warning: Any exception or error condition that is not returned as a failed Future will be ignored. If your workflows are failing silently, this is the likely cause. *

def loadData(...): Future[InputData]

Loads all data that will be needed for the workflow to execute its action.

Warning: Any exception or error condition that is not returned as a failed Future will be ignored. If your workflows are failing silently, this is the likely cause. *

def executeAction(...): Future[(WorkflowResult, OutputData)]

Executes the workflow's action with the supplied parameters and data.

Warning: Any exception or error condition that is not returned as a failed Future will be ignored. If your workflows are failing silently, this is the likely cause. *

* You will probably see Akka error logs about failing actors and undelivered messages.
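
The difference between throwing an exception and returning a failed Future can be shown with plain scala.concurrent, independent of core3. In the sketch below, `parseUnsafe` and `parseSafe` are hypothetical helpers: the first throws before any Future exists, while the second captures the same error with `Future.fromTry`.

```scala
import scala.concurrent.Future
import scala.util.Try

// Hypothetical helpers for illustration; not part of core3.
object FailedFutureSketch {
  // Throws synchronously on bad input: the caller never receives a Future,
  // so there is no failed Future for it to handle.
  def parseUnsafe(raw: String): Future[Int] = {
    val parsed = raw.toInt // may throw NumberFormatException right here
    Future.successful(parsed)
  }

  // Captures the same exception inside the returned Future.
  def parseSafe(raw: String): Future[Int] =
    Future.fromTry(Try(raw.toInt))
}
```

Wrapping the whole method body in `Future { ... }` has a similar effect, because non-fatal exceptions thrown inside the block fail the resulting Future instead of escaping to the caller.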

Defining parameter objects

A parameter object clearly defines the input required by a workflow. The parseParameters(...) function converts the raw incoming parameters into that object:

import core3.database.containers.core
import core3.database.{ContainerType, ObjectID}
import core3.security.UserTokenBase
import core3.workflows._
import play.api.libs.json.{JsValue, Json}

import scala.concurrent.{ExecutionContext, Future}

object SystemCreateGroup extends WorkflowBase {

  //... other definitions ...

  case class SystemAddGroupParameters(
    shortName: String,
    name: String,
    items: Vector[ObjectID],
    itemsType: ContainerType
  ) extends WorkflowParameters {
    override def asJson: JsValue = Json.obj(
      "shortName" -> shortName,
      "name" -> name,
      "items" -> items,
      "itemsType" -> itemsType
    )
  }

  override def parseParameters(rawParams: JsValue): Future[WorkflowParameters] = {
    Future {
      SystemAddGroupParameters(
        (rawParams \ "shortName").as[String],
        (rawParams \ "name").as[String],
        (rawParams \ "items").as[Vector[ObjectID]],
        (rawParams \ "itemsType").as[String]
      )
    }
  }

  //... other definitions ...
  
}

What if no parameters are needed?

Just return a core3.workflows.NoWorkflowParameters object:

import core3.database.containers.core
import core3.security.UserTokenBase
import core3.workflows._
import play.api.libs.json.{JsValue, Json}

import scala.concurrent.{ExecutionContext, Future}

object SystemQueryGroups extends WorkflowBase {

  //... other definitions ...

  override def parseParameters(rawParams: JsValue): Future[WorkflowParameters] = {
    Future.successful(NoWorkflowParameters())
  }

  //... other definitions ...

}

Defining input data objects

As with parameters, input data objects define all the containers needed by the workflow; the loadData(...) function performs all the required queries and transformations:

import core3.database.containers.core
import core3.database.{ObjectID, RevisionID, RevisionSequenceNumber}
import core3.security.UserTokenBase
import core3.workflows._
import play.api.libs.json.{JsValue, Json}

import scala.concurrent.{ExecutionContext, Future}

object SystemDeleteGroup extends WorkflowBase {

  //... other definitions ...

  case class SystemDeleteGroupInputData(group: core.Group) extends InputData {
    override def asJson: JsValue = Json.obj(
      "group" -> group
    )
  }

  override def loadData(params: WorkflowParameters, queryHandlers: DataQueryHandlers): Future[InputData] = {
    params match {
      case actualParams: SystemDeleteGroupParameters =>
        for {
          group <- queryHandlers.getContainerWithRevision(
            "Group",
            actualParams.groupID,
            actualParams.revision,
            actualParams.revisionNumber
          ).map(_.asInstanceOf[core.Group])
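        } yield {
          SystemDeleteGroupInputData(group)
        }

      // Return a failed Future instead of throwing a MatchError on unexpected parameters
      case other =>
        Future.failed(new IllegalArgumentException(s"Unexpected parameters supplied: [$other]"))
    }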
  }
  
  //... other definitions ...

}

Data query handlers

A DataQueryHandlers object is passed to each call to loadData(...). It contains functions for retrieving data from the database(s) and is the only way to access data from within a workflow.

/**
  * Container for workflow data query handlers.
  *
  * @param getGroup                 a function for retrieving a group by short name
  * @param getContainer             a function for retrieving a container by type & ID
  * @param getContainerWithRevision a function for retrieving a container by type, ID, revision and revision number
  * @param getContainers            a function for querying containers
  * @param getAllContainers         a function for retrieving all containers of a given type
  * @param loadView                 a function for loading views
  */
case class DataQueryHandlers(
  getGroup: (String) => Future[core3.database.containers.core.Group],
  getContainer: (ContainerType, ObjectID) => Future[Container],
  getContainerWithRevision: (ContainerType, ObjectID, RevisionID, RevisionSequenceNumber) => Future[MutableContainer],
  getContainers: (ContainerType, String, Map[String, String]) => Future[ContainerSet],
  getAllContainers: (ContainerType) => Future[ContainerSet],
  loadView: (ContainerView) => Future[Unit]
)
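
Because the handlers are plain functions, loadData-style code can be exercised against in-memory data. The sketch below assumes a cut-down stand-in, `MiniQueryHandlers`, with two function fields and simplified types (a `Group` case class with `String` items); it is not the core3 class, and `inMemory` and `loadItemCount` are hypothetical helpers.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Simplified stand-ins for illustration; the real handlers use core3 container types.
object QueryHandlersSketch {
  final case class Group(shortName: String, items: Vector[String])

  final case class MiniQueryHandlers(
    getGroup: String => Future[Group],
    getAllGroups: () => Future[Vector[Group]]
  )

  // Builds handlers backed by an in-memory map instead of a database.
  def inMemory(groups: Map[String, Group]): MiniQueryHandlers =
    MiniQueryHandlers(
      getGroup = shortName =>
        groups.get(shortName) match {
          case Some(group) => Future.successful(group)
          // A missing group is reported as a failed Future, not a thrown exception
          case None => Future.failed(new NoSuchElementException(s"No group [$shortName]"))
        },
      getAllGroups = () => Future.successful(groups.values.toVector)
    )

  // A loadData-style function: all data access goes through the handlers.
  def loadItemCount(shortName: String, handlers: MiniQueryHandlers)(
    implicit ec: ExecutionContext
  ): Future[Int] =
    handlers.getGroup(shortName).map(_.items.size)
}
```

Passing the handlers in as an argument keeps the data-loading code free of any direct database dependency, which is what makes this kind of substitution possible.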

What if no input data is needed?

Just return a core3.workflows.NoInputData object:

import core3.database.containers.core
import core3.database.{ContainerType, ObjectID}
import core3.security.UserTokenBase
import core3.workflows._
import play.api.libs.json.{JsValue, Json}

import scala.concurrent.{ExecutionContext, Future}

object SystemCreateGroup extends WorkflowBase {

  //... other definitions ...

  override def loadData(params: WorkflowParameters, queryHandlers: DataQueryHandlers): Future[InputData] = {
    Future.successful(NoInputData())
  }

  //... other definitions ...
  
}
