Putting it All Together

In this section we’ll be discussing the overall structure of the workflow runner we developed in spiff-example-cli.

Our example application contains two different workflow runners: one that uses tasks with Spiff extensions (spiff-bpmn-runner.py) and one that uses the deprecated Camunda extensions (camunda-bpmn-runner.py).

The primary differences between the two are in handling User and MultiInstance Tasks. We have some documentation about how we interpret Camunda forms in Tasks. That particular page comes from an earlier version of our documentation, and camunda-bpmn-runner.py can run workflows with these tasks. However, we are not actively maintaining the camunda package, and it should be considered deprecated.

Base Application Runner

The core functions your application will have to accommodate are

  • parsing workflows
  • serializing workflows
  • running workflows

Task specs define how tasks are executed, and creating the task specs depends on a parser which initializes a spec of the appropriate class. And of course serialization is also heavily dependent on the same information needed to create the instance. To that end, our BPMN runner requires that you provide a parser and serializer; it can’t operate unless it knows what to do with each task spec it runs across.

Here is the initialization for the runner.SimpleBpmnRunner class that is used by both scripts.

def __init__(self, parser, serializer, script_engine=None, handlers=None):

    self.parser = parser
    self.serializer = serializer
    self.script_engine = script_engine
    self.handlers = handlers or {}
    self.workflow = None

If you read the introduction to BPMN, you’ll remember that there’s a Script Task; the script engine executes scripts against the task data and updates it. Gateway conditions are also evaluated against the same context by the engine.

SpiffWorkflow provides a default scripting environment that is suitable for simple applications, but a serious application will probably need to extend (or restrict) it in some way. See A More In-Depth Look at Some of SpiffWorkflow’s Features for a few examples. Therefore, we have the ability to optionally pass one in.
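To make the idea concrete, here is a minimal sketch of what a scripting environment does conceptually: run a script against the task data, then evaluate a gateway condition in the same context. This is plain Python for illustration, not SpiffWorkflow's actual engine.

```python
# A minimal illustration of what a scripting environment does.
# Plain Python, not SpiffWorkflow's engine.
task_data = {'quantity': 3, 'unit_price': 10}

# A Script Task's script updates the task data in place.
exec('total = quantity * unit_price', {}, task_data)

# A gateway condition is evaluated against the same data.
take_branch = eval('total > 20', {}, task_data)

print(task_data['total'], take_branch)  # 30 True
```

A custom environment would typically replace bare exec/eval with something that restricts available names or injects extra functions.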

The handlers argument allows us to let our application know what to do with specific task spec types. It’s a mapping of task spec class to its handler. Most task specs won’t need handlers outside of how SpiffWorkflow executes them (that’s probably why you are using this library). You’ll only have to be concerned with the task spec types that require human interaction; Spiff will not handle those for you. In your application, these will probably be built into it and you won’t need to pass anything in.

However, here we’re trying to build something flexible enough that it can at least deal with two completely different mechanisms for handling User Tasks, and provide a means for you to experiment with this application.
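The dispatch pattern behind the handlers argument is just a dictionary lookup on the task spec's class. Here is a self-contained sketch of that pattern, using stand-in classes rather than SpiffWorkflow's own:

```python
# Stand-in task spec classes; in the real runner these are
# SpiffWorkflow's UserTask, ManualTask, etc.
class UserTask: pass
class ManualTask: pass
class ScriptTask: pass

def complete_user_task(spec):
    return f'form for {type(spec).__name__}'

def complete_manual_task(spec):
    return f'instructions for {type(spec).__name__}'

# Map spec class -> handler; anything absent runs without
# human interaction.
handlers = {UserTask: complete_user_task, ManualTask: complete_manual_task}

spec = UserTask()
handler = handlers.get(type(spec))
result = handler(spec) if handler is not None else None
print(result)  # form for UserTask
```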

Parsing Workflows

Here is the method we use to parse the workflows:

def parse(self, name, bpmn_files, dmn_files=None, collaboration=False):

    self.parser.add_bpmn_files(bpmn_files)
    if dmn_files:
        self.parser.add_dmn_files(dmn_files)

    if collaboration:
        top_level, subprocesses = self.parser.get_collaboration(name)
    else:
        top_level = self.parser.get_spec(name)
        subprocesses = self.parser.get_subprocess_specs(name)
    self.workflow = BpmnWorkflow(top_level, subprocesses, script_engine=self.script_engine)

We add the BPMN and DMN files to the parser and use parser.get_spec to create a workflow spec for a process model.

SpiffWorkflow needs at least one spec to create a workflow; this will be created from the name of the process passed into the method. It also needs specs for any subprocesses or call activities. The method parser.get_subprocess_specs will search recursively through a starting spec and collect specs for all referenced resources.

It is possible to have two processes defined in a single model, via a Collaboration. In this case, there is no “top level spec”. We can use self.parser.get_collaboration to handle this case.
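The recursive collection that parser.get_subprocess_specs performs can be pictured with a toy model, where each "spec" lists the process IDs of the call activities it references. The names here are illustrative, not SpiffWorkflow's API:

```python
# Toy model: each process spec is a dict naming the call activities
# it references.  A get_subprocess_specs-style traversal collects
# every spec reachable from the top-level process.
all_specs = {
    'order':       {'calls': ['payment', 'shipping']},
    'payment':     {'calls': ['fraud_check']},
    'shipping':    {'calls': []},
    'fraud_check': {'calls': []},
}

def collect_subprocess_specs(name, found=None):
    found = {} if found is None else found
    for ref in all_specs[name]['calls']:
        if ref not in found:
            found[ref] = all_specs[ref]
            collect_subprocess_specs(ref, found)
    return found

subprocesses = collect_subprocess_specs('order')
print(sorted(subprocesses))  # ['fraud_check', 'payment', 'shipping']
```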

Note

The only required argument to BpmnWorkflow is a single workflow spec, in this case top_level. The parser returns an empty dict if no subprocesses are present, but it is not required to pass this in. If there are subprocesses present, subprocess_specs will be a mapping of process ID to BpmnWorkflowSpec.

In spiff-bpmn-runner.py we create the parser like this:

from SpiffWorkflow.spiff.parser.process import SpiffBpmnParser, BpmnValidator
parser = SpiffBpmnParser(validator=BpmnValidator())

The validator is an optional argument, which can be used to validate the BPMN files passed in. The BpmnValidator in the spiff package is configured to validate against the BPMN 2.0 spec and our spec describing our own extensions.

The parser we imported is pre-configured to create task specs that know about Spiff extensions.

There are parsers in both the bpmn and camunda packages that can be similarly imported. There is a validator that uses only the BPMN 2.0 spec in the bpmn package (but no similar validator for Camunda).

It is possible to override particular task specs for specific BPMN Task types. We’ll cover an example of this in A More In-Depth Look at Some of SpiffWorkflow’s Features.

Serializing Workflows

In addition to the pre-configured parser, each package has a pre-configured serializer.

from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG
from runner.product_info import registry
wf_spec_converter = BpmnWorkflowSerializer.configure_workflow_spec_converter(SPIFF_SPEC_CONFIG)
serializer = BpmnWorkflowSerializer(wf_spec_converter, registry)

The serializer has two components:

  • the workflow_spec_converter, which knows about objects inside SpiffWorkflow
  • the registry, which can tell SpiffWorkflow how to handle arbitrary data from your scripting environment (required only if you have non-JSON-serializable data there).

We discuss the creation and use of registry in A More In-Depth Look at Some of SpiffWorkflow’s Features so we’ll ignore it for now.

SPIFF_SPEC_CONFIG has serialization methods for each of the task specs in its parser and we can create a converter from it directly and pass it into our serializer.

Here is our deserialization code:

def restore(self, filename):
    with open(filename) as fh:
        self.workflow = self.serializer.deserialize_json(fh.read())
        if self.script_engine is not None:
            self.workflow.script_engine = self.script_engine

We’ll just pass the contents of the file to the serializer and it will restore the workflow. The scripting environment was not serialized, so we have to make sure we reset it.

And here is our serialization code:

def dump(self):
    filename = input('Enter filename: ')
    with open(filename, 'w') as fh:
        dct = self.serializer.workflow_to_dict(self.workflow)
        dct[self.serializer.VERSION_KEY] = self.serializer.VERSION
        fh.write(json.dumps(dct, indent=2, separators=[', ', ': ']))

The serializer has a companion method serialize_json but we’re bypassing that here so that we can make the output readable.

The heart of the serialization process actually happens in workflow_to_dict. This method returns a dictionary representation of the workflow that contains only JSON-serializable items. All serialize_json does is add a serializer version and call json.dumps on the returned dict. If you are developing a serious application, it is unlikely you want to store the entire workflow as a string, so you should be aware that this method exists.
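The relationship between workflow_to_dict and serialize_json can be sketched with the standard library alone: the dict is the real artifact, and the JSON string is just that dict plus a version key, dumped. The keys below are illustrative, not SpiffWorkflow's actual schema.

```python
import json

# As if returned by workflow_to_dict: a JSON-serializable dict.
dct = {'spec': {'name': 'order'}, 'tasks': {}}
VERSION_KEY, VERSION = 'serializer_version', '1.1'

# serialize_json amounts to: add the version key, then json.dumps.
dct[VERSION_KEY] = VERSION
as_string = json.dumps(dct, indent=2, separators=[', ', ': '])

# An application with a document store could persist dct directly
# instead of round-tripping through a string.
restored = json.loads(as_string)
print(restored['serializer_version'])  # 1.1
```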

The serializer is fairly complex: not only does it need to handle SpiffWorkflow’s own internal objects that it knows about, it needs to handle arbitrary Python objects in the scripting environment. The serializer is covered in more depth in A More In-Depth Look at Some of SpiffWorkflow’s Features.

Defining Task Handlers

In spiff-bpmn-runner.py, we also define the functions complete_user_task and complete_manual_task.

We went over these handlers in Tasks, so we won’t delve into them here.

We create a mapping of task type to handler, which we’ll pass to our workflow runner.

handlers = {
    UserTask: complete_user_task,
    ManualTask: complete_manual_task,
    NoneTask: complete_manual_task,
}

In SpiffWorkflow, the NoneTask (which corresponds to bpmn:task) is treated as a human task and therefore has no built-in handling. Here we treat these tasks as if they were Manual Tasks.

Running Workflows

Our application’s run_workflow method takes one argument: step is a boolean that lets the runner know if it should stop and present the menu at every step (if True) or only when there are human tasks to complete.

def run_workflow(self, step=False):

    while not self.workflow.is_completed():

        if not step:
            self.advance()

        tasks = self.workflow.get_tasks(TaskState.READY|TaskState.WAITING)
        runnable = [t for t in tasks if t.state == TaskState.READY]
        human_tasks = [t for t in runnable if t.task_spec.manual]
        current_tasks = human_tasks if not step else runnable

        self.list_tasks(tasks, 'Ready and Waiting Tasks')
        if len(current_tasks) > 0:
            action = self.show_workflow_options(current_tasks)
        else:
            action = None
            if len(tasks) > 0:
                input("\nPress any key to update task list")

In the code above we first get the list of all READY or WAITING tasks; these are the currently active tasks. READY tasks can be run, and WAITING tasks may change to READY (see SpiffWorkflow Concepts for a discussion of task states). We aren’t going to do anything with the WAITING tasks except display them.

We can further filter our runnable tasks on the task_spec.manual attribute. If we’re stepping through the workflow, we’ll present the entire list; otherwise only the human tasks. There are actually many points where no human tasks are available to execute; the advance method runs the other runnable tasks if we’ve opted to skip displaying them. We’ll look at that method after this one.

There may also be points where there are no runnable tasks at all (for example, if the entire process is waiting on a timer). In that case, we’ll do nothing until the user indicates we can proceed (the timer will fire regardless of what the user does – we’re just preventing this loop from executing repeatedly when there’s nothing to do).

if action == 'r':
    task = self.select_task(current_tasks)
    handler = self.handlers.get(type(task.task_spec))
    if handler is not None:
        handler(task)
    task.run()

In the code above, we present a menu of runnable tasks to the user and run the one they chose, optionally calling one of our handlers.

Each task has a data attribute, which can optionally be updated while the task is READY and before it is run. The task data is just a dictionary. Our handler modifies the task data if necessary (e.g. adding data collected from forms), and task.run propagates the data to any tasks following it and changes its state to one of the FINISHED states; nothing more will be done with this task after this point.
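This data flow can be sketched with plain dictionaries: a handler updates a READY task's data, and running the task hands that data to its children. This is a deliberate simplification of what task.run actually does.

```python
# Simplified model of task data propagation.  Real SpiffWorkflow
# tasks do considerably more (state management, spec hooks, etc.).
class Task:
    def __init__(self, name, children=()):
        self.name = name
        self.children = list(children)
        self.data = {}
        self.state = 'READY'

    def run(self):
        # Propagate this task's data to its children, then finish.
        for child in self.children:
            child.data.update(self.data)
        self.state = 'COMPLETED'

approve = Task('approve')
form = Task('fill_form', children=[approve])

form.data['amount'] = 100        # e.g. collected by a form handler
form.run()

print(approve.data, form.state)  # {'amount': 100} COMPLETED
```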

We’ll skip over most of the options in run_workflow since they are pretty straightforward.

self.workflow.refresh_waiting_tasks()

At the end of each iteration, we call refresh_waiting_tasks to ensure that any currently WAITING tasks will move to READY if they are able to do so.

After the workflow finishes, we’ll give the user a few options for looking at the end state.

while action != 'q':
    action = self.show_prompt('\nSelect action: ', {
        'a': 'List all tasks',
        'v': 'View workflow data',
        'q': 'Quit',
    })
    if action == 'a':
        self.list_tasks([t for t in self.workflow.get_tasks() if t.task_spec.bpmn_id is not None], "All Tasks")
    elif action == 'v':
        dct = self.serializer.data_converter.convert(self.workflow.data)
        print('\n' + json.dumps(dct, indent=2, separators=[', ', ': ']))

Note that we’re filtering the task lists with t.task_spec.bpmn_id is not None. The workflow contains tasks other than the ones visible on the BPMN diagram; these are tasks that SpiffWorkflow uses to manage execution and we’ll omit them from the displays. If a task is visible on a diagram it will have a non-null value for its bpmn_id attribute (because all BPMN elements require IDs), otherwise the value will be None. See A More In-Depth Look at Some of SpiffWorkflow’s Features for more information about BPMN task spec attributes.

When a workflow completes, the task data from the “End” task, which has built up through the operation of the workflow, is copied into the workflow data, so we want to give the option to display this end state. We’re using the serializer’s data_converter to handle the workflow data (the registry) we passed in earlier, because it may contain arbitrary data.

Let’s take a brief look at the advance method:

def advance(self):
    engine_tasks = [t for t in self.workflow.get_tasks(TaskState.READY) if not t.task_spec.manual]
    while len(engine_tasks) > 0:
        for task in engine_tasks:
            task.run()
        self.workflow.refresh_waiting_tasks()
        engine_tasks = [t for t in self.workflow.get_tasks(TaskState.READY) if not t.task_spec.manual]

This method is really just a condensed version of run_workflow that ignores human tasks and doesn’t need to present a menu. We use it to get to a point in our workflow where only human tasks are left to run.

In general, an application that uses SpiffWorkflow will use these methods as a template. It will consist of a loop that:

  • runs any READY engine tasks (where task_spec.manual == False)
  • presents READY human tasks to users (if any)
  • updates the human task data if necessary
  • runs the human tasks
  • refreshes any WAITING tasks

until there are no tasks left to complete.
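That loop template can be sketched end-to-end with stand-in objects. The method names below mirror the runner, but the Workflow class here is a toy, not SpiffWorkflow's API:

```python
# Toy workflow illustrating the loop template: engine tasks run
# automatically, human tasks are presented (here, just completed),
# and WAITING tasks are refreshed each pass.  Not SpiffWorkflow.
class Task:
    def __init__(self, name, manual=False):
        self.name, self.manual, self.state = name, manual, 'READY'
    def run(self):
        self.state = 'COMPLETED'

class Workflow:
    def __init__(self, tasks):
        self.tasks = tasks
    def is_completed(self):
        return all(t.state == 'COMPLETED' for t in self.tasks)
    def get_ready(self):
        return [t for t in self.tasks if t.state == 'READY']
    def refresh_waiting_tasks(self):
        pass  # timers, message events, etc. would be checked here

workflow = Workflow([Task('lookup_price'), Task('approve', manual=True)])
log = []
while not workflow.is_completed():
    for task in workflow.get_ready():
        if not task.manual:
            task.run()                     # engine task: just run it
            log.append(('engine', task.name))
        else:
            # present to a user and update data; here we complete it
            task.run()
            log.append(('human', task.name))
    workflow.refresh_waiting_tasks()

print(log)  # [('engine', 'lookup_price'), ('human', 'approve')]
```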

The rest of the code is all about presenting the tasks to the user and dumping the workflow state. These are the parts that you’ll want to customize in your own application.