Putting it All Together

In this section we’ll discuss the overall structure of the workflow runner we developed in spiff-example-cli.

Our example application contains two different workflow runners, one that uses tasks with Camunda extensions (run.py) and one that uses tasks with Spiff extensions (run-spiff.py).

Most of the workflow operations will not change, so shared functions are defined in utils.py.

The primary difference is handling user tasks. Spiff User Tasks define an extensions property that stores a filename containing a JSON schema used to define a web form. We use react-jsonschema-form to define our forms. This doesn’t necessarily make a lot of sense in terms of a command line UI, so we’ll focus on the Camunda workflow runner in this document.

Loading a Workflow

The CamundaParser extends the base BpmnParser, adding functionality for parsing forms defined in Camunda User Tasks and decision tables defined in Camunda Business Rule Tasks. (There is a similar SpiffBpmnParser used by the alternate runner.)

We create the parser and use it to load our workflow.

from SpiffWorkflow.camunda.parser.CamundaParser import CamundaParser

parser = CamundaParser()
wf = parse_workflow(parser, args.process, args.bpmn, args.dmn)

Our workflow parser looks like this:

def parse_workflow(parser, process, bpmn_files, dmn_files, load_all=True):
    parser.add_bpmn_files(bpmn_files)
    if dmn_files:
        parser.add_dmn_files(dmn_files)
    top_level = parser.get_spec(process)
    # Either load specs for every executable process in the supplied files,
    # or only those reachable from the requested process.
    if load_all:
        subprocesses = parser.find_all_specs()
    else:
        subprocesses = parser.get_subprocess_specs(process)
    return BpmnWorkflow(top_level, subprocesses, script_engine=CustomScriptEngine)

We obtain the workflow specification for the top-level process from the parser using parser.get_spec().

We have two options for finding subprocess specs. The method parser.find_all_specs() creates specs for all executable processes found in every file supplied, while parser.get_subprocess_specs(process) creates specs only for processes used by the specified process. Both search recursively for subprocesses; the only difference is that the latter starts its search from the specified process.

Our examples are pretty simple and we’re not loading any extraneous files, so we’ll just always load everything. If your entire workflow is contained in your top-level process, you can omit the subprocesses argument, but if your workflow contains call activities, you’ll need to use one of these methods to find the models for any called processes.
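For instance, if a process with ID my_process (a hypothetical ID) contained no call activities, this would be enough:

# No call activities, so no subprocess specs are required.
workflow = BpmnWorkflow(parser.get_spec('my_process'))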

We also provide an enhanced script engine to our workflow. More information about how and why you might want to do this is covered in A More In-Depth Look at Some of SpiffWorkflow’s Features. The script_engine argument is optional and the default will be used if none is supplied.
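We won’t reproduce the custom engine here, but as a minimal sketch (assuming the PythonScriptEngine class and its scripting_additions argument from the SpiffWorkflow release this example targets), utils.py creates something along these lines:

from datetime import datetime

from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine

# Expose `datetime` to Script Tasks and gateway conditions in addition
# to the engine's default globals.
CustomScriptEngine = PythonScriptEngine(scripting_additions={'datetime': datetime})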

We return a BpmnWorkflow that runs our top-level workflow and contains specs for any subprocesses defined by that workflow.

Defining Task Handlers

In run.py, we define the function complete_user_task. This has code specific to Camunda User Task specs (in run-spiff.py, we do something different).

We also import the shared function complete_manual_task for handling Manual Tasks, since Manual Task handling is the same in both runners.
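The exact implementation lives in utils.py; a minimal sketch of such a handler (hypothetical, not the repo’s exact code) might look like this:

def complete_manual_task(task):
    # A Manual Task has no form data to collect; just show its
    # description (using the shared format_task helper shown later)
    # and wait for the user to acknowledge it.
    print(format_task(task, include_state=False))
    input('Press Enter when done: ')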

We create a mapping of task type to handler, which we’ll pass to our workflow runner.

handlers = {
    ManualTask: complete_manual_task,
    UserTask: complete_user_task,
}
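The keys are SpiffWorkflow task spec classes. In the release this example targets, the imports look like the following (module paths have moved between SpiffWorkflow versions, so treat these as a best guess and check your installed version):

from SpiffWorkflow.bpmn.specs.ManualTask import ManualTask
from SpiffWorkflow.camunda.specs.UserTask import UserTask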

This might not be a step you would need in an application you build, since you would likely have only one set of task specs to parse, handle, and serialize; however, our run method is an awful lot of code to maintain in two separate files.

Running a Workflow

This is our application’s run method.

We pass in our workflow, the task handlers, and a serializer (creating a serializer is covered in more depth in A More In-Depth Look at Some of SpiffWorkflow’s Features).

The step argument is a boolean that indicates whether we want the option of seeing a more detailed representation of the state at each step, which we’ll discuss in the section following this one. The display_types argument controls what types of tasks should be included in a detailed list when stepping through a process.

def run(workflow, task_handlers, serializer, step, display_types):

    workflow.do_engine_steps()
    next_task = None

    while not workflow.is_completed():

        ready_tasks = workflow.get_ready_user_tasks()
        options = { }
        print()
        for idx, task in enumerate(ready_tasks):
            option = format_task(task, False)
            options[str(idx + 1)] = task
            print(f'{idx + 1}. {option}')

        selected = None
        while selected not in options and selected not in ['', 'D', 'd']:
            selected = input('Select task to complete, enter to wait, or D to dump the workflow state: ')

        if selected.lower() == 'd':
            filename = input('Enter filename: ')
        state = serializer.serialize_workflow(workflow, include_spec=True)
            with open(filename, 'w') as dump:
                dump.write(state)
        elif selected != '':
            next_task = options[selected]
            handler = task_handlers.get(type(next_task.task_spec))
            if handler is not None:
                handler(next_task)
            next_task.complete()

        workflow.refresh_waiting_tasks()
        workflow.do_engine_steps()
        if step and next_task is not None:
            print_state(workflow, next_task, display_types)

    print('\nWorkflow Data')
    print(json.dumps(workflow.data, indent=2, separators=[ ', ', ': ' ]))

The first line of this function is the one that does the bulk of the work in SpiffWorkflow. Calling workflow.do_engine_steps() causes Spiff to repeatedly look for and execute any engine tasks that are ready.

An engine task is one that requires no user interaction: a Script Task, for instance, or the selection of an outgoing flow at a gateway. Execution stops when only interactive tasks remain or the workflow has completed.

A SpiffWorkflow application will call workflow.do_engine_steps() to start the workflow and then enter a loop that will

  • check for ready user tasks
  • present the tasks to the user to complete
  • complete the tasks
  • refresh any waiting tasks
  • complete any engine tasks that have been reached via user interactions

until the workflow completes.

When a workflow completes, the task data (just a dictionary passed from one task to the next, and optionally modified by each task) is copied into the workflow data. We display the end state of the workflow on completion.

The rest of the code is all about presenting the tasks to the user and dumping the workflow state. We’ve covered the former in the BPMN Elements section of BPMN Workflows and will cover the latter in A More In-Depth Look at Some of SpiffWorkflow’s Features.

Handling task presentation is the part you will be developing yourself when you use SpiffWorkflow.
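For completeness, here is roughly how run.py wires these pieces together (illustrative; the real script reads its arguments from the command line, and the display_types tuple shown is just an example):

# `wf`, `handlers`, and `args` were created earlier in run.py.
run(wf, handlers, BpmnSerializer(), args.step, (UserTask, ManualTask))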

Examining the Workflow State

When this application is run and we want to present steps to the user, we need to be able to examine the workflow and task states and their associated data. We’ll cover the basics here.

The code below is a simple function for displaying information about a task. We use it in two ways:

  • presenting a list of tasks to a user (in this case the state will always be ready, so we won’t include it)
  • presenting the state of each task while stepping through the workflow (in this case you most likely do want to know the state)

def format_task(task, include_state=True):
    if hasattr(task.task_spec, 'lane') and task.task_spec.lane is not None:
        lane = f'[{task.task_spec.lane}]'
    else:
        lane = ''
    state = f'[{task.get_state_name()}]' if include_state else ''
    return f'{lane} {task.task_spec.description} ({task.task_spec.name}) {state}'

We previously went over obtaining the lane information in Organizing More Complex Workflows.

We can call task.get_state_name() to get a human-readable representation of a task’s state.

The text entered in the ‘Name’ field of our sample models (the BPMN element’s name attribute) is stored in task.task_spec.description.

Here is the code we use for examining the workflow state.

def print_state(workflow, task, display_types):

    print('\nLast Task')
    print(format_task(task))
    print(json.dumps(task.data, indent=2, separators=[ ', ', ': ' ]))

    all_tasks = [ t for t in workflow.get_tasks() if isinstance(t.task_spec, display_types) ]
    upcoming_tasks = [ t for t in all_tasks if t.state in [TaskState.READY, TaskState.WAITING] ]

    print('\nUpcoming Tasks')
    for upcoming in upcoming_tasks:
        print(format_task(upcoming))

    if input('\nShow all tasks? ').lower() == 'y':
        for each_task in all_tasks:
            print(format_task(each_task))

We’ll print information about our task as described above, as well as a dump of its data.

We can get a list of all tasks regardless of type or state with workflow.get_tasks().

The actual list of tasks can get quite long (some tasks are expanded internally by Spiff into multiple tasks, and gateways and events are also treated as “tasks”). So here we filter the list down to the task types that are meaningful to a user.

We further filter those down to READY and WAITING tasks for a more compact display, and show the full list only when explicitly requested.