A More In-Depth Look at Some of SpiffWorkflow’s Features

Displaying Workflow State

Filtering Tasks

In our earlier example, all we did was check the lane a task was in and display it along with the task name and state.

Let's take a look at a sample workflow with lanes:

../_images/lanes.png

Workflow with lanes

To get all of the tasks that are ready for the ‘Customer’ workflow, we could specify the lane when retrieving ready user tasks:

ready_tasks = workflow.get_ready_user_tasks(lane='Customer')

If there are no tasks ready in the ‘Customer’ lane, you will get an empty list; and of course if no lane is labeled ‘Customer’, the list will always be empty.
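For example, we could loop over these ready tasks and display each one's lane, name, and state, much as our earlier example did. The attribute names used below (task_spec.lane, task_spec.description) and the get_state_name helper are assumptions about the current API, so treat this as a sketch:

# A minimal sketch: display the lane, name, and state of each ready task.
# task_spec.lane, task_spec.description, and get_state_name() are assumed
# to be available on the task objects returned above.
for task in ready_tasks:
    print(f"{task.task_spec.lane}: {task.task_spec.description} [{task.get_state_name()}]")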

We can also get a list of tasks by state.

We need to import the Task object (unless you want to memorize which numbers correspond to which states).

from SpiffWorkflow.task import Task

To get a list of completed tasks:

tasks = workflow.get_tasks(Task.COMPLETED)

The tasks themselves are not particularly intuitive to work with, so SpiffWorkflow provides some facilities for obtaining a more user-friendly view of upcoming tasks.

MultiInstance Notes

loopCardinality - This variable can be a text representation of a number, for example ‘2’, or it can be the name of a variable in task.data that resolves to a text representation of a number. It can also be a collection such as a list or a dictionary. If it is a list, the loop cardinality is equal to the length of the list; if it is a dictionary, it is equal to the number of keys in the dictionary.

If loopCardinality is left blank and a Collection is defined, or if loopCardinality and Collection refer to the same collection, the MultiInstance will loop over the collection and update each of its elements with the new information. In this case, the incoming collection is assumed to be a dictionary; the behavior for a list used in this way is currently undefined and will raise an error.

Collection - This is the name of the collection that is created from the data generated when the task is run, for example form data generated by a UserTask or data produced by a script. Currently the collection is built up as a dictionary whose numeric keys correspond to the position in the loopCardinality. For example, if we set the loopCardinality to a list such as ['a', 'b', 'c'], the resulting collection would be {1: 'result from a', 2: 'result from b', 3: 'result from c'}, and this holds even for a parallel MultiInstance whose tasks were completed in a different order.

Element Variable - This is the variable name for the current iteration of the MultiInstance. If the loopCardinality is just a number, this would be 1, 2, 3, and so on. If the loopCardinality variable is mapped to a collection, it would be either the list value at that position, or the value from the dictionary with its keys taken in sorted order. It is the content of the element variable that should be updated in task.data; this content is then added to the collection each time the task is completed.

Example:
In a sequential MultiInstance where the loop cardinality is ['a', 'b', 'c'] and the elementVariable is 'myvar', the task would have 'myvar': 'a' in task.data on the first run and 'myvar': 'b' on the second.
Example:
In a parallel MultiInstance where the loop cardinality is a variable containing {'a': 'A', 'b': 'B', 'c': 'C'} and the elementVariable is 'myvar', there will be three tasks when the MultiInstance is ready. If we choose the second task, task.data will contain 'myvar': 'B'.
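To make the data flow concrete, here is a small illustrative sketch of the dictionaries involved in the sequential example above; the variable names are purely for illustration:

# Illustrative only: task.data during a sequential MultiInstance with
# loopCardinality ['a', 'b', 'c'] and elementVariable 'myvar'.
first_iteration_data = {'myvar': 'a'}
second_iteration_data = {'myvar': 'b'}

# After all iterations complete, the results are gathered into a collection
# keyed by position (even for a parallel MultiInstance completed out of order):
resulting_collection = {1: 'result from a', 2: 'result from b', 3: 'result from c'}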

Custom Script Engines

You may need to modify the default script engine, either to make additional functionality available to it or to restrict its capabilities for security reasons.

Warning

The default script engine does little to no sanitization and uses eval and exec! If you have security concerns, you should definitely investigate replacing the default with your own implementation.

The default script engine imports the following objects:

  • timedelta
  • datetime
  • dateparser
  • pytz

You could add other functions or classes from the standard Python library, or any code you've implemented yourself.

In our example models so far, we’ve been using DMN tables to obtain product information. DMN tables have a lot of uses so we wanted to feature them prominently, but in a simple way.

If a customer was selecting a product, we would surely have information about how the product could be customized in a database somewhere. We would not hard code product information in our diagram (although it is much easier to modify the BPMN diagram than to change the code itself!). Our shipping costs would not be static, but would depend on the size of the order and where it was being shipped – maybe we’d query an API provided by our shipper.

SpiffWorkflow is obviously not going to know how to make a call to your database or make API calls to your vendors. However, you can implement the calls yourself and make them available as a method that can be used within a script task.

We are not actually going to include a database or API and write code for connecting to and querying it. Since we only have 7 products, we can model our database as a simple dictionary lookup, and for the purposes of this tutorial we'll just return the same static shipping information every time.

from collections import namedtuple

from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine

# Fields: whether the product offers color, size, and style customization,
# plus its price.
ProductInfo = namedtuple('ProductInfo', ['color', 'size', 'style', 'price'])

INVENTORY = {
    'product_a': ProductInfo(False, False, False, 15.00),
    'product_b': ProductInfo(False, False, False, 15.00),
    'product_c': ProductInfo(True, False, False, 25.00),
    'product_d': ProductInfo(True, True, False, 20.00),
    'product_e': ProductInfo(True, True, True, 25.00),
    'product_f': ProductInfo(True, True, True, 30.00),
    'product_g': ProductInfo(False, False, True, 25.00),
}

def lookup_product_info(product_name):
    return INVENTORY[product_name]

def lookup_shipping_cost(shipping_method):
    return 25.00 if shipping_method == 'Overnight' else 5.00

additions = {
    'lookup_product_info': lookup_product_info,
    'lookup_shipping_cost': lookup_shipping_cost
}

CustomScriptEngine = PythonScriptEngine(scriptingAdditions=additions)

We pass the script engine we created to the workflow when we load it.

return BpmnWorkflow(parser.get_spec(process), script_engine=CustomScriptEngine)

We can use the custom functions in script tasks like any normal function:

../_images/custom_script_usage.png

Script task using a custom function
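For example, a script task might contain something like the following; the variable names product_name and shipping_method are illustrative, not part of the library:

# Inside a script task, the registered functions can be called directly.
product_info = lookup_product_info(product_name)
shipping_cost = lookup_shipping_cost(shipping_method)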

And we can simplify our ‘Call Activity’ flows:

../_images/call_activity_script_flow.png

Simplified Call Activity flow using a script task

To run this workflow:

./run.py -p order_product -b bpmn/call_activity_script.bpmn bpmn/top_level_script.bpmn

We have also done some work using RestrictedPython to provide more secure alternatives to standard Python functions.
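The details are outside the scope of this example, but here is a minimal sketch of what RestrictedPython offers; how it is wired into a custom script engine depends on the engine's internals, which we leave aside here:

# A rough sketch of RestrictedPython: scripts are compiled with
# compile_restricted and executed against a set of safe globals.
# How this plugs into a PythonScriptEngine subclass is not shown.
from RestrictedPython import compile_restricted, safe_globals

script = "result = quantity * unit_price"
byte_code = compile_restricted(script, filename='<script>', mode='exec')
restricted_data = {'quantity': 2, 'unit_price': 15.00}
exec(byte_code, safe_globals, restricted_data)
print(restricted_data['result'])  # 30.0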

Serialization

Warning

Serialization changed in Version 1.2. The old serialization method still works, but it is deprecated. To migrate your system to the new version, see “Migrating Between Serialization Versions” below.

So far, we've only considered the case where we run the workflow from beginning to end in one sitting. This may not always be how it works: we may be executing the workflow in the context of a web server, where a user requests a page, we open a specific workflow that is partway through, complete one step of it, and the user may not return for a few minutes, or maybe a few hours, depending on the application.

To accomplish this, we can import the serializer:

from SpiffWorkflow.bpmn.serializer import BpmnWorkflowSerializer

This class contains a serializer for a workflow containing only standard BPMN Tasks. Since we are using custom task classes (the Camunda UserTask and the DMN BusinessRuleTask), we'll need to import serializers for those task specs as well.

from SpiffWorkflow.camunda.serializer import UserTaskConverter
from SpiffWorkflow.dmn.serializer import BusinessRuleTaskConverter

Strictly speaking, these are not serializers per se: they actually convert the tasks into dictionaries of JSON-serializable objects. Conversion to JSON is done only as the last step and could easily be replaced with some other output format.

We’ll need to configure a Workflow Spec Converter with our custom classes:

wf_spec_converter = BpmnWorkflowSerializer.configure_workflow_spec_converter(
    [ UserTaskConverter, BusinessRuleTaskConverter ])

We create a serializer that can handle our extended task specs:

serializer = BpmnWorkflowSerializer(wf_spec_converter)

We’ll give the user the option of dumping the workflow at any time.

filename = input('Enter filename: ')
state = serializer.serialize_json(workflow)
with open(filename, 'w') as dump:
    dump.write(state)

We’ll ask them for a filename and use the serializer to dump the state to that file.

To restore the workflow:

if args.restore is not None:
    with open(args.restore) as state:
        wf = serializer.deserialize_json(state.read())

The workflow serializer is designed to be flexible and modular and as such is a little complicated. It has two components:

  • a workflow spec converter (which handles workflow and task specs)
  • a data converter (which handles workflow and task data).

The default workflow spec converter is likely to meet your needs, either on its own or with the inclusion of the UserTask and BusinessRuleTask converters in the camunda and dmn subpackages of this library; all you'll need to do is add them to the list of task converters, as we did above.

However, the default data converter is very simple, adding only JSON-serializable conversions of datetime and timedelta objects (we make these available in our default script engine) and UUIDs. If your workflow or task data contains objects that are not JSON-serializable, you'll need to extend ours, or extend its base class to create one of your own.

To extend ours:

  1. Subclass the base data converter
  2. Register classes along with functions for converting them to and from dictionaries:

from SpiffWorkflow.bpmn.serializer.dictionary import DictionaryConverter

class MyDataConverter(DictionaryConverter):

    def __init__(self):
        super().__init__()
        self.register(MyClass, self.my_class_to_dict, self.my_class_from_dict)

    def my_class_to_dict(self, obj):
        return obj.__dict__

    def my_class_from_dict(self, dct):
        return MyClass(**dct)
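Then pass an instance of your converter when creating the serializer. The data_converter keyword used below is our assumption about the constructor's signature; check BpmnWorkflowSerializer if it differs:

# Assumes the serializer accepts the data converter as a keyword argument
# named data_converter.
serializer = BpmnWorkflowSerializer(wf_spec_converter, data_converter=MyDataConverter())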

More information can be found in the class documentation for the default converter and its base class.

You can also replace ours entirely with one of your own. If you do so, you’ll need to implement convert and restore methods. The former should return a JSON-serializable representation of your workflow data; the latter should recreate your data from the serialization.

Migrating Between Serialization Versions

Because the serializer is highly customizable, you will need to manage your own versions of the serialization. You can do this by passing a version number into the serializer, which will be embedded in the json of all workflows.

old_serializer = BpmnSerializer() # the deprecated serializer.
# new serializer, which can be customized as described above.
serializer = BpmnWorkflowSerializer(version="MY_APP_V_1.0")

The new serializer has a “get_version” method that will read the version back out of the serialized json. If the version isn’t found, it will return None, and you can then assume it is using the old style serializer.

version = serializer.get_version(some_json)
if version == "MY_APP_V_1.0":
    workflow = serializer.deserialize_json(some_json)
else:
    workflow = old_serializer.deserialize_workflow(some_json, workflow_spec=spec)

This should help you migrate your old workflows to the new serialization method. It should also allow you to modify the serialization and customize it over time, and still manage the different forms as you make adjustments without leaving people behind.