WorkFloes

Overview

Floe allows people to write reusable flow-based programs, called WorkFloes. A WorkFloe is defined by a directed graph where each graph vertex is called a cube. Each cube may have one or more ports which are used for connecting cubes together. When a WorkFloe is executed, cubes are executed concurrently, and data is streamed between the cubes. Technical details such as cube execution order, cube launching, networking, and buffering are handled automatically. The following documentation demonstrates how to get started writing WorkFloes in Python.

WorkFloes consist of connected cubes, where cubes are typed based on their role in the WorkFloe. Source cubes put data into the WorkFloe, sink cubes store data externally, and compute cubes operate on discrete items as they stream through the WorkFloe. The individual pieces of data on a stream are called work items. Work items may be anything that can be serialized to bytes. See serialization.

Because a WorkFloe is defined by a graph rather than by code, it can be expressed using only that graph. The graph itself can be encoded as structured JSON for portability. WorkFloes can also be written using the Python Floe API, which is discussed in this document.

Writing a WorkFloe in Python

To begin, we create a new WorkFloe object:

from floe.api import WorkFloe

my_floe = WorkFloe("my_floe")

The WorkFloe constructor takes one required argument, the name of the WorkFloe. The name should consist of letters, numbers, and the character “_” (in other words, the name should be a valid Python variable identifier).

You may also provide a human-readable title and description:

my_floe = WorkFloe("my_floe", title="An example WorkFloe", description="...a multiline description may be used here")

Adding cubes to a WorkFloe

A WorkFloe requires cubes to actually do something. Cubes can be added to the WorkFloe as shown in this example:

from floe.api import SourceCube, SinkCube
ifs = SourceCube()
ofs = SinkCube()
my_floe.add_cubes(ifs, ofs)

Connecting cubes together

Connections are made between cubes using the connect() method of each cube’s port. Connections must always be made from the source cube to the destination cube.

Building on the previous example, my_floe now consists of two cubes: ifs and ofs. However, no connections have been made yet.

Connections can be made using the source and destination ports:

ifs.success.connect(ofs.intake)

Note

Cubes must always be added to an instance of a WorkFloe before connections can be made.

Now my_floe defines a valid WorkFloe.

Running a WorkFloe

The Floe Python API allows WorkFloes to be executed within the Orion platform and on developers' own machines. A WorkFloe can be executed from Python code or from JSON. For development, it is convenient to run a WorkFloe from Python, as shown in the following example.

Example:

# example_floe.py
from my_cubes import OE2DPSACube
from floe.api import WorkFloe, SourceCube, SinkCube

job = WorkFloe("Calcluate 2D PSA Floe")
ifs = SourceCube()
psa = OE2DPSACube(title="PSA Calculator")
ofs = SinkCube()

job.add_cubes(ifs, psa, ofs)

ifs.success.connect(psa.intake)
psa.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()

Now example_floe.py can be run as a Python script:

$ python example_floe.py

Note

You must always guard the call to run() with the if __name__ == "__main__" condition; otherwise Floe will not be able to import your WorkFloe without running it.

Serializing a WorkFloe

A WorkFloe can be serialized to JSON using the Floe Python API.

An example WorkFloe:

from floe.api import WorkFloe, SourceCube, SinkCube
my_floe = WorkFloe("my_floe")
ifs = SourceCube()
ofs = SinkCube()
my_floe.add_cubes(ifs, ofs)

Serializing to JSON

The example above can be serialized to JSON using the json() method:

serialized = my_floe.json()

The entire WorkFloe can be reconstructed from JSON back into a WorkFloe object:

my_floe = WorkFloe.from_json(serialized)
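
For example, the serialized form can be written to disk and read back later. This is a minimal sketch; it assumes json() returns a JSON string (if it returns a dictionary instead, use the standard json module to dump and load it):

# Round trip a WorkFloe through a file.
# Minimal sketch; assumes json() returns a JSON string.
with open("my_floe.json", "w") as f:
    f.write(my_floe.json())

with open("my_floe.json") as f:
    my_floe = WorkFloe.from_json(f.read())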

Note

All files containing cubes must be on your PYTHONPATH; otherwise they cannot be imported.

Cube Execution Order

The order of execution of cubes in a WorkFloe is determined automatically by examining the graph that defines it. When the Floe Python API is used to run WorkFloes (outside of Orion), all available CPUs are used by default: one CPU is reserved for coordination, one additional CPU per parallel cube is reserved for work tracking, and the remaining CPUs are used to run cubes.
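
As a worked example, the accounting below uses hypothetical numbers; it is not part of the Floe API:

# Hypothetical CPU accounting for a 16-CPU machine running a WorkFloe
# with three parallel cubes (illustrative only, not a Floe API call):
total_cpus = 16
parallel_cubes = 3
coordination = 1                                  # reserved for coordination
tracking = parallel_cubes                         # one per parallel cube
cube_cpus = total_cpus - coordination - tracking  # 12 CPUs left to run cubes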

Cube Groups

An implementation of a FBP (Flow Based Programming) system can be designed to operate on streams of data in two ways. The first way is process centric, where communicating sequential processes operate on discrete items as they arrive on a stream and forward the mutated item downstream. The second way is data centric, where a sequence of computations is performed by a single process on each item in the stream, one at a time.

For example, suppose two computations are to be performed, A followed by B. In a process centric implementation, A and B would be executed concurrently by two separate processes. A would perform its computation on each item in the stream and then transmit the item to B (possibly over a network connection). Note that this type of system requires that items be serialized to a byte representation by A, then deserialized by B. In a data centric implementation, a single process would execute A, then execute B on each item in the stream, without the need for serialization. In summary, a process centric implementation has the benefit of concurrency, but a data centric implementation does not incur the cost of serialization. Floe allows for both modes of execution at the same time.
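
The trade-off can be illustrated outside of Floe with plain Python. The sketch below is illustrative only (it uses the standard library, not the Floe API): the process centric version pays pickle serialization for every item crossing the process boundary, while the data centric version applies both computations in one process with no serialization.

# Illustrative only (standard library, not the Floe API): contrast the two
# execution styles for computations A and B over a stream of items.
import pickle
from multiprocessing import Pipe, Process

def a(item):
    return item + 1

def b(item):
    return item * 2

# Process centric: A and B run concurrently in separate processes; every item
# must be serialized by A and deserialized by B as it crosses the boundary.
def run_a(conn, items):
    for item in items:
        conn.send_bytes(pickle.dumps(a(item)))
    conn.close()

def run_b(conn, count):
    print("process centric:", [b(pickle.loads(conn.recv_bytes())) for _ in range(count)])

if __name__ == "__main__":
    items = [1, 2, 3]
    receiver, sender = Pipe(duplex=False)
    pa = Process(target=run_a, args=(sender, items))
    pb = Process(target=run_b, args=(receiver, len(items)))
    pa.start(); pb.start()
    pa.join(); pb.join()

    # Data centric: one process applies A then B to each item, no serialization.
    print("data centric:", [b(a(item)) for item in items])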

By default, each cube is executed in its own process, and items are transmitted between cubes over a network connection. However, the time required to serialize items between cubes can often exceed the time the cubes spend on computation. In such cases, efficiency can be increased by grouping sequential cubes together to be executed by a single process, without serialization, in a data centric way. Groups can be used in a WorkFloe just like any other cube and can be connected to other cubes or groups.

Cubes can be added to a CubeGroup by passing a list of cubes into the constructor, and then adding the group to the WorkFloe:

# example_group_floe.py
from my_cubes import PrinterCube
from floe.api import WorkFloe, SourceCube, SinkCube, CubeGroup

job = WorkFloe("Cube Group Example")
ifs = SourceCube()
printer = PrinterCube()
ofs = SinkCube()

# Add cubes to the WorkFloe
job.add_cubes(ifs, printer, ofs)

# Create a cube group
group = CubeGroup(cubes=[ifs, printer, ofs])

# Add the group to the WorkFloe
job.add_group(group)

ifs.success.connect(printer.intake)
printer.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()

Parallel Cube Groups

Parallel cubes can be grouped together just like regular cubes. A parallel cube group behaves just like a parallel cube, but can run multiple parallel cubes at once.

Parallel cubes may be added to a parallel cube group by passing the list of cubes to the constructor and adding the group to the WorkFloe:

# example_parallel_group_floe.py
from my_cubes import ParallelPassThrough, ParallelPrinterCube
from floe.api import WorkFloe, SourceCube, SinkCube, ParallelCubeGroup

job = WorkFloe("parallel_group_example")
ifs = SourceCube()
passthrough = ParallelPassThrough()
printer = ParallelPrinterCube()
ofs = SinkCube()

# Create a cube group
group = ParallelCubeGroup(cubes=[passthrough, printer])

# Add the group to the WorkFloe
job.add_group(group)

# Add cubes to the WorkFloe
job.add_cubes(ifs, passthrough, printer, ofs)

ifs.success.connect(passthrough.intake)
passthrough.success.connect(printer.intake)
printer.success.connect(ofs.intake)


if __name__ == "__main__":
    job.run()

Nested WorkFloes

It is often useful to encapsulate a set of connected cubes (a partial or complete WorkFloe) as a cube itself. This type of cube is known as a HyperCube.

Defining HyperCubes

A HyperCube is a wrapper around a partial WorkFloe that can be treated as a cube within another WorkFloe. HyperCubes are useful for packaging a collection of cubes that is repeatedly used in a particular configuration. Note that HyperCubes are distinct from CubeGroups: whereas a CubeGroup executes a set of cubes in a single process, a HyperCube does not. A WorkFloe consisting of HyperCubes will execute identically to a WorkFloe consisting of all of the cubes within those HyperCubes (assuming they are connected identically).

The following example demonstrates how to define a partial WorkFloe and promote ports to act as the HyperCube's ports:

from floe.api import WorkFloe, HyperCube
from floe.tests.test_cubes import PrintPassThroughCube

# Create WorkFloe
job = WorkFloe(name="Example HyperCube")

print_cube_1 = PrintPassThroughCube()
print_cube_2 = PrintPassThroughCube()
print_cube_3 = PrintPassThroughCube()

job.add_cubes(print_cube_1, print_cube_2, print_cube_3)

# Connect Cubes within WorkFloe
print_cube_1.success.connect(print_cube_2.intake)
print_cube_2.success.connect(print_cube_3.intake)

# Wrap the WorkFloe in a HyperCube
HyperPrintCube = HyperCube(job)

# Promote Ports to act as the inputs and outputs of the HyperCube
HyperPrintCube.promote_port(print_cube_1, "intake")
# A promoted name may be given, to be used in place of the original name
HyperPrintCube.promote_port(print_cube_3, "success", promoted_name="pass_success")

The following example demonstrates how to use a HyperCube in a WorkFloe:

from floe.api import WorkFloe, SourceCube, SinkCube
# Import the variable that holds the WorkFloe wrapped in a HyperCube
from example.hyper_pass import HyperPrintCube

# Create WorkFloe
job = WorkFloe(name="Test HyperCube")

ifs = SourceCube()
pass_through_cube = HyperPrintCube()
ofs = SinkCube()

job.add_cubes(ifs, pass_through_cube, ofs)

ifs.success.connect(pass_through_cube.intake)
# Use the promoted name of the success port
pass_through_cube.pass_success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()

Note

HyperCubes are evaluated at run time, so you must assign your HyperCube to a variable. In the example above this is done with HyperPrintCube, which is then imported by that name.

Using HyperCubes

HyperCubes can be treated almost identically to cubes. They are defined with a name and an optional title keyword argument. Once instantiated, HyperCubes are added to a WorkFloe in the same way that cubes are.

from floe.api import WorkFloe, SourceCube, SinkCube

from floe.tests.hyper_cubes.hyper_print_cube import HyperPrintCube

job = WorkFloe("Simple HyperCube Floe")

ifs = SourceCube()
ifs.modify_parameter(ifs.data_in, promoted_name="ifs")
# HyperCubes take similar parameters, notably a name and the optional title keyword argument
hyper_print = HyperPrintCube()
ofs = SinkCube()
ofs.modify_parameter(ofs.data_out, promoted_name="ofs")

job.add_cubes(ifs, hyper_print, ofs)

ifs.success.connect(hyper_print.intake)
hyper_print.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()

Note

Using HyperCubes in the Orion UI is a work in progress and is not yet supported.

Multi-stage WorkFloes

The execution of a WorkFloe can be separated into stages, where each stage runs to completion before the next begins. This is useful in cases where data needs to be carried forward as a single unit instead of being streamed. Propagation of data is accomplished by connecting output parameters of one stage to input parameters of a subsequent stage. A stage may be any valid WorkFloe that does not have multiple stages itself.

Note

Running multi-stage WorkFloes in Orion is a work in progress and is not yet supported.

The following example demonstrates how to define and connect stages in a WorkFloe.

from floe.api import WorkFloe, SourceCube, SinkCube
from floe.tests.test_cubes import TTLSetter, TTLPrinter


# Define the multi-stage workfloe
job = WorkFloe("multi stage workfloe")


# Stage 1
stage1 = WorkFloe("stage1")
ifs = SourceCube()
ifs.modify_parameter(ifs.data_in, promoted_name="ifs")
ttl = TTLSetter()
ofs = SinkCube()
ofs.modify_parameter(ofs.data_out, promoted_name="ofs")
stage1.add_cubes(ifs, ttl, ofs)
ifs.success.connect(ttl.intake)
ttl.success.connect(ofs.intake)


# Stage 2
stage2 = WorkFloe("stage2")
ifs2 = SourceCube()
ofs2 = SinkCube()
ofs2.modify_parameter(ofs2.data_out, promoted_name="ofs2")
ifs3 = SourceCube()
ttl_printer = TTLPrinter()
stage2.add_cubes(ifs2, ofs2, ttl_printer, ifs3)
ifs2.success.connect(ofs2.intake)
ifs3.success.connect(ttl_printer.intake)


# Add stages
job.add_stage(stage1)
job.add_stage(stage2)


# Connect parameters across stages
ofs.connect_parameter("ofs", ifs2, "data_in")
ofs.connect_parameter("ofs", ifs3, "data_in")


if __name__ == "__main__":
    job.run()

Cyclic WorkFloes

Many problems have solutions which are cyclic by nature, such as optimization. Floe comes with support for running WorkFloes containing cycles, with some caveats. Cycles should be used judiciously, as they can easily lead to deadlock or livelock.

When using cycles in Floe, the following caveats apply (a sketch of a simple cyclic WorkFloe follows the list):

  • Although a WorkFloe may be defined by an arbitrary number of cubes, the number of cubes in a single cycle is limited to the number of CPUs available for scheduling, as all cubes in a cycle must run concurrently.

  • The use of CubeGroups in cycles is not yet supported.

  • There should always be at least one tested data path out of the cycle.

  • Floe does not provide a mechanism for a cycle invariant. If you construct an infinite loop, it will run forever. See the previous bullet.
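
The following is a minimal sketch of a cyclic WorkFloe. RefinementCube and its cycle port are hypothetical: the cube is assumed to be user-defined (imported from my_cubes, following the earlier examples), emitting items that need further work on a cycle port and finished items on its success port, which provides the tested data path out of the cycle.

# example_cyclic_floe.py
# A hypothetical sketch: RefinementCube and its "cycle" port are assumptions,
# not part of the Floe API.
from my_cubes import RefinementCube
from floe.api import WorkFloe, SourceCube, SinkCube

job = WorkFloe("cyclic_example")
ifs = SourceCube()
refine = RefinementCube()
ofs = SinkCube()

job.add_cubes(ifs, refine, ofs)

ifs.success.connect(refine.intake)
# Feed items that need more work back into the cube, forming the cycle
refine.cycle.connect(refine.intake)
# Finished items leave the cycle: the tested data path out
refine.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()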