Floe allows people to write reusable flow-based programs, called WorkFloes. A
WorkFloe is defined by a directed graph where each graph vertex is called a cube. Each
cube may have one or more ports which are used for connecting cubes together. When a
WorkFloe is executed, cubes are executed concurrently, and data is streamed between the cubes.
Technical details such as cube execution order, cube launching, networking, and buffering are handled automatically. The following documentation demonstrates how to get started
writing WorkFloes in Python.
WorkFloes consist of connected cubes, where cubes are typed based on their role in the WorkFloe. Source cubes put data into the WorkFloe, sink cubes store data externally, and compute cubes operate on discrete data on a stream within the WorkFloe. The individual pieces of data on a stream are called work items. Work items may be anything that can be serialized to bytes. See serialization.
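Any Python object that can be reduced to bytes can serve as a work item. As an illustration of the idea (using the standard pickle module, not Floe's own serialization machinery), a typical work item round-trips through bytes like this:

```python
import pickle

# A work item may be any object that can be serialized to bytes.
# The dict below is a made-up example payload.
item = {"smiles": "c1ccccc1", "energy": -1.23}

data = pickle.dumps(item)      # serialize to bytes for transmission
restored = pickle.loads(data)  # deserialize on the receiving end

assert isinstance(data, bytes)
assert restored == item
```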
Because a WorkFloe is defined by a graph rather than by code, a
WorkFloe can be expressed using only its graph. The graph
itself can be encoded into structured JSON for portability. WorkFloes can also be written using the Python Floe API, which is discussed in the following sections.
WorkFloe in Python¶
To begin, we create a new WorkFloe:
from floe.api import WorkFloe

my_floe = WorkFloe("my_floe")
The WorkFloe constructor takes one required argument: the name of the
WorkFloe. The name should consist of
letters, numbers, and the character “_” (in other words, the name should be a valid Python identifier).
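Since a valid name is a valid Python identifier, str.isidentifier() is a convenient way to check a candidate name (an illustration only; the API does not require this call):

```python
# Valid WorkFloe names are valid Python identifiers.
assert "my_floe".isidentifier()

# Invalid names: spaces and leading digits are not allowed.
assert not "my floe".isidentifier()
assert not "2nd_floe".isidentifier()
```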
You may also provide a human-readable title and description:
my_floe = WorkFloe(
    "my_floe",
    title="An example WorkFloe",
    description="...a multiline description may be used here",
)
You may also provide a brief <= 280 character overview, for display on the
Floe page in the Orion UI:
my_floe = WorkFloe(
    "my_floe",
    title="An example WorkFloe",
    brief="...a short overview may be used here",
    description="...a multiline description may be used here",
)
If brief is not provided, then the UI will show a portion of the description instead.
You may also add one or more classifications, to make it easier to find on the
Floe page in the Orion UI. Snowball includes many predefined classifications or you may create your own custom classification:
from snowball import floeclass

my_floe.classification.append(floeclass.task_cheminfo)

from pathlib import PurePath

my_floe.classification.append(PurePath('Company X/Floe category N/Floe subcategory M'))
Each label (for example, “Floe category N”) in the path must be <= 85 characters, but there is no limit to the depth of the path. We recommend going no deeper than 3–5 levels, and avoiding having too many categories or subcategories in view at any given level.
For more information about ready-made classification values you can import and use, see the relevant section in the Snowball documentation.
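The label-length rule above can be checked before registering a custom classification. The check_classification helper below is a hypothetical utility for illustration, not part of Snowball or Floe:

```python
from pathlib import PurePath

def check_classification(path: PurePath, max_label_len: int = 85) -> None:
    """Raise ValueError if any label in a classification path exceeds
    the 85-character limit. Each PurePath component is one label."""
    for label in path.parts:
        if len(label) > max_label_len:
            raise ValueError(f"label too long: {label!r}")

# Each component ("Company X", "Floe category N", ...) is one label.
check_classification(PurePath("Company X/Floe category N/Floe subcategory M"))
```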
Adding cubes to a WorkFloe¶
A WorkFloe requires cubes to actually do something. Cubes can be added to a
WorkFloe as shown in this example:
from floe.api import SinkCube, SourceCube

ifs = SourceCube()
ofs = SinkCube()

my_floe.add_cubes(ifs, ofs)
Connecting cubes together¶
Connections are made between cubes using the
connect() method of each cube’s port. Connections must always be made from
the source cube to the destination cube.
Building on the previous example, at this point
my_floe consists of two cubes, ifs and
ofs. However, there are no connections yet.
Connections can be made using the source and destination ports:
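Building on the ifs and ofs cubes added above, a source-to-sink connection follows this pattern (a sketch based on the running-a-WorkFloe example later in this document; it assumes the default success and intake port names):

```python
from floe.api import SinkCube, SourceCube, WorkFloe

my_floe = WorkFloe("my_floe")
ifs = SourceCube()
ofs = SinkCube()
my_floe.add_cubes(ifs, ofs)

# Connections are always made from the source port to the destination port.
ifs.success.connect(ofs.intake)
```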
Cubes must always be added to an instance of a
WorkFloe before connections can be made.
At this point, my_floe defines a valid WorkFloe.
Running a WorkFloe¶
The Floe Python API allows WorkFloes to be executed within the Orion platform, as well as on developers’ own machines. A WorkFloe
can be executed from Python code, or JSON. For development, it is convenient to run a
WorkFloe from Python as shown in the following example.
# example_floe.py
from my_cubes import OE2DPSACube
from floe.api import SinkCube, WorkFloe, SourceCube

job = WorkFloe("Calculate 2D PSA Floe")

ifs = SourceCube()
psa = OE2DPSACube(title="PSA Calculator")
ofs = SinkCube()

job.add_cubes(ifs, psa, ofs)

ifs.success.connect(psa.intake)
psa.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()
example_floe.py can be run as a Python script:
$ floe run example_floe.py
$ python example_floe.py
You must always include the
if __name__ == "__main__" statement as a condition around the call to job.run(),
or Floe will not be able to import your Floe without running it.
A new local runtime is available and can be swapped in for the existing local runtime without
changes to internal
WorkFloe code. To use the new runtime, set the environment variable FLOERUNTIME=v2:
$ FLOERUNTIME=v2 floe run example_floe.py
$ FLOERUNTIME=v2 python example_floe.py
The V2 runtime’s output has new fields for cube types, groups, cycles, and wait types as well as an experimental
uneager cube start option. The uneager option makes the V2 runtime delay starting cubes until they have input.
Cubes with no input ports do not have a delayed start. Uneager cube start can be enabled by setting the environment variable EAGER=false:
$ FLOERUNTIME=v2 EAGER=false floe run example_floe.py
$ FLOERUNTIME=v2 EAGER=false python example_floe.py
When running a
WorkFloe locally, information about each cube will be printed periodically.
OpenEye Floe Runtime
Python: CPython 3.9.7
Platform: macOS-10.15.7-x86_64-i386-64bit
OpenEye-OrionPlatform: 4.4.0
CPUs: 12

11:39:17 Running: Simple Floe
11:39:17 ==========================================================================================
11:39:17 Cube          CPUs  CPU Time   State      In   Processed  Out  Bytes
11:39:17 ------------------------------------------------------------------------------------------
11:39:17 FileToRecord  1     -          SCHEDULED  -    -          -    -
11:39:17 DatasetWrite  1     -          SCHEDULED  -    -          -    -
11:39:18 ==========================================================================================
11:39:18 Cube          CPUs  CPU Time   State      In   Processed  Out  Bytes
11:39:18 ------------------------------------------------------------------------------------------
11:39:18 FileToRecord  0     00:0.000s  FINISHED   -    -          200  51.2 KB
11:39:18 DatasetWrite  0     00:0.007s  FINISHED   200  200        -    -
11:39:18 Simple Floe completed in 00:0.829s
OpenEye Floe Runtime
Python: 3.9.7
Platform: macOS-10.15.7-x86_64-i386-64bit
Runtime: 1.2.0
OpenEye-OrionPlatform: 4.4.0
CPU Slots: 13
Max Parallel: 11
Eager Start: true

11:39:32 Running: 'Simple Floe'
11:39:32 ==================================================================================================================
11:39:32 Cube          CPUs  Time  State      In   Processed  Out  Bytes  Group  Cycle  Wait  Type
11:39:32 ------------------------------------------------------------------------------------------------------------------
11:39:32 FileToRecord  1     0s    SCHEDULED  -    -          -    -      -      -      -     SRC
11:39:32 DatasetWrite  0     0s    SCHEDULED  -    -          -    -      -      -      -     SINK
11:39:33 ==================================================================================================================
11:39:33 Cube          CPUs  Time  State      In   Processed  Out  Bytes  Group  Cycle  Wait  Type
11:39:33 ------------------------------------------------------------------------------------------------------------------
11:39:33 FileToRecord  0     0s    FINISHED   -    -          200  51.2K  -      -      -     SRC
11:39:33 DatasetWrite  0     0s    FINISHED   200  200        -    -      -      -      -     SINK
11:39:33 Simple Floe completed in 1.5s
Cube: The name of the cube.
CPUs: The number of CPUs the cube is utilizing.
CPU Time: The cumulative processing time across all CPUs for the cube.
State: The state of the cube (for example, SCHEDULED or FINISHED).
In: The count of items read by the cube.
Processed: The count of items processed by the cube.
Out: The count of items output by the cube.
Bytes: The number of bytes of data output by the cube.
Group: (Only available in the new version of the floe runtime.) The name of the group, if the cube is in a group.
Cycle: (Only available in the new version of the floe runtime.) The name of the cycle’s head cube, if the cube is in a cycle.
Wait: (Only available in the new version of the floe runtime.) The current wait state of the cube. One of:
-: Not waiting
CubeStart: Waiting on the cube to import and initialize
CubeWrite: Waiting to send data to the downstream cube
CubeRead: Waiting to read data from the upstream cube
Input: Waiting for input to the cube
BufferWrite: Waiting to emit data from the cube to a shared buffer
Parallel: Waiting for the cube to finish parallel work
Initializer: Waiting for the initializer data for the cube
Type: (Only available in the new version of the floe runtime.) The type of the cube. One of:
SRC: Source cube
SINK: Sink cube
COMP: Compute cube
PRL: Parallel cube
PGRP: Parallel group
Serializing a WorkFloe¶
A WorkFloe can be serialized to JSON using the Floe Python API.
An example WorkFloe:
from floe.api import SinkCube, WorkFloe, SourceCube

my_floe = WorkFloe("my_floe")

ifs = SourceCube()
ofs = SinkCube()

my_floe.add_cubes(ifs, ofs)
Serializing to JSON¶
The example above can be converted into a Python dictionary suitable for serialization:
serialized = my_floe.json()
A serialized WorkFloe can be reconstructed from JSON back into a WorkFloe object:
my_floe = WorkFloe.from_json(serialized)
All files containing cubes must be in your PYTHONPATH, otherwise they cannot be imported.
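The round trip relies only on the graph being representable as JSON. As a schema-free illustration (the toy dictionary below is not the real Floe JSON layout), a small graph encodes and decodes losslessly:

```python
import json

# A toy stand-in for a WorkFloe graph: cubes as nodes, connections as edges.
graph = {
    "name": "my_floe",
    "cubes": ["ifs", "ofs"],
    "connections": [{"from": "ifs.success", "to": "ofs.intake"}],
}

serialized = json.dumps(graph)
restored = json.loads(serialized)

assert restored == graph  # nothing is lost in the round trip
```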
Cube Execution Order¶
The order of execution of cubes in a
WorkFloe is determined automatically by examining the graph that defines it. Using the Floe
Python API to run WorkFloes (outside of Orion) will, by default, use all available CPUs. One CPU is reserved for coordination, as well as one additional CPU
per parallel cube for work tracking, and the remaining CPUs are used to run cubes.
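That allocation can be expressed as simple arithmetic; the function below is a hypothetical illustration of the rule, not a Floe API:

```python
def cpus_for_cubes(total_cpus: int, parallel_cubes: int) -> int:
    """CPUs left for running cubes after reserving one for coordination
    and one per parallel cube for work tracking."""
    return total_cpus - 1 - parallel_cubes

# On a 12-CPU machine with 2 parallel cubes, 9 CPUs remain for cubes.
assert cpus_for_cubes(12, 2) == 9
```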
An implementation of a FBP (Flow Based Programming) system can be designed to operate on streams of data in two ways. The first way is process centric, where communicating sequential processes operate on discrete items as they arrive on a stream and forward the mutated item downstream. The second way is data centric, where a sequence of computations is performed by a single process on each item in the stream, one at a time.

For example, suppose two computations are to be performed, A followed by B. In a process centric implementation, A and B would be executed concurrently by two separate processes. A would perform its computation on each item in the stream and then transmit the item to B (possibly over a network connection). Note that this type of system requires that items be serialized to a byte representation by A, then deserialized by B. In a data centric implementation, a single process would execute A, then execute B on each item in the stream, without the need for serialization.

In summary, a process centric implementation has the benefit of concurrency, but a data centric implementation doesn’t incur the cost of serialization. Floe allows for both modes of execution at the same time.
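The trade-off can be sketched in plain Python. The process-centric path simulates the inter-process transfer by serializing each item between A and B (pickle stands in for the network hop); the data-centric path applies both computations in one process with no serialization:

```python
import pickle

def a(item):  # first computation
    return item + 1

def b(item):  # second computation
    return item * 2

items = [1, 2, 3]

# Process centric: A's output is serialized, "transmitted", then
# deserialized by B (pickle stands in for the network connection).
process_centric = [b(pickle.loads(pickle.dumps(a(i)))) for i in items]

# Data centric: one process runs A then B on each item, no serialization.
data_centric = [b(a(i)) for i in items]

assert process_centric == data_centric == [4, 6, 8]
```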
By default, each cube is executed in its own process, and items are transmitted between cubes using a network connection.
However, the time required for serialization between cubes can often exceed the time spent in the cubes’ own computations. In such cases,
efficiency can be increased by grouping sequential cubes together to be executed by a single process, without serialization,
in a data centric way. Groups can be used in a
WorkFloe just like any other cube, and can be connected to other cubes or groups.
Cubes can be added to a
CubeGroup by passing a list of cubes into the constructor, and then adding the group
to the WorkFloe:
# example_group_floe.py
from my_cubes import PrinterCube
from floe.api import WorkFloe, SourceCube, SinkCube, CubeGroup

job = WorkFloe("Cube Group Example")

ifs = SourceCube()
printer = PrinterCube()
ofs = SinkCube()

# Add cubes to the WorkFloe
job.add_cubes(ifs, printer, ofs)

# Create a cube group
group = CubeGroup(cubes=[ifs, printer, ofs])

# Add the group to the WorkFloe
job.add_group(group)

ifs.success.connect(printer.intake)
printer.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()
Parallel Cube Groups¶
Parallel cubes can be grouped together just like regular cubes. A parallel cube group behaves just like a parallel cube, but can run multiple parallel cubes at once.
Parallel cubes may be added to a parallel cube group by passing the list of cubes to the ParallelCubeGroup constructor and adding the group to the WorkFloe:
# example_parallel_group_floe.py
from my_cubes import ParallelPassThrough, ParallelPrinterCube
from floe.api import WorkFloe, SourceCube, SinkCube, ParallelCubeGroup

job = WorkFloe("parallel_group_example")

ifs = SourceCube()
passthrough = ParallelPassThrough()
printer = ParallelPrinterCube()
ofs = SinkCube()

# Create a cube group
group = ParallelCubeGroup(cubes=[passthrough, printer])

# Add the group to the WorkFloe
job.add_group(group)

# Add cubes to the WorkFloe
job.add_cubes(ifs, passthrough, printer, ofs)

ifs.success.connect(passthrough.intake)
passthrough.success.connect(printer.intake)
printer.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()
It is often useful to encapsulate a set of connected cubes (a partial or complete WorkFloe) as a cube itself. This type of cube is known as a HyperCube.
HyperCubes are a wrapper around a partial
WorkFloe that can be treated as a cube within another WorkFloe. HyperCubes are useful to create a collection of cubes
that are repeatedly used in a particular configuration. Note that HyperCubes are distinct from CubeGroups. Whereas CubeGroups execute a set of cubes in a single process,
a HyperCube does not. A
WorkFloe consisting of HyperCubes will execute identically to a
WorkFloe consisting of all of the cubes within those HyperCubes (assuming they are connected identically).
The following example demonstrates how to define a partial
WorkFloe and promote ports to act as the HyperCube’s ports.
from floe.api import WorkFloe, HyperCube
from floe.tests.test_cubes import PrintPassThroughCube

# Create WorkFloe
job = WorkFloe(name="Example HyperCube")

print_cube_1 = PrintPassThroughCube()
print_cube_2 = PrintPassThroughCube()
print_cube_3 = PrintPassThroughCube()

job.add_cubes(print_cube_1, print_cube_2, print_cube_3)

# Connect Cubes within WorkFloe
print_cube_1.success.connect(print_cube_2.intake)
print_cube_2.success.connect(print_cube_3.intake)

# Wrap ``WorkFloe`` in a HyperCube
HyperPrintCube = HyperCube(job)

# Promote Ports to act as the inputs and outputs of the HyperCube
HyperPrintCube.promote_port(print_cube_1, "intake")

# Able to add a promoted name which will be used in place of the original name
HyperPrintCube.promote_port(print_cube_3, "success", promoted_name="pass_success")
The following example demonstrates how to use a HyperCube in a WorkFloe:
from floe.api import WorkFloe, SourceCube, SinkCube

# Import the variable that is defined as the WorkFloe wrapped in a HyperCube
from example.hyper_pass import HyperPrintCube

# Create WorkFloe
job = WorkFloe(name="Test HyperCube")

ifs = SourceCube()
pass_through_cube = HyperPrintCube()
ofs = SinkCube()

job.add_cubes(ifs, pass_through_cube, ofs)

ifs.success.connect(pass_through_cube.intake)

# Use the promoted name of the success port
pass_through_cube.pass_success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()
HyperCubes are evaluated at run time, so you must declare a variable to represent your HyperCube.
In the example above this is done with
HyperPrintCube, which is then imported by that name.
HyperCubes can be treated almost identically to cubes. You define them with a
name and an optional keyword argument
title. Once instantiated, HyperCubes are added to a
WorkFloe in the same way that cubes are.
from floe.api import WorkFloe, SourceCube, SinkCube
from floe.tests.hyper_cubes.hyper_print_cube import HyperPrintCube

job = WorkFloe("Simple HyperCube Floe")

ifs = SourceCube()
ifs.modify_parameter(ifs.data_in, promoted_name="ifs")

# HyperCubes take similar parameters, notably a name and the optional title keyword argument
hyper_print = HyperPrintCube()

ofs = SinkCube()
ofs.modify_parameter(ofs.data_out, promoted_name="ofs")

job.add_cubes(ifs, hyper_print, ofs)

ifs.success.connect(hyper_print.intake)
hyper_print.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()
Support for using HyperCubes in the Orion UI is a work in progress, and not yet supported.
The execution of a
WorkFloe can be separated into stages, where each stage runs to completion before the next begins. This is useful in cases
where data needs to be carried forward as a single unit instead of being streamed. Propagation of data is accomplished by connecting output
parameters of one stage to input parameters of a subsequent stage. A stage may be any valid
WorkFloe that does not have multiple stages itself.
Support for running multistage WorkFloes in Orion is a work in progress, and not yet supported.
The following example demonstrates how to define and connect stages in a WorkFloe.
import os
import sys
import tempfile

from floe.api import WorkFloe, SourceCube, SinkCube
from floe.tests.test_cubes import TTLSetter, TTLPrinter

# Define the multistage workfloe
job = WorkFloe("multistage workfloe")

# Stage 1
stage1 = WorkFloe("stage1")
ifs = SourceCube()
ifs.modify_parameter(ifs.data_in, promoted_name="ifs")
ttl = TTLSetter()
ofs = SinkCube()
ofs.modify_parameter(ofs.data_out, promoted_name="ofs")
stage1.add_cubes(ifs, ttl, ofs)
ifs.success.connect(ttl.intake)
ttl.success.connect(ofs.intake)

# Stage 2
stage2 = WorkFloe("stage2")
ifs2 = SourceCube()
ofs2 = SinkCube()
ofs2.modify_parameter(ofs2.data_out, promoted_name="ofs2")
ifs3 = SourceCube()
ttl_printer = TTLPrinter()
stage2.add_cubes(ifs2, ofs2, ttl_printer, ifs3)
ifs2.success.connect(ofs2.intake)
ifs3.success.connect(ttl_printer.intake)

# Add stages
job.add_stage(stage1)
job.add_stage(stage2)

# Connect parameters across stages
ofs.connect_parameter("ofs", ifs2, "data_in")
ofs.connect_parameter("ofs", ifs3, "data_in")

if __name__ == "__main__":
    job.run()
Many problems have solutions which are cyclic by nature, such as optimization. Floe comes with support for running WorkFloes containing cycles, with some caveats. Cycles should be used judiciously, as they can easily lead to deadlock or livelock.
When using cycles in Floe, the following caveats apply:
Although a WorkFloe may be defined by an arbitrary number of cubes, the number of cubes in a single cycle is limited to the number of CPUs available for scheduling, as all cubes in a cycle must run concurrently.
The use of CubeGroups in cycles is not yet supported.
There should always be at least one tested data path out of the cycle.
Floe does not provide a mechanism for a cycle invariant. If you construct an infinite loop, it will run forever. See the previous bullet.
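The "tested data path out of the cycle" caveat behaves like a loop guard. A minimal stand-in sketch (ordinary Python, not Floe's cycle machinery) recirculates items until a pass counter runs out, then emits them:

```python
def run_cycle(items, max_passes=3):
    """Recirculate each item until its pass count reaches max_passes,
    then send it out of the cycle. The max_passes test is the
    'tested data path out' -- without it, items would loop forever."""
    out = []
    queue = [(item, 0) for item in items]
    while queue:
        item, passes = queue.pop(0)
        if passes < max_passes:
            queue.append((item + 1, passes + 1))  # stay in the cycle
        else:
            out.append(item)                      # exit the cycle
    return out

assert run_cycle([0, 10]) == [3, 13]
```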