Introduction
Given the long-running nature of many workflows, robust serialization capabilities are critical to any kind of workflow execution library. We face several problems in serializing workflows:
- workflows may contain arbitrary data whose serialization mechanisms cannot be built into the library itself
- workflows may contain custom tasks, and these also cannot be built into the library
- workflows may contain hundreds of tasks, generating very large serializations
- objects contained in the workflow data might also be very large
- the serialized data needs to be stored somewhere, and there is no one-size-fits-all way of doing this
In the first section of this document, we’ll show how to handle the first problem.
Implementing a Custom Task Spec contains an example of handling the second problem.
In the second section of this document, we’ll discuss some of the ways the remaining problems might be alleviated through creative use of the serializer’s capabilities.
Serializing Custom Objects
In Script Engine Overview, we add some custom methods and objects to our scripting environment. We create a simple class (a namedtuple) that holds the product information for each product. We’d like to be able to save and restore our custom object. This code lives in spiff/product_info.py:
from collections import namedtuple

ProductInfo = namedtuple('ProductInfo', ['color', 'size', 'style', 'price'])

def product_info_to_dict(obj):
    return {
        'color': obj.color,
        'size': obj.size,
        'style': obj.style,
        'price': obj.price,
    }

def product_info_from_dict(dct):
    return ProductInfo(**dct)
And in spiff/custom_object.py:

from SpiffWorkflow.spiff.serializer.config import SPIFF_CONFIG

from ..serializer.file import FileSerializer
from .product_info import ProductInfo, product_info_to_dict, product_info_from_dict

registry = FileSerializer.configure(SPIFF_CONFIG)
registry.register(ProductInfo, product_info_to_dict, product_info_from_dict)
serializer = FileSerializer(dirname, registry=registry)
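The two conversion functions are easy to check on their own before wiring them into the registry: the output of one must be a JSON-compatible dictionary, and feeding that dictionary back through the other must reproduce an equal object. A quick, self-contained round trip:

```python
from collections import namedtuple

ProductInfo = namedtuple('ProductInfo', ['color', 'size', 'style', 'price'])

def product_info_to_dict(obj):
    return {'color': obj.color, 'size': obj.size, 'style': obj.style, 'price': obj.price}

def product_info_from_dict(dct):
    return ProductInfo(**dct)

# A JSON-compatible dict goes out; an equal namedtuple comes back.
info = ProductInfo(color='red', size='M', style='classic', price=10.99)
as_dict = product_info_to_dict(info)
restored = product_info_from_dict(as_dict)
```

This is exactly the contract the registry relies on when it serializes workflow data containing these objects.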
We don’t have any custom task specs in this example, so we can use the default serializer configuration for the module we’re using. We’ll use the serializer in spiff/serializer/file/serializer.py. This is a very simple serializer: it converts the entire workflow to the default JSON format and writes it to disk in a readable form.
Note
The default BpmnWorkflowSerializer has a serialize_json method that does essentially the same thing, except without formatting the JSON. We bypass this so we can intercept the JSON-serializable representation and write it ourselves to a location of our choosing.
We initialize a registry using the serializer; this registry contains the conversions for the objects the library uses internally. Now we can add our custom serialization methods to this registry using the registry.register method. The arguments here are:
- the class that requires serialization
- a method that creates a dictionary representation of the object
- a method that recreates the object from that representation
Registering an object sets up relationships between the class and its serialization and deserialization methods. The register method assigns a typename to the class, generates partial functions that call the appropriate conversion methods based on that typename, and stores these conversion mechanisms.
Note
The supplied to_dict and from_dict methods must always accept and return dictionaries, even if the objects might naturally serialize in some other form.
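For instance, suppose we wanted to register a converter for datetime objects. The natural serialization would be a plain ISO string, but the registered functions still have to wrap it in a dictionary. This converter is our own illustration, not part of the library:

```python
from datetime import datetime

def datetime_to_dict(obj):
    # The ISO string alone carries all the information,
    # but the registry contract requires a dictionary.
    return {'value': obj.isoformat()}

def datetime_from_dict(dct):
    return datetime.fromisoformat(dct['value'])

# It would then be registered like any other custom type:
# registry.register(datetime, datetime_to_dict, datetime_from_dict)
```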
If you’re interested in how this works, the heart of the registry is the DictionaryConverter.
The price is a somewhat less customizable serialization format; the benefit is that these partial functions can replace humongous if/else blocks that test for specific classes and attributes.
Optimizing Serializations
File Serializer
Now we’ll turn to the customizations we made in serializer/file/serializer.py.
We’ve extended the BpmnWorkflowSerializer to take a directory where we’ll write our files, and additionally we’ll impose some structure inside this directory. We’ll separate serialized workflow specs from instance data, and set an output format that we can actually read.
Our engine requires a certain API from our serializer, and that’s what the remainder of the methods provide. We won’t go into these methods here, as they don’t actually have much to do with the library. We made few modifications (in spiff/custom_object.py) or none at all (in spiff/file.py), so there isn’t much to discuss.
We call self.to_dict and self.from_dict, which handle all conversions based on how we’ve set up the registry.
Note
We haven’t referenced any particular code, as almost all the code here is about managing our directory structure and formatting the JSON output appropriately.
The file serializer is actually not particularly optimized, but it is simple to understand, while also providing evidence that you probably want to do more. The output here is essentially what you get by default. Being able to see it easily is useful in and of itself, and if you examine it, you’ll see there is a lot of opportunity for splitting the output into its components and handling them separately.
SQLite Serializer
We have a second example serializer that stores serializations in a SQLite database in serializer/sqlite/serializer.py. This might be a slightly more realistic use case of what you want to do, so we’ll discuss this in a little more detail (but it is also a considerably more complex example).
Our database schema actually takes care of much of the work, but since this isn’t an SQL tutorial, I’ll just refer you to the file that contains it: serializer/sqlite/schema.sql. Of course, you do not have to interact with the database directly (or even use a database at all), and some or all of the triggers, views, and so forth could be replaced with Python code (or simplified quite a bit if using a more robust DB).
This is intended to be a somewhat extreme example in order to make it clear that you really aren’t bound to retrieving and storing a gigantic blob, and the logic for dealing with it does not have to be interspersed with the rest of your code.
In addition to our triggers, we also rely pretty heavily on SQLite adapters. Between these two things, we hardly have to worry about the types of objects we get back at all!
From our execute method:
conn = sqlite3.connect(self.dbname, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
conn.execute("pragma foreign_keys=on")
sqlite3.register_adapter(UUID, lambda v: str(v))
sqlite3.register_converter("uuid", lambda s: UUID(s.decode('utf-8')))
sqlite3.register_adapter(dict, lambda v: json.dumps(v))
sqlite3.register_converter("json", lambda s: json.loads(s))
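Combined, these adapters and converters give a transparent round trip: Python objects go in, and Python objects come back. A self-contained demonstration with an in-memory database:

```python
import json
import sqlite3
from uuid import UUID, uuid4

sqlite3.register_adapter(UUID, lambda v: str(v))
sqlite3.register_converter("uuid", lambda s: UUID(s.decode('utf-8')))
sqlite3.register_adapter(dict, lambda v: json.dumps(v))
sqlite3.register_converter("json", lambda s: json.loads(s))

conn = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
# Declared column types 'uuid' and 'json' select the matching converters.
conn.execute("create table workflow (id uuid, serialization json)")

wf_id = uuid4()
conn.execute("insert into workflow values (?, ?)", (wf_id, {'tasks': []}))

# Both columns come back as Python objects, not strings.
row = conn.execute("select id, serialization from workflow").fetchone()
```

With PARSE_DECLTYPES set, the declared column type picks the converter, so application code never sees raw strings.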
We use UUID for spec and instance IDs and store all our workflow data as JSON. Our serializer guarantees that its output will be JSON-serializable, so when we store it, we can just drop its output right into the DB, and feed the DB output back into the serializer.
To help this process along, we’ve customized a few of the default conversions for our specs.
class WorkflowConverter(BpmnWorkflowConverter):

    def to_dict(self, workflow):
        # Call the grandparent to_dict, bypassing BpmnWorkflowConverter's version
        dct = super(BpmnWorkflowConverter, self).to_dict(workflow)
        dct['bpmn_events'] = self.registry.convert(workflow.bpmn_events)
        dct['subprocesses'] = {}
        dct['tasks'] = list(dct['tasks'].values())
        return dct

class SubworkflowConverter(BpmnSubWorkflowConverter):

    def to_dict(self, workflow):
        dct = super().to_dict(workflow)
        dct['tasks'] = list(dct['tasks'].values())
        return dct

class WorkflowSpecConverter(BpmnProcessSpecConverter):

    def to_dict(self, spec):
        dct = super().to_dict(spec)
        dct['task_specs'] = list(dct['task_specs'].values())
        return dct
We aren’t making extensive customizations here, mainly just switching some dictionaries to lists; this is because we store these items in separate tables, so it’s convenient to get an output that can be passed directly to an insert statement.
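This is why lists are convenient: each element can be handed to executemany as one row. A small sketch of the idea (the table and column names here are made up for illustration, not from our schema):

```python
import json
import sqlite3

sqlite3.register_adapter(dict, lambda v: json.dumps(v))

conn = sqlite3.connect(':memory:')
conn.execute("create table task_spec (name text, serialization text)")

# With 'task_specs' as a list of dicts rather than a dict of dicts,
# the converter output maps directly onto rows.
task_specs = [{'name': 'Start'}, {'name': 'my_task'}, {'name': 'End'}]
conn.executemany(
    "insert into task_spec (name, serialization) values (?, ?)",
    [(ts['name'], ts) for ts in task_specs]
)
```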
When we configure our engine, we update the serializer configuration to use these classes (this code is from spiff/sqlite.py):
from SpiffWorkflow.spiff.serializer import DEFAULT_CONFIG
from ..serializer.sqlite import (
SqliteSerializer,
WorkflowConverter,
SubworkflowConverter,
WorkflowSpecConverter
)
DEFAULT_CONFIG[BpmnWorkflow] = WorkflowConverter
DEFAULT_CONFIG[BpmnSubWorkflow] = SubworkflowConverter
DEFAULT_CONFIG[BpmnProcessSpec] = WorkflowSpecConverter
dbname = 'spiff.db'
with sqlite3.connect(dbname) as db:
SqliteSerializer.initialize(db)
registry = SqliteSerializer.configure(DEFAULT_CONFIG)
serializer = SqliteSerializer(dbname, registry=registry)
Finally, let’s look at two of the methods we’ve implemented for the API required by our engine:
def _create_workflow(self, cursor, workflow, spec_id):
    dct = super().to_dict(workflow)
    wf_id = uuid4()
    stmt = "insert into workflow (id, workflow_spec_id, serialization) values (?, ?, ?)"
    cursor.execute(stmt, (wf_id, spec_id, dct))
    if len(workflow.subprocesses) > 0:
        cursor.execute("select serialization->>'name', descendant from spec_dependency where root=?", (spec_id, ))
        dependencies = dict((name, id) for name, id in cursor)
        for sp_id, sp in workflow.subprocesses.items():
            cursor.execute(stmt, (sp_id, dependencies[sp.spec.name], self.to_dict(sp)))
    return wf_id
def _get_workflow(self, cursor, wf_id, include_dependencies):
    cursor.execute("select workflow_spec_id, serialization as 'serialization [json]' from workflow where id=?", (wf_id, ))
    row = cursor.fetchone()
    spec_id, workflow = row[0], self.from_dict(row[1])
    if include_dependencies:
        workflow.subprocess_specs = self._get_subprocess_specs(cursor, spec_id)
        cursor.execute(
            "select descendant as 'id [uuid]', serialization as 'serialization [json]' from workflow_dependency where root=? order by depth",
            (wf_id, )
        )
        for sp_id, sp in cursor:
            task = workflow.get_task_from_id(sp_id)
            workflow.subprocesses[sp_id] = self.from_dict(sp, task=task, top_workflow=workflow)
    return workflow
We store subprocesses in the same table as top-level processes because they are essentially the same thing, and we maintain the parent/child relationships in a separate spec dependency table. While we don’t do this currently, we could modify our queries to ignore subprocesses that have been completed when we retrieve a workflow: they could potentially contain many tasks that will never be revisited. Or, conversely, we could limit what we restore to subprocesses that have READY tasks, to avoid loading something that is waiting for a timer that will fire in two weeks.
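As a sketch of that last idea, restoring only subprocesses that can make progress could be a matter of one extra filter in the dependency query. The has_ready_tasks column is hypothetical, not part of our actual schema:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""
    create table workflow_dependency (
        root text, descendant text, depth int, has_ready_tasks bool
    )
""")
rows = [
    ('wf', 'sp-1', 1, True),   # waiting on a user task: restore it
    ('wf', 'sp-2', 1, False),  # waiting on a two-week timer: skip it
]
conn.executemany("insert into workflow_dependency values (?, ?, ?, ?)", rows)

# Only rehydrate subprocesses that can actually make progress right now.
ready = conn.execute(
    "select descendant from workflow_dependency "
    "where root=? and has_ready_tasks order by depth",
    ('wf', )
).fetchall()
```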
We did not show the code for serializing workflow specs, but it is similar: all specs, whether top-level or for subprocesses and call activities, live in one table, with a second table that keeps track of dependencies between them. This would make it possible to wait to load a spec until the task it is associated with needs to be executed.
We also maintain task data separately from information about workflow state; so while we’re not doing this now, we have the potential to retrieve it selectively: for example, it could be omitted from COMPLETED tasks.
What I aim to get across here is that there are quite a few possibilities for customizing how your application serializes its workflows: you’re not limited to the giant JSON blob that you get by default.
Serialization Versions
As we make changes to Spiff, we may change the serialization format. For example, in 1.2.1, we changed how subprocesses were handled internally in BPMN workflows, updated how they are serialized, and upgraded the serializer version to 1.1.
Since workflows can contain arbitrary data, and even SpiffWorkflow’s internal classes are designed to be customized in ways that might require special serialization and deserialization, it is possible to override the default version number, to provide users with a way of tracking their own changes.
If you have not provided a custom version number, SpiffWorkflow will attempt to migrate your workflows from one version to the next if they were serialized in an earlier format.
If you’ve overridden the serializer version, you may need to incorporate our serialization changes with your own. You can find our migrations in SpiffWorkflow/bpmn/serializer/migrations.
These are broken up into functions that each handle an individual change, which will hopefully make it easier to incorporate them into your upgrade process, and which also provide some documentation of what has changed.
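A version-gated migration pipeline along those lines might look like the following. The version numbers, keys, and migration steps here are illustrative, not the library's actual migrations:

```python
# Each function migrates a serialized workflow dict one version forward.
def migrate_1_0_to_1_1(dct):
    # e.g. ensure subprocesses live under a dedicated key (illustrative change)
    dct.setdefault('subprocesses', {})
    dct['VERSION'] = '1.1'
    return dct

def migrate_1_1_to_1_2(dct):
    # e.g. add a container for pending BPMN events (illustrative change)
    dct.setdefault('bpmn_events', [])
    dct['VERSION'] = '1.2'
    return dct

# Map each starting version to the function that moves it one step forward.
MIGRATIONS = {'1.0': migrate_1_0_to_1_1, '1.1': migrate_1_1_to_1_2}

def migrate(dct, target='1.2'):
    """Apply migrations one step at a time until the target version is reached."""
    while dct.get('VERSION', '1.0') != target:
        dct = MIGRATIONS[dct.get('VERSION', '1.0')](dct)
    return dct
```

Keeping each change in its own function means an application that forked the version number can cherry-pick exactly the steps it needs.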