WorkFloes
Overview
Floe allows people to write reusable flow-based programs, called WorkFloes. A WorkFloe is defined by a directed graph where each graph vertex is called a cube. Each cube may have one or more ports which are used for connecting cubes together. When a WorkFloe is executed, cubes are executed concurrently, and data is streamed between the cubes. Technical details such as cube execution order, cube launching, networking, and buffering are handled automatically. The following documentation demonstrates how to get started writing WorkFloes in Python.
WorkFloes consist of connected cubes, where cubes are typed based on their role in the WorkFloe. Source cubes put data into the WorkFloe, sink cubes store data externally, and compute cubes operate on discrete data on a stream within the WorkFloe. The individual pieces of data on a stream are called work items. Work items may be anything that can be serialized to bytes. See serialization.
Because a WorkFloe is defined by a graph rather than by code, it can be expressed using only its graph. The graph itself can be encoded as structured JSON for portability. WorkFloes can also be written using the Python Floe API, which is discussed in this document.
Writing a WorkFloe in Python
To begin, we create a new WorkFloe object:
from floe.api import WorkFloe
my_floe = WorkFloe("my_floe")
The WorkFloe constructor takes one required argument, the name of the WorkFloe. The name should consist of letters, numbers, and the character "_" (in other words, the name should be a valid Python variable identifier).
You may also provide a human-readable title and description:
my_floe = WorkFloe("my_floe", title="An example WorkFloe", description="...a multiline description may be used here")
You may also provide a brief (at most 280 characters) overview, for display on the Floe page in the Orion UI:
my_floe = WorkFloe("my_floe", title="An example WorkFloe", brief="...a short overview may be used here", description="...a multiline description may be used here")
If brief is not provided, the UI will show part of the description instead.
Advanced users may specify the amount of memory to be passed to Docker's --shm-size parameter. This parameter determines the amount of RAM made available to /dev/shm within the container. Frameworks such as OpenMPI often make use of this shared memory.
This parameter may be set on the General tab of the Cube Parameters sheet when launching jobs in the Orion UI, or defaulted as a parameter_overrides option when defining cubes programmatically:
parameter_overrides = {
"shared_memory_mb": {"default": 1024},
}
This parameter defaults to 64 MiB, Docker's default.
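As a minimal sketch of the programmatic form, assuming parameter_overrides is declared as a class attribute on the cube definition (MySharedMemoryCube below is a hypothetical cube used only for illustration):
from floe.api import ComputeCube
class MySharedMemoryCube(ComputeCube):
    # Hypothetical cube; the override below raises /dev/shm for this cube
    # from the 64 MiB default to 1 GiB.
    parameter_overrides = {
        "shared_memory_mb": {"default": 1024},
    }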
Warning
If you oversize /dev/shm, the machine will deadlock, since the OOM handler will not be able to free that memory.
See https://docs.kernel.org/filesystems/tmpfs.html for more information.
Advanced users may utilize the pids_per_cpu_limit cube parameter in Orion to specify the maximum number of pids (processes/threads) per CPU within a cube; it defaults to 32.
To determine the maximum number of pids available, pids_per_cpu_limit is multiplied by the number of allocated CPUs.
The number of allocated CPUs will be at least equal to the value specified by the cpu_count parameter.
Additionally, Orion adds three to account for built-in processes, regardless of the number of CPUs.
Therefore, the total pids-limit is equal to (pids_per_cpu_limit * number of allocated CPUs) + 3.
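For example, with the default pids_per_cpu_limit of 32 and a single allocated CPU, the total pids-limit is (32 * 1) + 3 = 35.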
Advanced users may utilize the max_backlog_wait cube parameter in Orion to specify the maximum number of seconds a cube will be backlogged on a group before being re-evaluated.
This parameter defaults to 600 seconds.
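As a sketch, and assuming max_backlog_wait can be defaulted through parameter_overrides in the same way as the other system parameters above, a cube definition could lower the wait to five minutes:
parameter_overrides = {
    "max_backlog_wait": {"default": 300},
}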
Note
The total pids-limit value for a cube is stored in the ORION_NUM_PROCS environment variable.
The pids_per_cpu_limit may be set on the System tab of the Cube Parameters sheet when launching jobs in the Orion UI, or defaulted as a parameter_overrides option when defining cubes programmatically. The following parameter_overrides examples will both allot the cube 48 usable threads/processes:
parameter_overrides = {
"pids_per_cpu_limit": {"default": 48},
}
or
parameter_overrides = {
"pids_per_cpu_limit": {"default": 24},
"cpu_count": {"default": 2},
}
Warning
There is no upper limit on pids_per_cpu_limit; however, large values may compromise performance or even cause failures due to excessive fork calls.
Warning
You should increase pids_per_cpu_limit if you see Resource temporarily unavailable errors when launching processes.
You may also add one or more classifications, to make the WorkFloe easier to find on the Floe page in the Orion UI. Snowball includes many predefined classifications, or you may create your own custom classification:
from snowball import floeclass
my_floe.classification.append(floeclass.task_cheminfo)
from pathlib import PurePath
my_floe.classification.append(PurePath('Company X/Floe category N/Floe subcategory M'))
Each label (that is, "Floe category N") in the path must be <= 85 characters, but there is no limit to the depth of the path. We recommend not going deeper than 3-5 levels. We also recommend not having too many categories/subcategories in view at any given level.
For more information about ready-made classification values you can import and use, see the relevant section in the Snowball documentation.
Adding cubes to a WorkFloe
A WorkFloe requires cubes to actually do something. Cubes can be added to the WorkFloe as shown in this example:
from floe.api import SinkCube, SourceCube
ifs = SourceCube()
ofs = SinkCube()
my_floe.add_cubes(ifs, ofs)
Connecting cubes together
Connections are made between cubes using the connect() method of each cube's port. Connections must always be made from the source cube to the destination cube.
Building on the previous example, at this point my_floe consists of two cubes, ifs and ofs. However, there are no connections yet.
Connections can be made using the source and destination ports:
ifs.success.connect(ofs.intake)
Note
Cubes must always be added to an instance of a WorkFloe before connections can be made.
Now my_floe defines a valid WorkFloe.
Running a WorkFloe
The Floe Python API allows WorkFloes to be executed within the Orion platform and on a developer's own machine. A WorkFloe can be executed from Python code or from JSON. For development, it is convenient to run a WorkFloe from Python, as shown in the following example.
Example:
# example_floe.py
from my_cubes import MySourceCube, MySinkCube, OE2DPSACube
from floe.api import WorkFloe
job = WorkFloe("Calculate 2D PSA Floe")
ifs = MySourceCube()
psa = OE2DPSACube(title="PSA Calculator")
ofs = MySinkCube()
job.add_cubes(ifs, psa, ofs)
ifs.success.connect(psa.intake)
psa.success.connect(ofs.intake)
if __name__ == "__main__":
    job.run()
Now example_floe.py can be run as a Python script:
$ floe run example_floe.py
or:
$ python example_floe.py
For local development, invoking floe run example_floe.py --help will print the WorkFloe parameters grouped and ordered as in the Orion UI.
Note
You must always include the if __name__ == "__main__" statement as a condition for calling run(); otherwise, Floe will not be able to import your Floe without running it.
A new local runtime has been added and can be used in place of the existing local runtime without changes to WorkFloe code. To use the new runtime, set the environment variable FLOERUNTIME=v2:
$ FLOERUNTIME=v2 floe run example_floe.py
or:
$ FLOERUNTIME=v2 python example_floe.py
The V2 runtime's output has new fields for cube types, groups, cycles, and wait types, as well as an experimental uneager cube start option. The uneager option makes the V2 runtime delay starting cubes until they have input. Cubes with no input ports do not have a delayed start. Uneager cube start can be enabled by setting the environment variable EAGER=false:
$ FLOERUNTIME=v2 EAGER=false floe run example_floe.py
or:
$ FLOERUNTIME=v2 EAGER=false python example_floe.py
Output Columns
When running a WorkFloe locally, information about each cube will be printed periodically.
Example:
FFFFFF F FFFFFF FFFFFF FK FKF KF FK
K FF FF KF K oF oF
K FF K K KFFFFF FF FK .K
KFFFFF FF K K K KF KF
K FF FKF FK K oF
F FFFFFF FFFFF FFFFFF FF
OpenEye Floe Runtime
Python: CPython 3.9.7 Platform: macOS-10.15.7-x86_64-i386-64bit
OpenEye-OrionPlatform: 4.4.0 CPUs: 12
11:39:17 Running: Simple Floe
11:39:17 ==========================================================================================
11:39:17 Cube CPUs CPU Time State In Processed Out Bytes
11:39:17 ------------------------------------------------------------------------------------------
11:39:17 FileToRecord 1 - SCHEDULED - - - -
11:39:17 DatasetWrite 1 - SCHEDULED - - - -
11:39:18 ==========================================================================================
11:39:18 Cube CPUs CPU Time State In Processed Out Bytes
11:39:18 ------------------------------------------------------------------------------------------
11:39:18 FileToRecord 0 00:0.000s FINISHED - - 200 51.2 KB
11:39:18 DatasetWrite 0 00:0.007s FINISHED 200 200 - -
11:39:18 Simple Floe completed in 00:0.829s
Or with FLOERUNTIME=v2:
FFFFFF F FFFFFF FFFFFF FK FKF KF FK
K FF FF KF K oF oF
K FF K K KFFFFF FF FK .K
KFFFFF FF K K K KF KF
K FF FKF FK K oF
F FFFFFF FFFFF FFFFFF FF
OpenEye Floe Runtime
Python: 3.9.7 Platform: macOS-10.15.7-x86_64-i386-64bit
Runtime: 1.2.0 OpenEye-OrionPlatform: 4.4.0
CPU Slots: 13 Max Parallel: 11
Eager Start: true
11:39:32 Running: 'Simple Floe'
11:39:32 ==================================================================================================================
11:39:32 Cube CPUs Time State In Processed Out Bytes Group Cycle Wait Type
11:39:32 ------------------------------------------------------------------------------------------------------------------
11:39:32 FileToRecord 1 0s SCHEDULED - - - - - - - SRC
11:39:32 DatasetWrite 0 0s SCHEDULED - - - - - - - SINK
11:39:33 ==================================================================================================================
11:39:33 Cube CPUs Time State In Processed Out Bytes Group Cycle Wait Type
11:39:33 ------------------------------------------------------------------------------------------------------------------
11:39:33 FileToRecord 0 0s FINISHED - - 200 51.2K - - - SRC
11:39:33 DatasetWrite 0 0s FINISHED 200 200 - - - - - SINK
11:39:33 Simple Floe completed in 1.5s
Cube
The name of the cube.
CPUs
The number of CPUs the cube is utilizing.
CPU Time
The cumulative processing time across all CPUs for the cube.
State
The state of the cube. One of PENDING, SCHEDULED, INIT, RUNNING, THROTTLING, FLUSHING, STOPPING, or FINISHED.
In
The count of items read by the cube.
Processed
The count of items processed by the cube.
Out
The count of items output by the cube.
Bytes
The number of bytes of data output by the cube.
Group
Only available in the new version of the Floe runtime
The name of the group, if the cube is in a group.
Cycle
Only available in the new version of the Floe runtime
The name of the cycle's head cube, if the cube is in a cycle.
Wait
Only available in the new version of the Floe runtime
The current wait state of the cube. One of:
-: Not waiting
CubeStart: Waiting on the cube to import and initialize
CubeWrite: Waiting to send data to the downstream cube
CubeRead: Waiting to read data from the upstream cube
Input: Waiting for input to the cube
BufferWrite: Waiting to emit data from the cube to a shared buffer
Parallel: Waiting for the cube to finish parallel work
Initializer: Waiting for the initializer data for the cube
Type
Only available in the new version of the Floe runtime
The type of the cube. One of:
SRC: Source cube
SINK: Sink cube
COMP: Compute cube
PRL: Parallel cube
PGRP: Parallel group
Serializing a WorkFloe
A WorkFloe can be serialized to JSON using the Floe Python API.
An example WorkFloe:
from floe.api import SinkCube, WorkFloe, SourceCube
my_floe = WorkFloe("my_floe")
ifs = SourceCube()
ofs = SinkCube()
my_floe.add_cubes(ifs, ofs)
Serializing to JSON
The example above can be converted into a Python dictionary suitable for serialization:
serialized = my_floe.json()
The entire WorkFloe can be reconstructed from JSON back into a WorkFloe object:
my_floe = WorkFloe.from_json(serialized)
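To keep a WorkFloe definition portable, the serialized form can be written to disk and loaded back later. The following is a minimal sketch, assuming my_floe.json() returns a JSON-serializable structure as described above and that WorkFloe.from_json() accepts whatever json() produced; the file name is arbitrary:
import json
from floe.api import WorkFloe
# Persist the serialized WorkFloe definition
with open("my_floe.json", "w") as fp:
    json.dump(my_floe.json(), fp, indent=2)
# Later, reconstruct the WorkFloe from the stored definition
with open("my_floe.json") as fp:
    restored = WorkFloe.from_json(json.load(fp))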
Note
All files containing cubes must be in your PYTHONPATH, otherwise they cannot be imported.
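For example (with /path/to/my_cubes standing in for the directory containing your cube modules):
$ PYTHONPATH=/path/to/my_cubes floe run example_floe.py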
Cube Execution Order
The order of execution of cubes in a WorkFloe is determined automatically by examining the graph that defines it. Using the Floe Python API to run WorkFloes (outside of Orion) will, by default, use all available CPUs. One CPU is reserved for coordination, one additional CPU per parallel cube is reserved for work tracking, and the remaining CPUs are used to run cubes.
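For example, on a machine with 12 available CPUs, a WorkFloe containing one parallel cube would reserve one CPU for coordination and one for work tracking, leaving 10 CPUs for running cubes.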
Cube Groups
An implementation of an FBP (Flow Based Programming) system can be designed to operate on streams of data in two ways. The first way is process-centric, where communicating sequential processes operate on discrete items as they arrive on a stream and forward the mutated item downstream. The second way is data-centric, where a sequence of computations is performed by a single process on each item in the stream, one at a time. For example, suppose two computations are to be performed, A followed by B. In a process-centric implementation, A and B would be executed concurrently by two separate processes. A would perform its computation on each item in the stream and then transmit the item to B (possibly over a network connection). Note that this type of system requires that items be serialized to a byte representation by A, then deserialized by B. In a data-centric implementation, a single process would execute A, then B, on each item in the stream, without the need for serialization. In summary, a process-centric implementation has the benefit of concurrency, but a data-centric implementation doesn't incur the cost of serialization. Floe allows for both modes of execution at the same time.
By default, each cube is executed in its own process, and items are transmitted between cubes using a network connection. However, the time required for serialization between cubes can often be longer than the time spent computing within the cubes themselves. In such cases, efficiency can be increased by grouping sequential cubes together to be executed by a single process, without serialization, in a data-centric way. Groups can be used in a WorkFloe just like any other cube, and can be connected to other cubes or groups.
Cubes can be added to a CubeGroup by passing a list of cubes into the constructor, and then adding the group to the WorkFloe:
# example_group_floe.py
from my_cubes import PrinterCube
from floe.api import WorkFloe, SourceCube, SinkCube, CubeGroup
job = WorkFloe("Cube Group Example")
ifs = SourceCube()
printer = PrinterCube()
ofs = SinkCube()
# Add cubes to the WorkFloe
job.add_cubes(ifs, printer, ofs)
# Create a cube group
group = CubeGroup(cubes=[ifs, printer, ofs])
# Add the group to the WorkFloe
job.add_group(group)
ifs.success.connect(printer.intake)
printer.success.connect(ofs.intake)
if __name__ == "__main__":
    job.run()
Parallel Cube Groups
Parallel cubes can be grouped together just like regular cubes. A parallel cube group behaves just like a parallel cube, but can run multiple parallel cubes at once.
Parallel cubes may be added to parallel cube groups by passing the list of cubes to the constructor and adding the group to the WorkFloe:
# example_parallel_group_floe.py
from my_cubes import ParallelPassThrough, ParallelPrinterCube
from floe.api import WorkFloe, SourceCube, SinkCube, ParallelCubeGroup
job = WorkFloe("parallel_group_example")
ifs = SourceCube()
passthrough = ParallelPassThrough()
printer = ParallelPrinterCube()
ofs = SinkCube()
# Create a cube group
group = ParallelCubeGroup(cubes=[passthrough, printer])
# Add the group to the WorkFloe
job.add_group(group)
# Add cubes to the WorkFloe
job.add_cubes(ifs, passthrough, printer, ofs)
ifs.success.connect(passthrough.intake)
passthrough.success.connect(printer.intake)
printer.success.connect(ofs.intake)
if __name__ == "__main__":
    job.run()
Nested WorkFloes
It is often useful to encapsulate a set of connected cubes (a partial or complete WorkFloe) as a cube itself. This type of cube is known as a HyperCube.
Defining HyperCubes
HyperCubes are a wrapper around a partial WorkFloe that can be treated as a cube within another WorkFloe. HyperCubes are useful for creating a collection of cubes that is used repeatedly in a particular configuration. Note that HyperCubes are distinct from CubeGroups: whereas a CubeGroup executes a set of cubes in a single process, a HyperCube does not. A WorkFloe consisting of HyperCubes will execute identically to a WorkFloe consisting of all of the cubes within those HyperCubes (assuming they are connected identically).
The following example demonstrates how to define a partial WorkFloe and promote ports to act as the HyperCube's ports.
from floe.api import WorkFloe, HyperCube
from floe.tests.test_cubes import PrintPassThroughCube
# Create WorkFloe
job = WorkFloe(name="Example HyperCube")
print_cube_1 = PrintPassThroughCube()
print_cube_2 = PrintPassThroughCube()
print_cube_3 = PrintPassThroughCube()
job.add_cubes(print_cube_1, print_cube_2, print_cube_3)
# Connect Cubes within WorkFloe
print_cube_1.success.connect(print_cube_2.intake)
print_cube_2.success.connect(print_cube_3.intake)
# Wrap ``WorkFloe`` in a HyperCube
HyperPrintCube = HyperCube(job)
# Promote Ports to act as the inputs and outputs of the HyperCube
HyperPrintCube.promote_port(print_cube_1, "intake")
# Able to add a promoted name which will be used in place of the original name
HyperPrintCube.promote_port(print_cube_3, "success", promoted_name="pass_success")
The following example demonstrates how to use a HyperCube in a WorkFloe:
from floe.api import WorkFloe, SourceCube, SinkCube
# Import the variable that is defined as the Workfloe wrapped in a HyperCube
from example.hyper_pass import HyperPrintCube
# Create WorkFloe
job = WorkFloe(name="Test HyperCube")
ifs = SourceCube()
pass_through_cube = HyperPrintCube()
ofs = SinkCube()
job.add_cubes(ifs, pass_through_cube, ofs)
ifs.success.connect(pass_through_cube.intake)
# Use the promoted name of the success port
pass_through_cube.pass_success.connect(ofs.intake)
if __name__ == "__main__":
    job.run()
Note
HyperCubes are evaluated at run time, so you must declare a variable to represent your HyperCube. In the above example this is done with HyperPrintCube, which should be imported as such.
Using HyperCubes
HyperCubes can be treated almost identically to cubes. You define them with a name and an optional keyword argument title. Once instantiated, HyperCubes are added to a WorkFloe in the same way that cubes are.
from floe.api import WorkFloe, SourceCube, SinkCube
from floe.tests.hyper_cubes.hyper_print_cube import HyperPrintCube
job = WorkFloe("Simple HyperCube Floe")
ifs = SourceCube()
ifs.modify_parameter(ifs.data_in, promoted_name="ifs")
# HyperCubes take similar parameters, notably a name and the optional title keyword argument
hyper_print = HyperPrintCube()
ofs = SinkCube()
ofs.modify_parameter(ofs.data_out, promoted_name="ofs")
job.add_cubes(ifs, hyper_print, ofs)
ifs.success.connect(hyper_print.intake)
hyper_print.success.connect(ofs.intake)
if __name__ == "__main__":
    job.run()
Note
Support for using HyperCubes in the Orion UI is a work in progress and is not yet available.
Multistage WorkFloes
The execution of a WorkFloe can be separated into stages, where each stage runs to completion before the next begins. This is useful in cases where data needs to be carried forward as a single unit instead of being streamed. Propagation of data is accomplished by connecting output parameters of one stage to input parameters of a subsequent stage. A stage may be any valid WorkFloe that does not have multiple stages itself.
Note
Support for running multistage WorkFloes in Orion is a work in progress and is not yet available.
The following example demonstrates how to define and connect stages in a WorkFloe.
import os
import sys
import tempfile
from floe.api import WorkFloe, SourceCube, SinkCube
from floe.tests.test_cubes import TTLSetter, TTLPrinter
# Define the multistage workfloe
job = WorkFloe("multistage workfloe")
# Stage 1
stage1 = WorkFloe("stage1")
ifs = SourceCube()
ifs.modify_parameter(ifs.data_in, promoted_name="ifs")
ttl = TTLSetter()
ofs = SinkCube()
ofs.modify_parameter(ofs.data_out, promoted_name="ofs")
stage1.add_cubes(ifs, ttl, ofs)
ifs.success.connect(ttl.intake)
ttl.success.connect(ofs.intake)
# Stage 2
stage2 = WorkFloe("stage2")
ifs2 = SourceCube()
ofs2 = SinkCube()
ofs2.modify_parameter(ofs2.data_out, promoted_name="ofs2")
ifs3 = SourceCube()
ttl_printer = TTLPrinter()
stage2.add_cubes(ifs2, ofs2, ttl_printer, ifs3)
ifs2.success.connect(ofs2.intake)
ifs3.success.connect(ttl_printer.intake)
# Add stages
job.add_stage(stage1)
job.add_stage(stage2)
# Connect parameters across stages
ofs.connect_parameter("ofs", ifs2, "data_in")
ofs.connect_parameter("ofs", ifs3, "data_in")
if __name__ == "__main__":
    job.run()
Cyclic WorkFloes
Many problems have solutions which are cyclic by nature, such as optimization. Floe comes with support for running WorkFloes containing cycles, with some caveats. Cycles should be used judiciously, as they can easily lead to deadlock or livelock. Cyclic CubeGroups are supported locally and in Orion, with the only restriction being that a CubeGroup may only be part of one cycle at a time.
When designing cycles in Floe, keep in mind:
Although a WorkFloe may be defined by an arbitrary number of cubes, the number of cubes in a single cycle is limited to the number of CPUs available for scheduling, as all cubes in a cycle must run concurrently.
There should always be at least one tested data path out of the cycle.
Floe does not provide a mechanism for a cycle invariant. If you construct an infinite loop, it will run forever. See the previous point.
Many different methods exist for exiting a cycle; which to use will depend on the scenario. If the cycle should be exited from a parallel cube, make sure to store the information that determines the exit on each piece of data rather than on the cube itself, because parallel cube attributes are not synchronized among instances.
The following example demonstrates one method to exit a cycle using a Time To Live (TTL) field stored on each piece of data.
With cube definitions:
from floe.api import SourceCube, ComputeCube, ParallelMixin, JsonInputPort, JsonOutputPort
from floe.api.parameters import IntegerParameter

class GenerateJSON(SourceCube):
    output = JsonOutputPort("output")

    def __iter__(self):
        for i in range(10):
            yield {"data": "Dummy data", "id": i}

class JSONBase(ComputeCube):
    intake = JsonInputPort("intake")
    output = JsonOutputPort("output")

class JSONPrintPassThrough(JSONBase):
    def process(self, data, port):
        print(self, data)
        self.output.emit(data)

class JSONTTLSetter(JSONBase):
    ttl = IntegerParameter(
        "ttl", default=2, help_text="The initial ttl value", title="Time to live"
    )

    def process(self, data, port):
        data["ttl"] = self.args.ttl
        # Emit on the output port, consistent with the other cubes above
        self.output.emit(data)

class ParallelJSONTTLRouter(ParallelMixin, JSONBase):
    ttl_threshold = IntegerParameter(
        "ttl_threshold",
        default=0,
        help_text="Threshold for breaking a cycle",
        title="Time to live Threshold",
    )
    cycle = JsonOutputPort("cycle")

    def process(self, data, port):
        ttl = data.get("ttl", self.args.ttl_threshold)
        if ttl <= self.args.ttl_threshold:
            self.output.emit(data)
        else:
            data["ttl"] = ttl - 1
            self.cycle.emit(data)
Each piece of data will cycle three times before exiting:
from floe.api import WorkFloe
from sample_cubes import GenerateJSON, JSONTTLSetter, JSONPrintPassThrough, ParallelJSONTTLRouter
# c denotes cyclic output, e denotes ending output, P denotes parallel cube
# Number below each cube marks how many times each piece of data will visit
# ┌──┐ ┌──┐ ┌──┐ ┌──┐ ┌──┐
# │ A├─►│ B├───►│ C├─►│DP├─┬─e─►│ F│
# │1x│ │1x│ ▲ │4x│ │4x│ │ │1x│
# └──┘ └──┘ │ └──┘ └──┘ │ └──┘
# │ c
# │ │
# │ ┌──┐ │
# └─────┤ E│◄───┘
# │3x│
# └──┘
floe = WorkFloe("Cyclic Floe")
cubeA = GenerateJSON("Cube A")
cubeB = JSONTTLSetter("Cube B")
cubeC = JSONPrintPassThrough("Cube C")
cubeD = ParallelJSONTTLRouter("Cube D")
cubeE = JSONPrintPassThrough("Cube E")
cubeF = JSONPrintPassThrough("Cube F")
# Add cubes and modify parameters
floe.add_cubes(cubeA, cubeB, cubeC, cubeD, cubeE, cubeF)
# Modifying the ttl parameter determines how many iterations of the cycle will occur
cubeB.modify_parameter(cubeB.ttl, default=3)
# Connect cubes
cubeA.output.connect(cubeB.intake)
cubeB.output.connect(cubeC.intake)
cubeC.output.connect(cubeD.intake)
cubeD.cycle.connect(cubeE.intake)
cubeE.output.connect(cubeC.intake)
cubeD.output.connect(cubeF.intake)
if __name__ == "__main__":
    floe.run()