Floe allows people to write reusable flow-based programs, called WorkFloes. A
WorkFloe is defined by a directed graph where each graph vertex is called a cube. Each
cube may have one or more ports which are used for connecting cubes together. When a
WorkFloe is executed, cubes are executed concurrently, and data is streamed between the cubes.
Technical details such as cube execution order, cube launching, networking, and buffering are handled automatically. The following documentation demonstrates how to get started
writing WorkFloes in Python.
WorkFloes consist of connected cubes, where cubes are typed based on their role in the WorkFloe. Source cubes put data into the WorkFloe, sink cubes store data externally, and compute cubes operate on discrete data on a stream within the WorkFloe. The individual pieces of data on a stream are called work items. Work items may be anything that can be serialized to bytes. See serialization.
WorkFloe is defined by a graph, rather than code, a
WorkFloe can be expressed using only its graph. The graph
itself can be encoded into structured JSON for portability. WorkFloes can also be written using the Python Floe API, which is discussed in
WorkFloe in Python¶
To begin, we create a new
from floe.api import WorkFloe my_floe = WorkFloe("my_floe")
WorkFloe constructor takes one required argument, the name of the
WorkFloe. The name should consist of
letters, numbers, and the character “_” (in other words, the name should be a valid Python variable identifier).
You may also provide a human-readable title and description:
my_floe = WorkFloe("my_floe", title="An example WorkFloe", description="...a multiline description may be used here")
Adding cubes to a WorkFloe¶
WorkFloe requires cubes to actually do something. Cubes can be added to the
WorkFloe as shown in this example:
from floe.api import SourceCube, SinkCube ifs = SourceCube() ofs = SinkCube() my_floe.add_cubes(ifs, ofs)
Connecting cubes together¶
Connections are made between cubes using the
connect() method of each cube’s port. Connections must always be made from
the source cube to the destination cube.
Building on the previous example, at this point,
my_floe consists of two cubes;
ofs. However, there are no connections yet.
Connections can be made using the source and destination ports:
Cubes must always be added to an instance of a
WorkFloe before connections can be made.
my_floe defines a valid WorkFloe.
Running a WorkFloe¶
The Floe Python API allows WorkFloes to be executed within the Orion platform, and on developer’s own machines. A WorkFloe
can be executed from Python code, or JSON. For development, it is convenient to run a
WorkFloe from Python as shown in the following example.
# example_floe.py from my_cubes import OE2DPSACube from floe.api import WorkFloe, SourceCube, SinkCube job = WorkFloe("Calcluate 2D PSA Floe") ifs = SourceCube() psa = OE2DPSACube(title="PSA Calculator") ofs = SinkCube() job.add_cubes(ifs, psa, ofs) ifs.success.connect(psa.intake) psa.success.connect(ofs.intake) if __name__ == "__main__": job.run()
example.py can be run as a Python script:
$ python example_floe.py
You must always include the
if __name__ == "__main__" statement as a condition to call
run(), or Floe will not be able to import your Floe
without running it.
Serializing a WorkFloe¶
WorkFloe can be serialized to JSON using the Floe Python API.
An example WorkFloe:
from floe.api import WorkFloe, SourceCube, SinkCube my_floe = WorkFloe("my_floe") ifs = SourceCube() ofs = SinkCube() my_floe.add_cubes(ifs, ofs)
Serializing to JSON¶
The example above can be converted into a Python dictionary suitable for serialization:
serialized = my_floe.json()
WorkFloe can be reconstructed from JSON back into a
my_floe = WorkFloe.from_json(serialized)
All files containing cubes must be in your PYTHONPATH, otherwise they cannot be imported.
Cube Execution Order¶
The order of execution of cubes in a
WorkFloe is determined automatically by examining the graph that defines it. Using the Floe
Python API to run WorkFloes (outside of Orion) will, by default, use all available CPUs. One CPU is reserved for coordination, as well as one additional CPU
per parallel cube for work tracking, and the remaining CPUs are used to run cubes.
An implementation of a FBP (Flow Based Programming) system can be designed to operate on streams of data in two ways. The first way is process centric, where communicating sequential processes operate on discrete items as they arrive on a stream and forward the mutated item downstream. The second way is data centric, where a sequence of computations are performed by a single process on each item in the stream, one at a time. For example, suppose two computations are to be performed, A followed by B. In a process centric implementation, A and B would be executed concurrently by two separate processes. A would perform its computation on each item in the stream and then transmit the item to B (possibly over a network connection). Note that this type of system requires that items be serialized to a byte representation by A, then deserialized by B. In a data centric implementation, a single process would execute A, then execute B on each item in the stream, without the need for serialization. In summary, a process centric implementation has the benefit of concurrency, but a data centric implementation doesn’t incur the cost of serialization. Floe allows for both modes of execution at the same time.
By default, each cube is executed in its own process, and items are transmitted between cubes using a network connection.
However, the time required for serialization between cubes can often be longer than the cubes themselves. In such cases,
efficiency can be increased by grouping sequential cubes together to be executed by a single process, without serialization,
in a data centric way. Groups can be used in a
WorkFloe just like any other cube, and can be connected to other cubes or groups.
Cubes can be added to a
CubeGroup by passing a list of cubes into the constructor, and then adding the group
to the WorkFloe:
# example_group_floe.py from my_cubes import PrinterCube from floe.api import WorkFloe, SourceCube, SinkCube, CubeGroup job = WorkFloe("Cube Group Example") ifs = SourceCube() printer = PrinterCube() ofs = SinkCube() # Add cubes to the WorkFloe job.add_cubes(ifs, printer, ofs) # Create a cube group group = CubeGroup(cubes=[ifs, printer, ofs]) # Add the group to the WorkFloe job.add_group(group) ifs.success.connect(printer.intake) printer.success.connect(ofs.intake) if __name__ == "__main__": job.run()
Parallel Cube Groups¶
Parallel cubes can be grouped together just like regular cubes. A parallel cube group behaves just like a parallel cube, but can run multiple parallel cubes at once.
Parallel cubes may be added to parallel cube groups by passing the list of cubes to an constructor and adding the group to the WorkFloe:
# example_parallel_group_floe.py from my_cubes import ParallelPassThrough, ParallelPrinterCube from floe.api import WorkFloe, SourceCube, SinkCube, ParallelCubeGroup job = WorkFloe("parallel_group_example") ifs = SourceCube() passthrough = ParallelPassThrough() printer = ParallelPrinterCube() ofs = SinkCube() # Create a cube group group = ParallelCubeGroup(cubes=[passthrough, printer]) # Add the group to the WorkFloe job.add_group(group) # Add cubes to the WorkFloe job.add_cubes(ifs, passthrough, printer, ofs) ifs.success.connect(passthrough.intake) passthrough.success.connect(printer.intake) printer.success.connect(ofs.intake) if __name__ == "__main__": job.run()
It is often useful to encapsulate a set of connected cubes (a partial or complete WorkFloe) as a cube itself. This type of cube is known as a HyperCube.
HyperCubes are a wrapper around a partial
WorkFloe that can be treated as a cube within another WorkFloe. HyperCubes are useful to create a collection of cubes
that are repeatedly used in a particular configuration. Note that HyperCubes are distinct from CubeGroups. Whereas CubeGroups execute a set of cubes in a single process,
a HyperCube does not. A
WorkFloe consisting of HyperCubes will execute identically to a
WorkFloe consisting of all of the cubes within those HyperCubes (assuming they are connected identically).
The following example demonstrates how to define a partial
WorkFloe and promote ports to act as the HyperCubes ports.
from floe.api import WorkFloe, HyperCube from floe.tests.test_cubes import PrintPassThroughCube # Create WorkFloe job = WorkFloe(name="Example HyperCube") print_cube_1 = PrintPassThroughCube() print_cube_2 = PrintPassThroughCube() print_cube_3 = PrintPassThroughCube() job.add_cubes(print_cube_1, print_cube_2, print_cube_3) # Connect Cubes within WorkFloe print_cube_1.success.connect(print_cube_2.intake) print_cube_2.success.connect(print_cube_3.intake) # Wrap ``WorkFloe`` in a HyperCube HyperPrintCube = HyperCube(job) # Promote Ports to act as the inputs and outputs of the HyperCube HyperPrintCube.promote_port(print_cube_1, "intake") # Able to add a promoted name which will be used in place of the original name HyperPrintCube.promote_port(print_cube_3, "success", promoted_name="pass_success")
The following example demonstrates how to use a HyperCube in a WorkFloe
from floe.api import WorkFloe, SourceCube, SinkCube # Import the variable that is defined as the Workfloe wrapped in a HyperCube from example.hyper_pass import HyperPrintCube # Create WorkFloe job = WorkFloe(name="Test HyperCube") ifs = SourceCube() pass_through_cube = HyperPrintCube() ofs = SinkCube() job.add_cubes(ifs, pass_through_cube, ofs) ifs.success.connect(pass_through_cube.intake) # Use the promoted name of the success port pass_through_cube.pass_success.connect(ofs.intake) if __name__ == "__main__": job.run()
HyperCubes are evaluated at run time, so you must declare a variable to represent your HyperCube.
In the above example this is done with
HyperPrintCube and should be imported as such
HyperCubes can be treated as almost identical to cubes. You define them with a
name and an optional keyword
title. Once instantiated, HyperCubes are added to a
WorkFloe in the same way that Cubes are.
from floe.api import WorkFloe, SourceCube, SinkCube from floe.tests.hyper_cubes.hyper_print_cube import HyperPrintCube job = WorkFloe("Simple HyperCube Floe") ifs = SourceCube() ifs.modify_parameter(ifs.data_in, promoted_name="ifs") # HyperCubes take similar parameters, notably a name and the optional title keyword argument hyper_print = HyperPrintCube() ofs = SinkCube() ofs.modify_parameter(ofs.data_out, promoted_name="ofs") job.add_cubes(ifs, hyper_print, ofs) ifs.success.connect(hyper_print.intake) hyper_print.success.connect(ofs.intake) if __name__ == "__main__": job.run()
Support for using HyperCubes in the Orion UI is a work in progress, and not yet supported.
The execution of a
WorkFloe can be separated into stages, where each stage runs to completion before the next begins. This is useful in cases
where data needs to be carried forward as a single unit instead of being streamed. Propagation of data is accomplished by connecting output
parameters of one stage to input parameters of a subsequent stage. A stage may be any valid
WorkFloe that does not have multiple stages itself.
Support for running multi-stage WorkFloes in Orion is a work in progress, and not yet supported.
The following example demonstrates how to define and connect stages in a WorkFloe.
import os import sys import tempfile from floe.api import WorkFloe, SourceCube, SinkCube from floe.tests.test_cubes import TTLSetter, TTLPrinter # Define the multi-stage workfloe job = WorkFloe("multi stage workfloe") # Stage 1 stage1 = WorkFloe("stage1") ifs = SourceCube() ifs.modify_parameter(ifs.data_in, promoted_name="ifs") ttl = TTLSetter() ofs = SinkCube() ofs.modify_parameter(ofs.data_out, promoted_name="ofs") stage1.add_cubes(ifs, ttl, ofs) ifs.success.connect(ttl.intake) ttl.success.connect(ofs.intake) # Stage 2 stage2 = WorkFloe("stage2") ifs2 = SourceCube() ofs2 = SinkCube() ofs2.modify_parameter(ofs2.data_out, promoted_name="ofs2") ifs3 = SourceCube() ttl_printer = TTLPrinter() stage2.add_cubes(ifs2, ofs2, ttl_printer, ifs3) ifs2.success.connect(ofs2.intake) ifs3.success.connect(ttl_printer.intake) # Add stages job.add_stage(stage1) job.add_stage(stage2) # Connect parameters across stages ofs.connect_parameter("ofs", ifs2, "data_in") ofs.connect_parameter("ofs", ifs3, "data_in") if __name__ == "__main__": job.run()
Many problems have solutions which are cyclic by nature, such as optimization. Floe comes with support for running WorkFloes containing cycles, with some caveats. Cycles should be used judiciously, as they can easily lead to deadlock or livelock.
When using cycles in Floe, the following caveats apply:
WorkFloemay be defined by an arbitrary number of cubes, the number of cubes in a single cycle is limited to the number of CPUs available for scheduling, as all cubes in a cycle must run concurrently.
The use of CubeGroups in cycles is not yet supported.
There should always be at least one tested data path out of the cycle.
Floe does not provide a mechanism for a cycle invariant. If a you construct an infinite loop, it will run forever. See the previous bullet.