Instantiating a Workflow
From the start_workflow method of our BPMN engine (engine/engine.py):
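def start_workflow(self, spec_id):
    # Recreate the top level spec and any subprocess specs from the stored id
    spec, sp_specs = self.serializer.get_workflow_spec(spec_id)
    wf = BpmnWorkflow(spec, sp_specs, script_engine=self._script_engine)
    # Persist the new workflow and wrap it together with its id
    wf_id = self.serializer.create_workflow(wf, spec_id)
    return Instance(wf_id, wf)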
We’ll use our serializer to recreate the workflow spec based on the id. As discussed in Handling Subprocesses and Call Activities, a process has a top level specification and a dictionary of process id -> spec containing any other processes referenced by the top level process (Call Activities and Subprocesses).
Running a Workflow
In the simplest case, running a workflow involves implementing the following loop:
runs any READY engine tasks (where task_spec.manual == False)
presents READY human tasks to users (if any)
updates the human task data if necessary
runs the human tasks
refreshes any WAITING tasks
until there are no tasks left to complete.
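The loop above can be sketched end to end without the library. In this minimal sketch, ToyTask, ToyWorkflow and run_to_completion are hypothetical stand-ins written for illustration (they are not SpiffWorkflow classes), and the rule used to wake WAITING tasks is invented; only the order of the steps is the point.

```python
from dataclasses import dataclass, field

READY, WAITING, COMPLETED = "READY", "WAITING", "COMPLETED"


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task; not a SpiffWorkflow class."""
    name: str
    manual: bool = False
    state: str = READY
    data: dict = field(default_factory=dict)

    def run(self):
        self.state = COMPLETED


class ToyWorkflow:
    """Hypothetical stand-in for a workflow holding a flat list of tasks."""

    def __init__(self, tasks):
        self.tasks = tasks

    def get_next_task(self, state, manual):
        # Return the first task matching both criteria, or None
        for task in self.tasks:
            if task.state == state and task.manual == manual:
                return task
        return None

    def refresh_waiting_tasks(self):
        # Invented rule: a WAITING task becomes READY once every earlier task is done
        for index, task in enumerate(self.tasks):
            earlier_done = all(t.state == COMPLETED for t in self.tasks[:index])
            if task.state == WAITING and earlier_done:
                task.state = READY

    def is_completed(self):
        return all(task.state == COMPLETED for task in self.tasks)


def run_to_completion(workflow, ask_user):
    while not workflow.is_completed():
        # Run every READY engine task
        task = workflow.get_next_task(READY, manual=False)
        while task is not None:
            task.run()
            task = workflow.get_next_task(READY, manual=False)
        # Present one READY human task, store the answer, then run it
        task = workflow.get_next_task(READY, manual=True)
        if task is not None:
            task.data.update(ask_user(task))
            task.run()
        # Give WAITING tasks a chance to become READY
        workflow.refresh_waiting_tasks()


wf = ToyWorkflow([
    ToyTask("script"),
    ToyTask("form", manual=True),
    ToyTask("timer", state=WAITING),
])
run_to_completion(wf, lambda task: {"answer": 42})
```

In a real application the ask_user callback would be replaced by whatever UI presents the task and collects input.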
We’ll refer to code from engine/instance.py in the next few sections.
Here are our engine methods:
def run_until_user_input_required(self, workflow):
    task = workflow.get_next_task(state=TaskState.READY, manual=False)
    while task is not None:
        task.run()
        self.run_ready_events(workflow)
        task = workflow.get_next_task(state=TaskState.READY, manual=False)

def run_ready_events(self, workflow):
    workflow.refresh_waiting_tasks()
    task = workflow.get_next_task(state=TaskState.READY, spec_class=CatchingEvent)
    while task is not None:
        task.run()
        task = workflow.get_next_task(state=TaskState.READY, spec_class=CatchingEvent)
In the first, we retrieve and run any tasks that can be executed automatically, including processing any events that might have occurred.
The second method handles processing events. A task that corresponds to an event remains in state WAITING until it catches whatever event it is waiting on, at which point it becomes READY and can be run. The workflow.refresh_waiting_tasks method iterates over all the waiting tasks and changes the state to READY if the conditions for doing so have been met.
We’ll cover using the workflow.get_next_task method and handling Human tasks later in this document.
Note
The Instance class also has a task filter attribute and a list of filtered tasks, which are used by the UI, so we update those in these methods as well.
Tasks
In this section, we’ll give an overview of some of the general attributes of Task Specs and then delve into a few specific types. See Specifications vs. Instances to read about Tasks vs Task Specs.
BPMN Task Specs
BPMN Task Specs inherit quite a few attributes from SpiffWorkflow.specs.base.TaskSpec, but you probably don’t have to pay much attention to most of them. A few of the important ones are:
name: the unique id of the TaskSpec, and it will correspond to the BPMN ID if that is present
description: we use this attribute to provide a description of the BPMN task type
manual: True if human input is required to complete tasks associated with this Task Spec
The manual attribute is particularly important, because SpiffWorkflow does not include built-in handling of these tasks, so you’ll need to implement this as part of your application. We’ll go over how this is handled in this application in the next section.
Note
NoneTasks (BPMN tasks with no more specific type assigned) are treated as Manual Tasks by SpiffWorkflow.
BPMN Task Specs have the following additional attributes.
bpmn_id: the ID of the BPMN Task (this will be None if the task is not visible on the diagram)
bpmn_name: the BPMN name of the Task
lane: the lane of the BPMN Task
documentation: the contents of the BPMN documentation element for the Task
In the example application, we use bpmn_name (or name when a bpmn_name isn’t specified) and lane to display information about the tasks in a workflow:
def get_task_display_info(self, task):
    return {
        'depth': task.depth,
        'state': TaskState.get_name(task.state),
        'name': task.task_spec.bpmn_name or task.task_spec.name,
        'lane': task.task_spec.lane,
    }
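As a rough illustration of how such a dictionary might be consumed, the sketch below indents each row by its depth. The format_task_row helper and the sample rows are hypothetical and not part of the example application; the sketch also assumes that lane may be None for tasks outside any lane.

```python
def format_task_row(info, indent="  "):
    """Render one line of a task listing from a display-info dictionary."""
    lane = info["lane"] or "-"  # assumption: lane may be None
    return f"{indent * info['depth']}{info['name']} [{lane}] {info['state']}"


# Sample rows shaped like the dictionaries built by get_task_display_info
rows = [
    {"depth": 0, "state": "COMPLETED", "name": "Start", "lane": None},
    {"depth": 1, "state": "READY", "name": "Select Product", "lane": "Customer"},
]
listing = "\n".join(format_task_row(row) for row in rows)
print(listing)
```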
Instantiated Tasks
Actually all Tasks are instantiated – that is what distinguishes a Task from a Task Spec; however, it is impossible to belabor this point too much.
Tasks have a few additional attributes that contain important details about particular instances:
id: a UUID that uniquely identifies the Task (remember that a Task Spec may be reached more than once, but a new Task is created each time)
task_spec: the Task Spec associated with this Task
state: the state of the Task, represented as one of the values in TaskState
last_state_change: the timestamp of the last time this Task changed state
data: a dictionary that holds task/workflow data
Human (User and Manual) Tasks
Remember that the bpmn module does not provide any default capability for gathering information from a user, and this is something you’ll have to implement. In this example, we’ll assume that we are using Task Specs from the spiff module (there is an alternative implementation in the camunda module).
Spiff Arena uses JSON schemas to define forms associated with User Tasks and react-jsonschema-form to render them. Additionally, our User and Manual tasks have a custom extension instructionsForEndUser which stores a Jinja template with Markdown formatting that is rendered using the task data. A different format for defining forms could be used, and Jinja and Markdown could easily be replaced by other templating and rendering schemes depending on your application’s needs.
Our User and Manual Task handlers render the instructions (this code is from spiff/curses_handlers.py):
from jinja2 import Template

def set_instructions(self, task):
    user_input = self.ui._states['user_input']
    user_input.instructions = f'{self.task.task_spec.bpmn_name}\n\n'
    text = self.task.task_spec.extensions.get('instructionsForEndUser')
    if text is not None:
        template = Template(text)
        user_input.instructions += template.render(self.task.data)
    user_input.instructions += '\n\n'
We’re not going to attempt to handle Markdown in a curses UI, so we’ll assume we just have text. However, we do want to be able to incorporate data specific to the workflow in information that is presented to a user; this is something that your application will certainly need to do. Here, we use the data attribute of the Task (recall that this is a dictionary) to render the template.
Our application contains a Field class (defined in curses_ui/user_input.py) that tells us how to convert to and from a string representation that can be displayed on the screen and can interact with the form display screen. Our User Task handler also has a method for translating a couple of basic JSON schema types into something that can be displayed (supporting only text, integers, and ‘oneOf’). The form screen collects and validates the user input and collects the results in a dictionary.
We won’t go into the details about how the form screen works, as it’s specific to this application rather than the library itself; instead we’ll skip to the code that runs the task after it has been presented to the user; any application needs to do this.
When our form is submitted, we ask our Instance to update the task data (if applicable, as in the case of a form) and run the task.
def run_task(self, task, data=None):
    if data is not None:
        task.set_data(**data)
    task.run()
    if not self.step:
        self.run_until_user_input_required()
    else:
        self.update_task_filter()
Here we are setting a key for each field in the form. Other possible options here are to set one key that contains all of the form data, or to map the schema to a Python class and use that in lieu of a dictionary. It’s up to you to decide the best way of managing this.
The key points here are that your application will need to have the capability to display information, potentially incorporating data from the workflow instance, as well as update this data based on user input. We’ll go through a simple example next.
We’ll refer to the process modeled in task_types.bpmn, which contains a simple form that asks a user to input a product and quantity, as well as a manual task presenting the order information at the end of the process (the form is defined in select_product_and_quantity.json).
After the user submits the form, we’ll collect the results in the following dictionary:
{
    'product_name': 'product_a',
    'product_quantity': 2,
}
We’ll add these variables to the task data before we run the task. The Business Rule task looks up the price from a DMN table based on product_name and the Script Task sets order_total based on the price and quantity.
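The three ways of storing form results mentioned earlier (one key per field, one key for the whole form, or a Python class) can be compared side by side using this dictionary. This is only a sketch: the order_form key and the OrderForm class are hypothetical names chosen for illustration, and plain dictionaries stand in for the task data.

```python
from dataclasses import dataclass

form_data = {"product_name": "product_a", "product_quantity": 2}

# Option 1: one key per form field (what the example application does)
task_data = {}
task_data.update(form_data)

# Option 2: a single key that holds all of the form data
task_data_nested = {"order_form": dict(form_data)}


# Option 3: map the form to a class and store an instance instead of a dictionary
@dataclass
class OrderForm:
    product_name: str
    product_quantity: int


task_data_typed = {"order_form": OrderForm(**form_data)}
```

The choice affects how scripts and templates refer to the values: product_quantity in the first case, order_form['product_quantity'] in the second, and order_form.product_quantity in the third. If workflows are persisted, a custom class would presumably also need to be handled by the serializer.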
Our Manual Task’s instructions look like this:
Order Summary
{{ product_name }}
Quantity: {{ product_quantity }}
Order Total: {{ order_total }}
and when rendered against the instance data, reflects the details of this particular order.
Business Rule Tasks
Business Rule Tasks are not implemented in the SpiffWorkflow.bpmn module; however, the library does contain a DMN implementation of a Business Rule Task in the SpiffWorkflow.dmn module. Both the spiff and camunda modules include DMN support.
Gateways
You will not need special code to handle gateways (this is one of the things this library does for you), but it is worth emphasizing that gateway conditions are treated as Python expressions which are evaluated against the context of the task data. See Script Engine Overview for more details.
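To make the idea of evaluating a condition against task data concrete, here is a minimal sketch that uses Python’s built-in eval. It is not the library’s script engine, and evaluate_condition is a hypothetical helper; it only shows how the names in an expression resolve to keys of the data dictionary. Calling eval on untrusted input is unsafe, so treat this purely as an illustration.

```python
def evaluate_condition(expression, task_data):
    """Evaluate a Python expression using the task data as its variables."""
    # Work on a copy so the caller's dictionary is left untouched,
    # and hide the builtins so only the task data is visible.
    return bool(eval(expression, {"__builtins__": {}}, dict(task_data)))


data = {"product_name": "product_a", "product_quantity": 2}
takes_bulk_path = evaluate_condition("product_quantity > 1", data)
is_product_b = evaluate_condition("product_name == 'product_b'", data)
```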
Script and Service Tasks
See Script Engine Overview for more information about how Spiff handles these tasks. There is no default Service Task implementation, but we’ll go over an example of one way this might be implemented there. Script tasks assume the script attribute contains the text of a Python script, which is executed in the context of the task’s data.
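Executing a script "in the context of the task’s data" can be pictured as running it with the data dictionary as its namespace, so that assignments in the script become new keys. The sketch below uses the built-in exec to show this; run_script is a hypothetical helper, the variable name price is an assumption, and the real script engine is described in Script Engine Overview.

```python
def run_script(script, task_data):
    """Run a script with the task data as its namespace and return the result."""
    namespace = dict(task_data)  # copy, so the original data is not modified
    exec(script, {}, namespace)  # names assigned by the script land in namespace
    return namespace


data = {"product_quantity": 2, "price": 15.0}
updated = run_script("order_total = product_quantity * price", data)
```

After the call, updated contains the original keys plus order_total, which is the effect a Script Task has on the task data.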
Filtering Tasks
SpiffWorkflow has two methods for retrieving tasks:
workflow.get_tasks: returns a list of matching tasks, or an empty list
workflow.get_next_task: returns the first matching task, or None
Both of these methods use the same helper classes and take the same arguments – the only difference is the return type.
These methods create a TaskIterator. They take an optional first argument of a task to begin the iteration from (if it is not provided, iteration begins at the root). This is useful if you know you want to continue executing a workflow from a particular place. The remainder of the arguments are keyword arguments that are passed directly into a TaskFilter, which will determine which tasks match.
Tasks can be filtered by:
state: a TaskState value (see Understanding Task States for the possible states)
spec_name: the name of a Task Spec (this will typically correspond to the BPMN ID)
manual: whether the Task Spec requires manual input
updated_ts: limits results to after the provided timestamp
spec_class: limits results to a particular Task Spec class
lane: the lane of the Task Spec
catches_event: Task Specs that catch a particular BpmnEvent
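The relationship between the two retrieval methods and the keyword filters can be illustrated with a small stand-in. Everything in this sketch (ToyTask, matches, and the module-level get_tasks and get_next_task functions) is hypothetical and far simpler than the library’s TaskIterator and TaskFilter; it only shows that both functions apply the same criteria and differ in what they return.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyTask:
    """Hypothetical flattened view of a task and its spec."""
    spec_name: str
    state: str
    manual: bool = False
    lane: Optional[str] = None


def matches(task, **criteria):
    """Return True if the task satisfies every supplied criterion."""
    return all(getattr(task, key) == value for key, value in criteria.items())


def get_tasks(tasks, **criteria):
    # All matching tasks, or an empty list
    return [task for task in tasks if matches(task, **criteria)]


def get_next_task(tasks, **criteria):
    # The first matching task, or None
    return next((task for task in tasks if matches(task, **criteria)), None)


tasks = [
    ToyTask("start", "COMPLETED"),
    ToyTask("select_product", "READY", manual=True, lane="Customer"),
    ToyTask("calculate_total", "FUTURE"),
]
ready_human = get_tasks(tasks, state="READY", manual=True)
nothing = get_next_task(tasks, lane="Warehouse")
```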
Examples
We reference the following processes here:
To filter by state, we need to import the TaskState object (unless you want to memorize which numbers correspond to which states).
from SpiffWorkflow.util.task import TaskState
We can use this object to translate an integer to a human-readable name using TaskState.get_name(task.state); there is also a corresponding TaskState.get_value method that goes from name to integer.
Ready Human Tasks
tasks = workflow.get_tasks(state=TaskState.READY, manual=True)
Completed Tasks
tasks = workflow.get_tasks(state=TaskState.COMPLETED)
Tasks by Spec Name
tasks = workflow.get_tasks(spec_name='customize_product')
will return a list containing the Call Activities for the customization of a product in our example workflow.
Tasks Updated After
from datetime import datetime, timedelta

ts = datetime.now() - timedelta(hours=1)
tasks = workflow.get_tasks(state=TaskState.WAITING, updated_ts=ts)
Returns Tasks that changed to WAITING in the past hour.
Tasks by Lane
ready_tasks = workflow.get_tasks(state=TaskState.READY, lane='Customer')
will return only Tasks in the ‘Customer’ lane in our example workflow.
Subprocesses and Call Activities
In the first section of this document, we noted that BpmnWorkflow is instantiated with a top level spec as well as a collection of specs for any referenced processes. The instantiated BpmnSubWorkflows are maintained as a mapping of task.id to BpmnSubWorkflow in the subprocesses attribute.
Both classes inherit from Workflow and maintain tasks in separate task trees. However, only BpmnWorkflow maintains subworkflow information; even deeply nested workflows are stored at the top level (for ease of access).
Task iteration works differently as well. BpmnWorkflow.get_tasks has been extended to retrieve subworkflows associated with tasks and iterate over those as well; when iterating over tasks in a BpmnSubWorkflow, only tasks from that workflow will be returned.
task = workflow.get_next_task(spec_name='customize_product')
subprocess = workflow.get_subprocess(task)
subprocess_tasks = subprocess.get_tasks()
This code block finds the first product customization of our example workflow and gets only the tasks inside that workflow.
A BpmnSubWorkflow always uses the top level workflow’s script engine, to ensure consistency.
Additionally, the class has a few extra attributes to make it more convenient to navigate across nested workflows:
subworkflow.top_workflow: returns the top level workflow
subworkflow.parent_task_id: returns the UUID of the task the workflow is associated with
subworkflow.parent_workflow: returns the workflow immediately above it in the stack
These methods exist on the top level workflow as well, and return None.
Events
BPMN Events are represented by the BpmnEvent class. An instance of this class contains an EventDefinition, an optional payload, message correlations for Messages that define them, and (also optionally) a target subworkflow. The last property is used internally by SpiffWorkflow for subworkflows that need to communicate with other subworkflows and can be safely ignored.
The relationship between the EventDefinition and BpmnEvent is analogous to that of TaskSpec and Task: a TaskSpec defining a BPMN Event has an additional event_definition attribute that contains the information about the Event that will be caught or thrown.
When an event is thrown, a BpmnEvent will be created using the EventDefinition associated with the task’s spec, and payload, if applicable. For events with payloads, the EventDefinition will define how to create the payload based on the workflow instance and include this with the event. A Timer Event will know how to parse and evaluate the provided expression. And so forth.
The event will be passed to the workflow.catch method, which will iterate over all the tasks and pass the event to any tasks that are waiting for that event. If no tasks that catch the event are present in the workflow, the event will be placed in a pending event queue and these events can be retrieved with the workflow.get_events method.
Note
This method clears the event queue, so if your application retrieves the event and does not handle it, it is gone forever!
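The consequence of a queue that is emptied on retrieval is easy to see in a toy model. The ToyWorkflow class below is a hypothetical stand-in that tracks events by name only; it mimics the behaviour described above (uncaught events are queued, and retrieving them clears the queue) and is not the library’s implementation.

```python
class ToyWorkflow:
    """Hypothetical stand-in modelling only the pending event queue."""

    def __init__(self, waiting_for):
        self.waiting_for = set(waiting_for)  # event names some task is waiting on
        self.caught = []
        self._pending = []

    def catch(self, event_name):
        # Deliver the event if a task is waiting for it, otherwise queue it
        if event_name in self.waiting_for:
            self.caught.append(event_name)
        else:
            self._pending.append(event_name)

    def get_events(self):
        # Hand back the queued events and empty the queue in one step
        events, self._pending = self._pending, []
        return events


wf = ToyWorkflow({"payment_received"})
wf.catch("payment_received")
wf.catch("order_shipped")

first_read = wf.get_events()
second_read = wf.get_events()
```

Because the second read comes back empty, an application should forward or store every retrieved event before doing anything that might fail.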
The application in this repo is designed to run single workflows, so it does not have any external event handling. If you implement such functionality, you’ll need a way of identifying which processes any retrieved events should be sent to.
The workflow.waiting_events method will return a list of PendingBpmnEvents, which contain the name and type of event and might be used to help determine this.
Once you have determined which workflow should receive the event, you can pass it to workflow.catch to handle it.
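One possible way to decide which workflows should receive a retrieved event is to match the event’s name against what each workflow reports it is waiting for. The sketch below assumes an application that keeps such a record per workflow id; PendingEvent is a hypothetical stand-in for PendingBpmnEvent, and find_recipients, the ids and the event names are invented for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in carrying the name and type of a pending event
PendingEvent = namedtuple("PendingEvent", ["name", "event_type"])


def find_recipients(event_name, waiting_by_workflow):
    """Return the ids of workflows that have a task waiting on the named event."""
    return [
        wf_id
        for wf_id, pending in waiting_by_workflow.items()
        if any(item.name == event_name for item in pending)
    ]


# What each workflow reported it is waiting for, keyed by workflow id
waiting_by_workflow = {
    "wf-1": [PendingEvent("payment_received", "MessageEventDefinition")],
    "wf-2": [PendingEvent("order_shipped", "MessageEventDefinition")],
    "wf-3": [],
}
recipients = find_recipients("order_shipped", waiting_by_workflow)
```

Each id in recipients would then be used to load the corresponding workflow and pass the event to its catch method.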
In Script Engine Overview, there is an example of how to create an event and pass it back to a workflow when executing a Service Task; this shows how you might construct a BpmnEvent to pass to workflow.catch.