Instantiating a Workflow

From the start_workflow method of our BPMN engine (engine/engine.py):

def start_workflow(self, spec_id):
    spec, sp_specs = self.serializer.get_workflow_spec(spec_id)
    wf = BpmnWorkflow(spec, sp_specs, script_engine=self._script_engine)
    wf_id = self.serializer.create_workflow(wf, spec_id)
    return Instance(wf_id, wf)

We’ll use our serializer to recreate the workflow spec based on the id. As discussed in Handling Subprocesses and Call Activities, a process has a top level specification and a dictionary of process id -> spec containing any other processes referenced by the top level process (Call Activities and Subprocesses).

Running a Workflow

In the simplest case, running a workflow involves implementing the following loop:

  • runs any READY engine tasks (where task_spec.manual == False)

  • presents READY human tasks to users (if any)

  • updates the human task data if necessary

  • runs the human tasks

  • refreshes any WAITING tasks

until there are no tasks left to complete.
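
As a rough illustration of this loop, the sketch below uses small stand-in classes rather than SpiffWorkflow itself. ToyTask, ToyWorkflow, the string states and the rule for leaving WAITING are all hypothetical and exist only so the example runs on its own; the engine methods that follow use the real library calls.

```python
# Stand-in classes for illustration only; these are NOT SpiffWorkflow classes.
class ToyTask:
    def __init__(self, name, manual=False, state='READY'):
        self.name, self.manual, self.state = name, manual, state
        self.data = {}

    def run(self):
        self.state = 'COMPLETED'


class ToyWorkflow:
    def __init__(self, tasks):
        self.tasks = tasks

    def get_next_task(self, state, manual):
        matches = (t for t in self.tasks if t.state == state and t.manual == manual)
        return next(matches, None)

    def refresh_waiting_tasks(self):
        # Toy rule: a waiting task becomes ready once every earlier task is done.
        for index, task in enumerate(self.tasks):
            earlier_done = all(t.state == 'COMPLETED' for t in self.tasks[:index])
            if task.state == 'WAITING' and earlier_done:
                task.state = 'READY'

    def is_completed(self):
        return all(t.state == 'COMPLETED' for t in self.tasks)


def run_workflow(workflow, ask_user):
    """Repeat the loop until there are no tasks left to complete."""
    while not workflow.is_completed():
        # Run every READY engine task.
        task = workflow.get_next_task('READY', manual=False)
        while task is not None:
            task.run()
            task = workflow.get_next_task('READY', manual=False)
        # Present a READY human task, store the user's input, then run it.
        task = workflow.get_next_task('READY', manual=True)
        if task is not None:
            task.data.update(ask_user(task))
            task.run()
        # Give WAITING tasks a chance to become READY.
        workflow.refresh_waiting_tasks()


workflow = ToyWorkflow([
    ToyTask('calculate'),
    ToyTask('approve', manual=True),
    ToyTask('notify', state='WAITING'),
])
run_workflow(workflow, ask_user=lambda task: {'approved': True})
```

The ask_user callback is where an application would display a form or instructions; here it simply returns canned input.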

We’ll refer to code from engine/instance.py in the next few sections.

Here are our engine methods:

def run_until_user_input_required(self, workflow):
    task = workflow.get_next_task(state=TaskState.READY, manual=False)
    while task is not None:
        task.run()
        self.run_ready_events(workflow)
        task = workflow.get_next_task(state=TaskState.READY, manual=False)

def run_ready_events(self, workflow):
    workflow.refresh_waiting_tasks()
    task = workflow.get_next_task(state=TaskState.READY, spec_class=CatchingEvent)
    while task is not None:
        task.run()
        task = workflow.get_next_task(state=TaskState.READY, spec_class=CatchingEvent)

In the first, we retrieve and run any tasks that can be executed automatically, including processing any events that might have occurred.

The second method handles processing events. A task that corresponds to an event remains in state WAITING until it catches whatever event it is waiting on, at which point it becomes READY and can be run. The workflow.refresh_waiting_tasks method iterates over all the waiting tasks and changes the state to READY if the conditions for doing so have been met.

We’ll cover using the workflow.get_next_task method and handling Human tasks later in this document.

Note

The Instance class also has a task filter attribute and a list of filtered tasks, which are used by the UI, so we update those in these methods as well.

Tasks

In this section, we’ll give an overview of some of the general attributes of Task Specs and then delve into a few specific types. See Specifications vs. Instances to read about Tasks vs Task Specs.

BPMN Task Specs

BPMN Task Specs inherit quite a few attributes from SpiffWorkflow.specs.base.TaskSpec, but you probably don’t have to pay much attention to most of them. A few of the important ones are:

  • name: the unique id of the TaskSpec, and it will correspond to the BPMN ID if that is present

  • description: we use this attribute to provide a description of the BPMN task type

  • manual: True if human input is required to complete tasks associated with this Task Spec

The manual attribute is particularly important, because SpiffWorkflow does not include built-in handling of these tasks so you’ll need to implement this as part of your application. We’ll go over how this is handled in this application in the next section.

Note

NoneTasks (BPMN tasks with no more specific type assigned) are treated as Manual Tasks by SpiffWorkflow.

BPMN Task Specs have the following additional attributes.

  • bpmn_id: the ID of the BPMN Task (this will be None if the task is not visible on the diagram)

  • bpmn_name: the BPMN name of the Task

  • lane: the lane of the BPMN Task

  • documentation: the contents of the BPMN documentation element for the Task

In the example application, we use bpmn_name (or name when a bpmn_name isn’t specified) and lane to display information about the tasks in a workflow:

def get_task_display_info(self, task):
    return {
        'depth': task.depth,
        'state': TaskState.get_name(task.state),
        'name': task.task_spec.bpmn_name or task.task_spec.name,
        'lane': task.task_spec.lane,
    }

Instantiated Tasks

Actually all Tasks are instantiated – that is what distinguishes a Task from a Task Spec; however, it is impossible to belabor this point too much.

Tasks have a few additional attributes that contain important details about particular instances:

  • id: a UUID that uniquely identifies the Task (remember that a Task Spec may be reached more than once, but a new Task is created each time)

  • task_spec: the Task Spec associated with this Task

  • state: the state of the Task, represented as one of the values in TaskState

  • last_state_change: the timestamp of the last time this Task changed state

  • data: a dictionary that holds task/workflow data

Human (User and Manual) Tasks

Remember that the bpmn module does not provide any default capability for gathering information from a user, and this is something you’ll have to implement. In this example, we’ll assume that we are using Task Specs from the spiff module (there is an alternative implementation in the camunda module).

Spiff Arena uses JSON schemas to define forms associated with User Tasks and react-jsonschema-form to render them. Additionally, our User and Manual tasks have a custom extension instructionsForEndUser which stores a Jinja template with Markdown formatting that is rendered using the task data. A different format for defining forms could be used, and Jinja and Markdown could easily be replaced by other templating and rendering schemes depending on your application’s needs.

Our User and Manual Task handlers render the instructions (this code is from spiff/curses_handlers.py):

from jinja2 import Template

def set_instructions(self, task):
    user_input = self.ui._states['user_input']
    user_input.instructions = f'{self.task.task_spec.bpmn_name}\n\n'
    text = self.task.task_spec.extensions.get('instructionsForEndUser')
    if text is not None:
        template = Template(text)
        user_input.instructions += template.render(self.task.data)
    user_input.instructions += '\n\n'

We’re not going to attempt to handle Markdown in a curses UI, so we’ll assume we just have text. However, we do want to be able to incorporate data specific to the workflow in information that is presented to a user; this is something that your application will certainly need to do. Here, we use the data attribute of the Task (recall that this is a dictionary) to render the template.

Our application contains a Field class (defined in curses_ui/user_input.py) that tells us how to convert to and from a string representation that can be displayed on the screen and can interact with the form display screen. Our User Task handler also has a method for translating a couple of basic JSON schema types into something that can be displayed (supporting only text, integers, and ‘oneOf’). The form screen validates the user input and collects the results in a dictionary.

We won’t go into the details about how the form screen works, as it’s specific to this application rather than the library itself; instead we’ll skip to the code that runs the task after it has been presented to the user; any application needs to do this.

When our form is submitted, we ask our Instance to update the task data (if applicable, as in the case of a form) and run the task.

def run_task(self, task, data=None):
    if data is not None:
        task.set_data(**data)
    task.run()
    if not self.step:
        self.run_until_user_input_required()
    else:
        self.update_task_filter()

Here we are setting a key for each field in the form. Other possible options here are to set one key that contains all of the form data, or to map the schema to a Python class and use that in lieu of a dictionary. It’s up to you to decide the best way of managing this.
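
These three options can be sketched in plain Python, independent of SpiffWorkflow. The OrderForm class and the 'order_form' key are hypothetical names chosen for this illustration, and task_data simply stands in for a task's data dictionary.

```python
from dataclasses import dataclass, asdict

form_data = {'product_name': 'product_a', 'product_quantity': 2}

# Option 1: one key per form field (the approach taken by run_task).
task_data = {}
task_data.update(form_data)

# Option 2: a single key that holds the whole form.
nested_data = {'order_form': dict(form_data)}


# Option 3: map the schema to a class and validate on construction.
@dataclass
class OrderForm:
    product_name: str
    product_quantity: int

    def __post_init__(self):
        if self.product_quantity < 1:
            raise ValueError('product_quantity must be at least 1')


order = OrderForm(**form_data)
# Convert back to a dictionary if the data must be serialized with the task.
typed_data = {'order_form': asdict(order)}
```

Per-field keys keep expressions in later tasks short (product_quantity rather than order_form['product_quantity']), while a single key or a class avoids name collisions between forms.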

The key points here are that your application will need to have the capability to display information, potentially incorporating data from the workflow instance, as well as update this data based on user input. We’ll go through a simple example next.

We’ll refer to the process modeled in task_types.bpmn, which contains a simple form that asks a user to input a product and quantity, as well as a manual task presenting the order information at the end of the process (the form is defined in select_product_and_quantity.json).

After the user submits the form, we’ll collect the results in the following dictionary:

{
    'product_name': 'product_a',
    'product_quantity': 2,
}

We’ll add these variables to the task data before we run the task. The Business Rule task looks up the price from a DMN table based on product_name and the Script Task sets order_total based on the price and quantity.
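
To make the data flow concrete, here is a toy version of those two steps in plain Python. It does not use the library's DMN or script engine implementations: PRICE_TABLE, its prices, and the variable name product_price are invented for the illustration, and the script text is only a guess at what such a Script Task might contain.

```python
# Hypothetical stand-in for the DMN decision table (prices are made up).
PRICE_TABLE = {'product_a': 15.0, 'product_b': 25.0}

task_data = {'product_name': 'product_a', 'product_quantity': 2}

# Business Rule step: look up the price for the selected product.
task_data['product_price'] = PRICE_TABLE[task_data['product_name']]

# Script step: run the script text with the task data as its namespace,
# so any variable the script assigns ends up in the task data.
script = 'order_total = product_price * product_quantity'
exec(script, {}, task_data)

print(task_data['order_total'])  # 30.0
```

Running script text through exec like this is only reasonable for models you trust.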

Our Manual Task’s instructions look like this:

Order Summary
{{ product_name }}
Quantity: {{ product_quantity }}
Order Total: {{ order_total }}

and when rendered against the instance data, reflects the details of this particular order.
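
The sketch below shows the rendering step on its own. To stay free of dependencies it uses a tiny regular-expression stand-in that only understands {{ name }} placeholders; it is not Jinja, which the application actually uses. The render function is hypothetical and the order_total value is made up for the example.

```python
import re

INSTRUCTIONS = """Order Summary
{{ product_name }}
Quantity: {{ product_quantity }}
Order Total: {{ order_total }}"""


def render(template, data):
    """Replace each {{ name }} placeholder with str(data[name])."""
    return re.sub(r'\{\{\s*(\w+)\s*\}\}', lambda m: str(data[m.group(1)]), template)


# order_total is a made-up value for the example.
task_data = {'product_name': 'product_a', 'product_quantity': 2, 'order_total': 30.0}
rendered = render(INSTRUCTIONS, task_data)
print(rendered)
```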

Business Rule Tasks

Business Rule Tasks are not implemented in the SpiffWorkflow.bpmn module; however, the library does contain a DMN implementation of a Business Rule Task in the SpiffWorkflow.dmn module. Both the spiff and camunda modules include DMN support.

Gateways

You will not need special code to handle gateways (this is one of the things this library does for you), but it is worth emphasizing that gateway conditions are treated as Python expressions which are evaluated against the context of the task data. See Script Engine Overview for more details.
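
As a simplified illustration of what "evaluated against the context of the task data" means, the sketch below uses Python's built-in eval directly rather than SpiffWorkflow's script engine. The flow names, the conditions and the choose_flow helper are hypothetical.

```python
task_data = {'product_name': 'product_a', 'product_quantity': 2}

# Hypothetical outgoing flows of an exclusive gateway, checked in order.
flows = [
    ('bulk_order', 'product_quantity >= 10'),
    ('standard_order', 'product_quantity < 10'),
]


def choose_flow(flows, data):
    """Return the name of the first flow whose condition is true, or None."""
    for name, condition in flows:
        # The task data supplies the variable names used in the condition.
        if eval(condition, {}, dict(data)):
            return name
    return None


selected = choose_flow(flows, task_data)  # 'standard_order'
```

A practical consequence is that every name used in a condition must already be present in the task data when the gateway is reached; otherwise evaluation fails with a NameError.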

Script and Service Tasks

See Script Engine Overview for more information about how Spiff handles these tasks. There is no default Service Task implementation, but we’ll go over an example of one way this might be implemented there. Script tasks assume the script attribute contains the text of a Python script, which is executed in the context of the task’s data.

Filtering Tasks

SpiffWorkflow has two methods for retrieving tasks:

  • workflow.get_tasks: returns a list of matching tasks, or an empty list

  • workflow.get_next_task: returns the first matching task, or None

Both of these methods use the same helper classes and take the same arguments – the only difference is the return type.

These methods create a TaskIterator, which takes an optional first argument: a task to begin the iteration from (if it is not provided, iteration begins at the root). This is useful if you know you want to continue executing a workflow from a particular place. The remaining arguments are keyword arguments that are passed directly into a TaskFilter, which determines which tasks match.

Tasks can be filtered by:

  • state: a TaskState value (see Understanding Task States for the possible states)

  • spec_name: the name of a Task Spec (this will typically correspond to the BPMN ID)

  • manual: whether the Task Spec requires manual input

  • updated_ts: limits results to after the provided timestamp

  • spec_class: limits results to a particular Task Spec class

  • lane: the lane of the Task Spec

  • catches_event: Task Specs that catch a particular BpmnEvent
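
To show how these keyword arguments combine, the sketch below models the idea with stand-in classes. ToyTask, ToyFilter and the two helper functions are hypothetical and much simpler than the library's TaskIterator and TaskFilter; only the overall shape is taken from the description above: every supplied criterion must match, one helper collects every match, and the other returns the first match or None.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyTask:
    spec_name: str
    state: str
    manual: bool = False
    lane: Optional[str] = None


@dataclass
class ToyFilter:
    # None means "do not filter on this attribute".
    state: Optional[str] = None
    spec_name: Optional[str] = None
    manual: Optional[bool] = None
    lane: Optional[str] = None

    def matches(self, task):
        criteria = {k: v for k, v in vars(self).items() if v is not None}
        return all(getattr(task, k) == v for k, v in criteria.items())


def get_tasks(tasks, **kwargs):
    task_filter = ToyFilter(**kwargs)
    return [t for t in tasks if task_filter.matches(t)]


def get_next_task(tasks, **kwargs):
    task_filter = ToyFilter(**kwargs)
    return next((t for t in tasks if task_filter.matches(t)), None)


tasks = [
    ToyTask('select_product', 'COMPLETED', manual=True, lane='Customer'),
    ToyTask('lookup_price', 'READY'),
    ToyTask('review_order', 'READY', manual=True, lane='Customer'),
]
ready_human = get_tasks(tasks, state='READY', manual=True)
missing = get_next_task(tasks, lane='Shipping')
```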

Examples

We reference the following processes here:

To filter by state, we need to import the TaskState object (unless we want to memorize which numbers correspond to which states).

from SpiffWorkflow.util.task import TaskState

We can use this object to translate an integer to a human-readable name using TaskState.get_name(task.state); there is also a corresponding TaskState.get_value method that goes from name to integer.

Ready Human Tasks

tasks = workflow.get_tasks(state=TaskState.READY, manual=True)

Completed Tasks

tasks = workflow.get_tasks(state=TaskState.COMPLETED)

Tasks by Spec Name

tasks = workflow.get_tasks(spec_name='customize_product')

will return a list containing the Call Activities for the customization of a product in our example workflow.

Tasks Updated After

from datetime import datetime, timedelta

ts = datetime.now() - timedelta(hours=1)
tasks = workflow.get_tasks(state=TaskState.WAITING, updated_ts=ts)

Returns Tasks that changed to WAITING in the past hour.

Tasks by Lane

ready_tasks = workflow.get_tasks(state=TaskState.READY, lane='Customer')

will return only Tasks in the ‘Customer’ lane in our example workflow.

Subprocesses and Call Activities

In the first section of this document, we noted that BpmnWorkflow is instantiated with a top level spec as well as a collection of specs for any referenced processes. The instantiated BpmnSubWorkflows are maintained as a mapping of task.id to BpmnSubWorkflow in the subprocesses attribute.

Both classes inherit from Workflow and maintain tasks in separate task trees. However, only BpmnWorkflow maintains subworkflow information; even deeply nested workflows are stored at the top level (for ease of access).

Task iteration also works differently. BpmnWorkflow.get_tasks has been extended to retrieve subworkflows associated with tasks and iterate over those as well; when iterating over tasks in a BpmnSubWorkflow, only tasks from that workflow will be returned.

task = workflow.get_next_task(spec_name='customize_product')
subprocess = workflow.get_subprocess(task)
subprocess_tasks = subprocess.get_tasks()

This code block finds the first product customization of our example workflow and gets only the tasks inside that workflow.

A BpmnSubWorkflow always uses the top level workflow’s script engine, to ensure consistency.

Additionally, the class has a few extra attributes to make it more convenient to navigate across nested workflows:

  • top_workflow: the top level workflow

  • parent_task_id: the UUID of the task the workflow is associated with

  • parent_workflow: the workflow immediately above it in the stack

These attributes exist on the top level workflow as well, and return None.
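
The flat storage and the navigation attributes can be pictured with a small stand-in model. ToyWorkflow, ToySubWorkflow and add_subworkflow are hypothetical and only mimic the attribute names listed above; they are not the library classes, and the 'engrave' process is invented to show a second level of nesting.

```python
from uuid import uuid4


class ToySubWorkflow:
    def __init__(self, name, top_workflow, parent_workflow, parent_task_id):
        self.name = name
        self.top_workflow = top_workflow
        self.parent_workflow = parent_workflow
        self.parent_task_id = parent_task_id


class ToyWorkflow:
    def __init__(self, name):
        self.name = name
        # task id -> subworkflow, for every nesting depth
        self.subprocesses = {}

    def add_subworkflow(self, name, parent_workflow):
        task_id = uuid4()  # id of the task that started the subworkflow
        sub = ToySubWorkflow(name, self, parent_workflow, task_id)
        self.subprocesses[task_id] = sub
        return sub


top = ToyWorkflow('order')
customize = top.add_subworkflow('customize_product', parent_workflow=top)
engrave = top.add_subworkflow('engrave', parent_workflow=customize)

# Both levels of nesting live in the same mapping on the top level workflow,
# and the nested workflow can still walk back up the stack.
found = top.subprocesses[engrave.parent_task_id]
path = [found.name, found.parent_workflow.name, found.top_workflow.name]
```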

Events

BPMN Events are represented by the BpmnEvent class. An instance of this class contains an EventDefinition, an optional payload, message correlations for Messages that define them, and (also optionally) a target subworkflow. The last property is used internally by SpiffWorkflow for subworkflows that need to communicate with other subworkflows and can be safely ignored.

The relationship between the EventDefinition and BpmnEvent is analogous to that of TaskSpec and Task: a TaskSpec defining a BPMN Event has an additional event_definition attribute that contains the information about the Event that will be caught or thrown.

When an event is thrown, a BpmnEvent will be created using the EventDefinition associated with the task’s spec, and payload, if applicable. For events with payloads, the EventDefinition will define how to create the payload based on the workflow instance and include this with the event. A Timer Event will know how to parse and evaluate the provided expression. And so forth.

The event will be passed to the workflow.catch method, which will iterate over all the tasks and pass the event to any tasks that are waiting for that event. If no tasks that catch the event are present in the workflow, the event will be placed in a pending event queue, and these events can be retrieved with the workflow.get_events method.

Note

This method clears the event queue, so if your application retrieves the event and does not handle it, it is gone forever!

The application in this repo is designed to run single workflows, so it does not have any external event handling. If you implement such functionality, you’ll need a way of identifying which processes any retrieved events should be sent to.

The workflow.waiting_events method returns a list of PendingBpmnEvents, which contain the name and type of each event and might be used to help determine this.

Once you have determined which workflow should receive the event, you can pass it to workflow.catch to handle it.
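
One way to structure that hand-off is sketched below with stand-in objects. ToyWorkflow, its waiting_for set and the route_events function are hypothetical, and events are reduced to plain strings; a real application would call the library methods named above and match on whatever the pending events expose. The point is the shape: drain each queue once, deliver every event to the workflows waiting for it, and keep anything undelivered instead of dropping it.

```python
class ToyWorkflow:
    def __init__(self, name, waiting_for=(), outgoing=()):
        self.name = name
        self.waiting_for = set(waiting_for)  # names of events this workflow can catch
        self._queue = list(outgoing)         # events it could not handle itself
        self.caught = []

    def get_events(self):
        # Retrieval empties the queue, so the caller now owns these events.
        events, self._queue = self._queue, []
        return events

    def catch(self, event_name):
        self.caught.append(event_name)


def route_events(workflows, unrouted):
    """Deliver queued events; append undeliverable ones to `unrouted`."""
    for source in workflows:
        for event_name in source.get_events():
            targets = [
                w for w in workflows
                if w is not source and event_name in w.waiting_for
            ]
            if not targets:
                # Keep it somewhere: the source queue is already empty.
                unrouted.append(event_name)
            for target in targets:
                target.catch(event_name)


orders = ToyWorkflow('orders', outgoing=['payment_received', 'audit_requested'])
shipping = ToyWorkflow('shipping', waiting_for=['payment_received'])
unrouted = []
route_events([orders, shipping], unrouted)
```

Here the unrouted list plays the role of durable storage; an application would typically persist such events so they can be retried when a matching workflow appears.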

In Script Engine Overview, there is an example of how to create an event and pass it back to a workflow when executing a Service Task; this shows how you might construct a BpmnEvent to pass to workflow.catch.