WorkFloes

Overview

Floe allows people to write reusable flow-based programs, called WorkFloes. A WorkFloe is defined by a directed graph where each graph vertex is called a cube. Each cube may have one or more ports which are used for connecting cubes together. When a WorkFloe is executed, cubes are executed concurrently, and data is streamed between the cubes. Technical details such as cube execution order, cube launching, networking, and buffering are handled automatically. The following documentation demonstrates how to get started writing WorkFloes in Python.

WorkFloes consist of connected cubes, where cubes are typed based on their role in the WorkFloe. Source cubes put data into the WorkFloe, sink cubes store data externally, and compute cubes operate on discrete data on a stream within the WorkFloe. The individual pieces of data on a stream are called work items. Work items may be anything that can be serialized to bytes. See serialization.

Because a WorkFloe is defined by a graph, rather than code, a WorkFloe can be expressed using only its graph. The graph itself can be encoded into structured JSON for portability. WorkFloes can also be written using the Python Floe API, which is discussed in this document.

Writing a WorkFloe in Python

To begin, we create a new WorkFloe object:

from floe.api import WorkFloe

my_floe = WorkFloe("my_floe")

The WorkFloe constructor takes one required argument, the name of the WorkFloe. The name should consist of letters, numbers, and the character “_” (in other words, the name should be a valid Python variable identifier).

You may also provide a human-readable title and description:

my_floe = WorkFloe("my_floe", title="An example WorkFloe", description="...a multiline description may be used here")

You may also provide a brief <= 280 character overview, for display on the Floe page in the Orion UI:

my_floe = WorkFloe("my_floe", title="An example WorkFloe", brief="...a short overview may be used here", description="...a multiline description may be used here")

If brief is not provided, then the UI will show some of description.

Advanced users may specify the amount of memory to be passed to docker’s --shm_size parameter. This parameter determines the amount of RAM to be made available to /dev/shm within the container. Such frameworks as OpenMPI often make use of this shared memory.

This parameter may be set on the General tab of the Cube Parameters sheet when launching jobs in the Orion UI, or defaulted as a parameter_overrides option when defining cubes programmatically:

parameter_overrides = {
    "shared_memory_mb": {"default": 1024},
}

This parameter is defaulted to 64 MiB, docker’s default.

Warning

If you oversize /dev/shm the machine will deadlock since the OOM handler will not be able to free that memory. See https://docs.kernel.org/filesystems/tmpfs.html for more information.

Advanced users may utilize the pids_per_cpu_limit cube parameter in Orion to specify the maximum number of pids (processes/threads) per CPU within a cube, defaulting to 32.

In order to determine the max number of pids available to the user the pids_per_cpu_limit is multiplied by the number of allocated CPUs. The number of allocated CPUs will be at least equal to the value specified by the cpu_count parameter. Additionally, Orion then adds three to the specified value to account for built-in processes, regardless of the number of CPUs.

Therefore, the total pids-limit is equal to (pids_per_cpu_limit * number of allocated CPUs) + 3.

Advanced users may utilize the max_backlog_wait cube parameter in Orion to specify the maximum number of seconds a cube will be backlogged on a group before being re-evaluated.

This parameter is defaulted to 600 seconds.

Note

The total pids-limit value for a cube is stored in the ORION_NUM_PROCS environment variable.

The pids_per_cpu_limit may be set on the System tab of the Cube Parameters sheet when launching jobs in the Orion UI, or defaulted as a parameter_overrides option when defining cubes programmatically. The following parameter_overrides examples will both allot the cube 48 usable threads/processes:

parameter_overrides = {
    "pids_per_cpu_limit": {"default": 48},
}

or

parameter_overrides = {
    "pids_per_cpu_limit": {"default": 24},
    "cpu_count": {"default": 2},
}

Warning

There is no upper limit for the size of pids_per_cpu_limit, however large values may compromise performance or even cause failures due to excessive fork calls

You may also add one or more classifications, to make it easier to find on the Floe page in the Orion UI. Snowball includes many predefined classifications or you may create your own custom classification:

from snowball import floeclass
my_floe.classification.append(floeclass.task_cheminfo)

from pathlib import PurePath
my_floe.classification.append(PurePath('Company X/Floe category N/Floe subcategory M'))

Each label (that is, “Floe category N”) in the path must be <= 85 characters, but there is no limit to the depth of the path. We recommend not going deeper than 3–5. We also recommend not having too many categories/subcategories in view at any given level.

For more information about ready-made classification values you can import and use, see the relevant section in the Snowball documentation.

Adding cubes to a WorkFloe

A WorkFloe requires cubes to actually do something. Cubes can be added to the WorkFloe as shown in this example:

from floe.api import SinkCube, SourceCube
ifs = SourceCube()
ofs = SinkCube()
my_floe.add_cubes(ifs, ofs)

Connecting cubes together

Connections are made between cubes using the connect() method of each cube’s port. Connections must always be made from the source cube to the destination cube.

Building on the previous example, at this point, my_floe consists of two cubes; ifs and ofs. However, there are no connections yet.

Connections can be made using the source and destination ports:

ifs.success.connect(ofs.intake)

Note

Cubes must always be added to an instance of a WorkFloe before connections can be made.

Now my_floe defines a valid WorkFloe.

Running a WorkFloe

The Floe Python API allows WorkFloes to be executed within the Orion platform, and on developer’s own machines. A WorkFloe can be executed from Python code, or JSON. For development, it is convenient to run a WorkFloe from Python as shown in the following example.

Example:

# example_floe.py
from my_cubes import MySourceCube, MySinkCube, OE2DPSACube

from floe.api import WorkFloe

job = WorkFloe("Calculate 2D PSA Floe")
ifs = MySourceCube()
psa = OE2DPSACube(title="PSA Calculator")
ofs = MySinkCube()

job.add_cubes(ifs, psa, ofs)

ifs.success.connect(psa.intake)
psa.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()

Now example.py can be run as a Python script:

$ floe run example_floe.py

or:

$ python example_floe.py

For local development, invoking WorkFloe floe run example_floe.py --help will print parameters grouped and ordered as in Orion UI.

Note

You must always include the if __name__ == "__main__" statement as a condition to call run(), or Floe will not be able to import your Floe without running it.

A new local runtime has been added and can be swapped out with the existing local runtime without changes to internal WorkFloe code. To use the new runtime, set the environment variable FLOERUNTIME=v2:

$ FLOERUNTIME=v2 floe run example_floe.py

or:

$ FLOERUNTIME=v2 python example_floe.py

The V2 runtime’s output has new fields for cube types, groups, cycles, and wait types as well as an experimental uneager cube start option. The uneager option makes the V2 runtime delay starting cubes until they have input. Cubes with no input ports do not have a delayed start. Uneager cube start can be used by setting the environment variable EAGER=false:

$ FLOERUNTIME=v2 EAGER=false floe run example_floe.py

or:

$ FLOERUNTIME=v2 EAGER=false python example_floe.py

Output Columns

When running a WorkFloe locally, information about each cube will be printed periodically.

Example:

     FFFFFF   F         FFFFFF     FFFFFF                FK FKF KF FK
     K       FF       FF      KF   K                     oF oF
     K       FF       K        K   KFFFFF             FF FK .K
     KFFFFF  FF       K        K   K               KF KF
     K       FF       FKF    FK    K           oF
     F        FFFFFF    FFFFF      FFFFFF      FF

    OpenEye Floe Runtime

 Python: CPython    3.9.7      Platform: macOS-10.15.7-x86_64-i386-64bit
 OpenEye-OrionPlatform:   4.4.0   CPUs: 12
11:39:17 Running: Simple Floe
11:39:17 ==========================================================================================
11:39:17 Cube         CPUs   CPU Time   State    In          Processed   Out         Bytes
11:39:17 ------------------------------------------------------------------------------------------
11:39:17 FileToRecord 1      -        SCHEDULED  -           -           -           -
11:39:17 DatasetWrite 1      -        SCHEDULED  -           -           -           -
11:39:18 ==========================================================================================
11:39:18 Cube         CPUs   CPU Time   State    In          Processed   Out         Bytes
11:39:18 ------------------------------------------------------------------------------------------
11:39:18 FileToRecord 0      00:0.000s FINISHED   -           -           200         51.2 KB
11:39:18 DatasetWrite 0      00:0.007s FINISHED   200         200         -           -
11:39:18 Simple Floe completed in 00:0.829s

Or with FLOERUNTIME=v2:

        FFFFFF   F         FFFFFF     FFFFFF                FK FKF KF FK
        K       FF       FF      KF   K                     oF oF
        K       FF       K        K   KFFFFF             FF FK .K
        KFFFFF  FF       K        K   K               KF KF
        K       FF       FKF    FK    K           oF
        F        FFFFFF    FFFFF      FFFFFF      FF

        OpenEye Floe Runtime

 Python: 3.9.7        Platform: macOS-10.15.7-x86_64-i386-64bit
 Runtime: 1.2.0       OpenEye-OrionPlatform: 4.4.0
 CPU Slots: 13        Max Parallel: 11
 Eager Start: true

11:39:32 Running: 'Simple Floe'
11:39:32 ==================================================================================================================
11:39:32 Cube          CPUs      Time     State        In Processed       Out   Bytes    Group    Cycle         Wait   Type
11:39:32 ------------------------------------------------------------------------------------------------------------------
11:39:32 FileToRecord     1        0s SCHEDULED         -         -         -       -        -        -            -    SRC
11:39:32 DatasetWrite     0        0s SCHEDULED         -         -         -       -        -        -            -   SINK
11:39:33 ==================================================================================================================
11:39:33 Cube          CPUs      Time     State        In Processed       Out   Bytes    Group    Cycle         Wait   Type
11:39:33 ------------------------------------------------------------------------------------------------------------------
11:39:33 FileToRecord     0        0s  FINISHED         -         -       200   51.2K        -        -            -    SRC
11:39:33 DatasetWrite     0        0s  FINISHED       200       200         -       -        -        -            -   SINK
11:39:33 Simple Floe completed in 1.5s

Cube

The name of the cube.

CPUs

The number of CPUs the cube is utilizing.

CPU Time

The cumulative processing time across all CPUs for the cube.

State

The state of the cube. One of PENDING, SCHEDULED, INIT, RUNNING, THROTTLING, FLUSHING, STOPPING, or FINISHED.

In

The count of items read by the cube.

Processed

The count of items processed by the cube.

Out

The count of items output by the cube.

Bytes

The number of bytes of data output by the cube.

Group

Only available in new version of the floe runtime

The name of the group, if the cube is in a group.

Cycle

Only available in new version of the floe runtime

The name of the cycle’s head cube, if the cube is in a cycle.

Wait

Only available in new version of the floe runtime

The current wait state of the cube. One of:

  • -: Not waiting

  • CubeStart: Waiting on the cube to import and initialize

  • CubeWrite: Waiting to send data to the downstream cube

  • CubeRead: Waiting to read data from the upstream cube

  • Input: Waiting for input to the cube

  • BufferWrite: Waiting to emit data from the cube to a shared buffer

  • Parallel: Waiting for the cube to finish parallel work

  • Initializer: Waiting for the initializer data for the cube

Type

Only available in new version of the floe runtime

The type of the cube. One of:

  • SRC: Source cube

  • SINK: Sink cube

  • COMP: Compute cube

  • PRL: Parallel cube

  • PGRP: Parallel group

Serializing a WorkFloe

A WorkFloe can be serialized to JSON using the Floe Python API.

An example WorkFloe:

from floe.api import SinkCube, WorkFloe, SourceCube
my_floe = WorkFloe("my_floe")
ifs = SourceCube()
ofs = SinkCube()
my_floe.add_cubes(ifs, ofs)

Serializing to JSON

The example above can be converted into a Python dictionary suitable for serialization:

serialized = my_floe.json()

The entire WorkFloe can be reconstructed from JSON back into a WorkFloe object:

my_floe = WorkFloe.from_json(serialized)

Note

All files containing cubes must be in your PYTHONPATH, otherwise they cannot be imported.

Cube Execution Order

The order of execution of cubes in a WorkFloe is determined automatically by examining the graph that defines it. Using the Floe Python API to run WorkFloes (outside of Orion) will, by default, use all available CPUs. One CPU is reserved for coordination, as well as one additional CPU per parallel cube for work tracking, and the remaining CPUs are used to run cubes.

Cube Groups

An implementation of a FBP (Flow Based Programming) system can be designed to operate on streams of data in two ways. The first way is process centric, where communicating sequential processes operate on discrete items as they arrive on a stream and forward the mutated item downstream. The second way is data centric, where a sequence of computations are performed by a single process on each item in the stream, one at a time. For example, suppose two computations are to be performed, A followed by B. In a process centric implementation, A and B would be executed concurrently by two separate processes. A would perform its computation on each item in the stream and then transmit the item to B (possibly over a network connection). Note that this type of system requires that items be serialized to a byte representation by A, then deserialized by B. In a data centric implementation, a single process would execute A, then execute B on each item in the stream, without the need for serialization. In summary, a process centric implementation has the benefit of concurrency, but a data centric implementation doesn’t incur the cost of serialization. Floe allows for both modes of execution at the same time.

By default, each cube is executed in its own process, and items are transmitted between cubes using a network connection. However, the time required for serialization between cubes can often be longer than the cubes themselves. In such cases, efficiency can be increased by grouping sequential cubes together to be executed by a single process, without serialization, in a data centric way. Groups can be used in a WorkFloe just like any other cube, and can be connected to other cubes or groups.

Cubes can be added to a CubeGroup by passing a list of cubes into the constructor, and then adding the group to the WorkFloe:

# example_group_floe.py
from my_cubes import PrinterCube
from floe.api import WorkFloe, SourceCube, SinkCube, CubeGroup

job = WorkFloe("Cube Group Example")
ifs = SourceCube()
printer = PrinterCube()
ofs = SinkCube()

# Add cubes to the WorkFloe
job.add_cubes(ifs, printer, ofs)

# Create a cube group
group = CubeGroup(cubes=[ifs, printer, ofs])

# Add the group to the WorkFloe
job.add_group(group)

ifs.success.connect(printer.intake)
printer.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()

Parallel Cube Groups

Parallel cubes can be grouped together just like regular cubes. A parallel cube group behaves just like a parallel cube, but can run multiple parallel cubes at once.

Parallel cubes may be added to parallel cube groups by passing the list of cubes to an constructor and adding the group to the WorkFloe:

# example_parallel_group_floe.py
from my_cubes import ParallelPassThrough, ParallelPrinterCube
from floe.api import WorkFloe, SourceCube, SinkCube, ParallelCubeGroup

job = WorkFloe("parallel_group_example")
ifs = SourceCube()
passthrough = ParallelPassThrough()
printer = ParallelPrinterCube()
ofs = SinkCube()

# Create a cube group
group = ParallelCubeGroup(cubes=[passthrough, printer])

# Add the group to the WorkFloe
job.add_group(group)

# Add cubes to the WorkFloe
job.add_cubes(ifs, passthrough, printer, ofs)

ifs.success.connect(passthrough.intake)
passthrough.success.connect(printer.intake)
printer.success.connect(ofs.intake)


if __name__ == "__main__":
    job.run()

Nested WorkFloes

It is often useful to encapsulate a set of connected cubes (a partial or complete WorkFloe) as a cube itself. This type of cube is known as a HyperCube.

Defining HyperCubes

HyperCubes are a wrapper around a partial WorkFloe that can be treated as a cube within another WorkFloe. HyperCubes are useful to create a collection of cubes that are repeatedly used in a particular configuration. Note that HyperCubes are distinct from CubeGroups. Whereas CubeGroups execute a set of cubes in a single process, a HyperCube does not. A WorkFloe consisting of HyperCubes will execute identically to a WorkFloe consisting of all of the cubes within those HyperCubes (assuming they are connected identically).

The following example demonstrates how to define a partial WorkFloe and promote ports to act as the HyperCubes ports.

from floe.api import WorkFloe, HyperCube
from floe.tests.test_cubes import PrintPassThroughCube

# Create WorkFloe
job = WorkFloe(name="Example HyperCube")

print_cube_1 = PrintPassThroughCube()
print_cube_2 = PrintPassThroughCube()
print_cube_3 = PrintPassThroughCube()

job.add_cubes(print_cube_1, print_cube_2, print_cube_3)

# Connect Cubes within WorkFloe
print_cube_1.success.connect(print_cube_2.intake)
print_cube_2.success.connect(print_cube_3.intake)

# Wrap ``WorkFloe`` in a HyperCube
HyperPrintCube = HyperCube(job)

# Promote Ports to act as the inputs and outputs of the HyperCube
HyperPrintCube.promote_port(print_cube_1, "intake")
# Able to add a promoted name which will be used in place of the original name
HyperPrintCube.promote_port(print_cube_3, "success", promoted_name="pass_success")

The following example demonstrates how to use a HyperCube in a WorkFloe

from floe.api import WorkFloe, SourceCube, SinkCube
# Import the variable that is defined as the Workfloe wrapped in a HyperCube
from example.hyper_pass import HyperPrintCube

# Create WorkFloe
job = WorkFloe(name="Test HyperCube")

ifs = SourceCube()
pass_through_cube = HyperPrintCube()
ofs = SinkCube()

job.add_cubes(ifs, pass_through_cube, ofs)

ifs.success.connect(pass_through_cube.intake)
# Use the promoted name of the success port
pass_through_cube.pass_success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()

Note

HyperCubes are evaluated at run time, so you must declare a variable to represent your HyperCube. In the above example this is done with HyperPrintCube and should be imported as such

Using HyperCubes

HyperCubes can be treated as almost identical to cubes. You define them with a name and an optional keyword argument title. Once instantiated, HyperCubes are added to a WorkFloe in the same way that Cubes are.

from floe.api import WorkFloe, SourceCube, SinkCube

from floe.tests.hyper_cubes.hyper_print_cube import HyperPrintCube

job = WorkFloe("Simple HyperCube Floe")

ifs = SourceCube()
ifs.modify_parameter(ifs.data_in, promoted_name="ifs")
# HyperCubes take similar parameters, notably a name and the optional title keyword argument
hyper_print = HyperPrintCube()
ofs = SinkCube()
ofs.modify_parameter(ofs.data_out, promoted_name="ofs")

job.add_cubes(ifs, hyper_print, ofs)

ifs.success.connect(hyper_print.intake)
hyper_print.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()

Note

Support for using HyperCubes in the Orion UI is a work in progress, and not yet supported.

Multistage WorkFloes

The execution of a WorkFloe can be separated into stages, where each stage runs to completion before the next begins. This is useful in cases where data needs to be carried forward as a single unit instead of being streamed. Propagation of data is accomplished by connecting output parameters of one stage to input parameters of a subsequent stage. A stage may be any valid WorkFloe that does not have multiple stages itself.

Note

Support for running multistage WorkFloes in Orion is a work in progress, and not yet supported.

The following example demonstrates how to define and connect stages in a WorkFloe.

import os
import sys
import tempfile

from floe.api import WorkFloe, SourceCube, SinkCube
from floe.tests.test_cubes import TTLSetter, TTLPrinter


# Define the multistage workfloe
job = WorkFloe("multistage workfloe")


# Stage 1
stage1 = WorkFloe("stage1")
ifs = SourceCube()
ifs.modify_parameter(ifs.data_in, promoted_name="ifs")
ttl = TTLSetter()
ofs = SinkCube()
ofs.modify_parameter(ofs.data_out, promoted_name="ofs")
stage1.add_cubes(ifs, ttl, ofs)
ifs.success.connect(ttl.intake)
ttl.success.connect(ofs.intake)


# Stage 2
stage2 = WorkFloe("stage2")
ifs2 = SourceCube()
ofs2 = SinkCube()
ofs2.modify_parameter(ofs2.data_out, promoted_name="ofs2")
ifs3 = SourceCube()
ttl_printer = TTLPrinter()
stage2.add_cubes(ifs2, ofs2, ttl_printer, ifs3)
ifs2.success.connect(ofs2.intake)
ifs3.success.connect(ttl_printer.intake)


# Add stages
job.add_stage(stage1)
job.add_stage(stage2)


# Connect parameters across stages
ofs.connect_parameter("ofs", ifs2, "data_in")
ofs.connect_parameter("ofs", ifs3, "data_in")


if __name__ == "__main__":
    job.run()

Cyclic WorkFloes

Many problems have solutions which are cyclic by nature, such as optimization. Floe comes with support for running WorkFloes containing cycles, with some caveats. Cycles should be used judiciously, as they can easily lead to deadlock or livelock. Cyclic CubeGroups are supported locally and in orion with the only restriction being that a CubeGroup may only be part of one cycle at a time.

When designing cycles in Floe, keep in mind:

  • Although a WorkFloe may be defined by an arbitrary number of cubes, the number of cubes in a single cycle is limited to the number of CPUs available for scheduling, as all cubes in a cycle must run concurrently.

  • There should always be at least one tested data path out of the cycle.

  • Floe does not provide a mechanism for a cycle invariant. If a you construct an infinite loop, it will run forever. See the previous bullet.

  • Many different methods exist for how to exit a cycle, usage will depend on the scenario. If the cycle should be exited from a parallel cube, make sure to store the determining information for exit on each piece of data rather than the cube itself because parallel cube attributes are not synced among each instance.

The following example demonstrates one method to exit a cycle using a Time To Live (TTL) field stored on each piece of data.

With cube definitions:

from floe.api import SourceCube, ComputeCube, ParallelMixin, JsonInputPort, JsonOutputPort
from floe.api.parameters import IntegerParameter

class GenerateJSON(SourceCube):
    output = JsonOutputPort("output")

    def __iter__(self):
        for i in range(10):
            yield {"data": "Dummy data", "id": i}

class JSONBase(ComputeCube):
    intake = JsonInputPort("intake")
    output = JsonOutputPort("output")


class JSONPrintPassThrough(JSONBase):
    def process(self, data, port):
        print(self, data)
        self.output.emit(data)


class JSONTTLSetter(JSONBase):
    ttl = IntegerParameter(
        "ttl", default=2, help_text="The initial ttl value", title="Time to live"
    )

    def process(self, data, port):
        data["ttl"] = self.args.ttl
        self.emit(data)


class ParallelJSONTTLRouter(ParallelMixin, JSONBase):
    ttl_threshold = IntegerParameter(
        "ttl_threshold",
        default=0,
        help_text="Threshold for breaking a cycle",
        title="Time to live Threshold",
    )
    cycle = JsonOutputPort("cycle")

    def process(self, data, port):
        ttl = data.get("ttl", self.args.ttl_threshold)
        if ttl <= self.args.ttl_threshold:
            self.output.emit(data)
        else:
            data["ttl"] = ttl - 1
            self.cycle.emit(data)

Each piece of data will cycle three times before exiting:

from floe.api import WorkFloe
from sample_cubes import GenerateJSON, JSONTTLSetter, JSONPrintPassThrough, ParallelJSONTTLRouter

# c denotes cyclic output, e denotes ending output, P denotes parallel cube
# Number below each cube marks how many times each piece of data will visit
# ┌──┐  ┌──┐    ┌──┐  ┌──┐      ┌──┐
# │ A├─►│ B├───►│ C├─►│DP├─┬─e─►│ F│
# │1x│  │1x│ ▲  │4x│  │4x│ │    │1x│
# └──┘  └──┘ │  └──┘  └──┘ │    └──┘
#            │             c
#            │             │
#            │     ┌──┐    │
#            └─────┤ E│◄───┘
#                  │3x│
#                  └──┘
floe = WorkFloe("Cyclic Floe")

cubeA = GenerateJSON("Cube A")
cubeB = JSONTTLSetter("Cube B")
cubeC = JSONPrintPassThrough("Cube C")
cubeD = ParallelJSONTTLRouter("Cube D")
cubeE = JSONPrintPassThrough("Cube E")
cubeF = JSONPrintPassThrough("Cube F")

# Add cubes and modify parameters
floe.add_cubes(cubeA, cubeB, cubeC, cubeD, cubeE, cubeF)
# Modifying the ttl parameter determines how many iterations of the cycle will occur
cubeB.modify_parameter(cubeB.ttl, default=3)

# Connect cubes
cubeA.output.connect(cubeB.intake)
cubeB.output.connect(cubeC.intake)
cubeC.output.connect(cubeD.intake)
cubeD.cycle.connect(cubeE.intake)
cubeE.output.connect(cubeC.intake)
cubeD.output.connect(cubeF.intake)

if __name__ == "__main__":
    floe.run()