Cubes
Cubes are the fundamental building blocks of WorkFloes. Cubes are defined by both their source code and their metadata. A cube should be a small piece of reusable logic that makes few assumptions about which cubes are upstream or downstream of it.
Cube Types
There are five fundamental cube types in Floe:
- Source cubes: Cubes that put data into a WorkFloe, which inherit from SourceCube.
- Compute cubes: Cubes that perform computation on a stream of data, which inherit from ComputeCube.
- Parallel compute cubes: A special case of compute cubes that perform computation on a stream in parallel, which inherit from ParallelMixin.
- Sink cubes: Cubes that take a stream of data from a WorkFloe and send it to an external resource, which inherit from SinkCube.
- HyperCubes: Encapsulate a set of connected cubes and connect them within a WorkFloe with the same API as a single cube.
Cube Methods
The behavior and logic of a cube are customized by implementing methods on the cube’s class. The following methods may be implemented by cube authors for any cube:
Cube.begin()
The begin() method is called for cube initialization, before process() is ever called, and is provided to allow for customized cube setup. This method is only called when a WorkFloe is being run, and by the time begin() is called, data may be emitted.
Caution
begin() should be used in place of overriding __init__, which can cause unexpected cube failures.
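A minimal sketch of the recommended pattern, where get_resource stands in for any hypothetical setup work:
from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class InitExampleCube(ComputeCube):
    intake = BinaryInputPort()
    success = BinaryOutputPort()

    def begin(self):
        # Safe: runs once the WorkFloe is running, before any process() call
        self.resource = get_resource()  # hypothetical setup helper

    def process(self, data, port):
        self.success.emit(self.resource.transform(data))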
Cube.process()
The process(self, data, port) method is called for each input item received. Computation and decision logic should be defined inside of this method.
Cube.end()
The end() method is called after all input has been consumed, and is provided so that cubes may perform any necessary cleanup or emit any accumulated data.
Additionally, special methods are provided for source and sink cubes:
- __iter__: A method on source cubes, which yields items to be sent downstream.
- write(data): A method on sink cubes, to which items from the WorkFloe are written.
Caution
For parallel cubes running in Orion, end() is called after processing a batch of work. A single batch may contain up to item_count items of work.
Note
Items may be emitted to output ports from within any of the above methods.
Emitting Items
In order to send work items downstream to other cubes, they must be emitted. Items can be emitted to a port by calling emit on the port itself. A common use case for multiple ports is for controlling the flow of data to different downstream cubes.
Caution
A single call to emit is limited to 4GB. Each call allocates memory on both the sender and receiver, so it is best to emit smaller chunks (typically 64MB) to avoid exhausting memory. Alternatively, consider storing large amounts of data in Orion and emitting only a reference to the data.
See also
Data handling in Orion, especially shard collections.
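As an illustration of the chunking advice above, a minimal sketch (the cube, port names, and chunk size are illustrative assumptions):
from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class ChunkingCube(ComputeCube):
    intake = BinaryInputPort()
    output = BinaryOutputPort()

    CHUNK = 64 * 1024 * 1024  # 64MB per emit, per the guidance above

    def process(self, data, port):
        # Emit a large bytes object in ~64MB chunks rather than one huge message
        for start in range(0, len(data), self.CHUNK):
            self.output.emit(data[start:start + self.CHUNK])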
A binary filter cube can be written in the following way:
from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class FilterCube(ComputeCube):
    success = BinaryOutputPort()
    failure = BinaryOutputPort()

    def process(self, item, port):
        if item_should_pass(item):
            # Send passing items downstream on the success port
            self.success.emit(item)
        else:
            # Send failed items downstream on the failure port
            self.failure.emit(item)
In some scenarios, it is useful to emit an item to all output ports (a broadcast). To accomplish this, call the emit method defined on the cube class:
from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class BroadCastCube(ComputeCube):
    first_output = BinaryOutputPort()
    second_output = BinaryOutputPort()

    def process(self, item, port):
        do_something(item)
        # Send item to all outputs
        self.emit(item)
Cube Parameters
Runtime behavior of cubes can be controlled with built-in and custom Parameters.
All cubes have a set of built-in parameters whose defaults are appropriate for general floes. The buffer_size parameter specifies the amount of data buffered by Floe before sending it downstream. The parameters in the Hardware Requirements group do not affect a locally running Floe, and are used to determine what type of machine the cube may use when run in Orion. Additional Parallel Cube Parameters are described below.
Writing Cubes
Cubes are defined in Python using classes. You can define a new cube by subclassing one of the built-in base classes. Floe will handle all of the network communication, deserialization, and scheduling so that cube authors only need to be concerned with the internal logic of their cubes.
For any stream connecting two cubes, the following guarantees are provided:
Any item emitted to a connected port will be delivered exactly once to the corresponding downstream port.
All items for a given stream arrive in order (excluding parallel cubes).
Writing a compute cube
A compute cube performs computation on a stream of data. The data may arrive on any input port defined for the cube, and can be emitted for downstream consumption to one or more output ports. To define a compute cube, create a class that subclasses ComputeCube and defines a process method. Floe provides the name of the input port and the deserialized data to the process method. The process method will be called for every piece of data from each input stream. From within the process method, data may be emitted to any output port.
Example:
from floe.api import ComputeCube, BinaryOutputPort, BinaryInputPort

class MyComputeCube(ComputeCube):
    title = "My Compute Cube"

    intake = BinaryInputPort()
    success = BinaryOutputPort()

    def begin(self):
        # Any cube setup may be done here
        pass

    def process(self, data, port):
        # Emit the data on the success port to the next cube
        self.success.emit(data)

    def end(self):
        # Any cube tear down may be done here
        pass
Writing a source cube
A source cube can be created by subclassing SourceCube, or subclassing a class that inherits from SourceCube, and overriding the __iter__ method. The __iter__ method must be a generator that yields items to be consumed downstream. All items yielded as part of the iterator will be emitted on all output ports on the cube.
Example:
from floe.api import SourceCube, BinaryOutputPort

class MyCustomSource(SourceCube):
    success = BinaryOutputPort()

    def __iter__(self):
        for data in my_custom_source():
            # equivalent to self.emit(data), which sends data to all output ports
            yield data
You can define source cubes that emit any data type, such as text:
from floe.api import SourceCube

class MyCustomTextSource(SourceCube):
    def __iter__(self):
        for data in my_custom_text:
            yield data
If further control is needed to specify which port to emit items on, it can be done within begin():
from floe.api import SourceCube
from floe.api.ports import JsonOutputPort

class SpecificPortSource(SourceCube):
    port_one = JsonOutputPort()
    port_two = JsonOutputPort()

    def begin(self):
        i = 0
        for data in my_custom_source():
            if i % 2 == 0:
                self.port_one.emit(data)
            else:
                self.port_two.emit(data)
            i += 1

    def __iter__(self):
        """Construct an empty iterator"""
        return iter([])
Writing a sink cube
A sink cube is a cube which writes items from the WorkFloe to an external resource. Examples include writing items to a file, storing them in a database, or sending them to a service. To define a sink cube, create a class that subclasses SinkCube and defines a write method, which takes as parameters the item to be written and the port on which it arrived.
Here is a conceptual example of a cube which writes items to a relational database:
from floe.api import SinkCube

class DatabaseSink(SinkCube):
    def begin(self):
        self.conn = get_db_connection()

    def write(self, data, port):
        self.conn.insert("...example sql here...")
Stdout, Stderr, and Logging
All output is captured and stored when cubes run within Orion. When run locally, output is only captured when specified by command line arguments. Any output captured will be prepended with the cube’s name.
Cubes at runtime have access to a log attribute on the class, which has debug, info, warning, and error methods.
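For example, a cube might log progress from within process() (a minimal sketch; the message format is arbitrary):
from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class LoggingCube(ComputeCube):
    intake = BinaryInputPort()
    success = BinaryOutputPort()

    def process(self, data, port):
        # Messages are captured and prepended with the cube's name
        self.log.info("received {} bytes on port {}".format(len(data), port))
        self.success.emit(data)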
Parallel Cubes
Floe is designed to enable large scale computation to be expressed succinctly and executed efficiently.
A compute cube manifests within a WorkFloe as a single process that sequentially processes the items of work in a data stream. If a compute cube can process items of work without considering the results of all previously processed data (that is, the computation is embarrassingly parallel), the cube can be modified such that Floe will dynamically spawn duplicate cube processes in proportion to the amount of available data, parallelizing data processing to achieve greater throughput.
Parallel compute cubes can be written by subclassing from a parallel base class:
from floe.api import ParallelComputeCube

class MyParallelCube(ParallelComputeCube):
    def process(self, data, port):
        do_computation(data)
The semantics of a parallel compute cube are identical to those of a regular compute cube, with the following exceptions:
- The order in which items are processed, and subsequently their transmission downstream, is undefined (as opposed to in-order delivery).
- The end() method will be called at least once (as opposed to exactly once).
- Because of how failed items of work are retried, emitting must wait until an item is fully processed.
- Because of how failed items of work are retried, writing to an external medium can result in duplicated data. See Collections.
The last two bullets are related, and have to do with the partial and duplicate operations that are inevitable when a failure occurs. A single item of work may cause several results to be emitted out ports, or data objects to be written, and the worker could fail in the middle of that process. So, in order to prevent partial and duplicate emissions, the parallel cube broker waits for an item of work to be fully processed before emitting the resulting items to output ports. This is also why shards must be closed downstream of a parallel cube.
Writing a Parallel Cube
Parallel cubes are special cases of compute cubes and behave similarly with some exceptions. Because the behavior is so similar, it is easy to write parallel cubes that have unexpected behavior or side effects.
Side effects and state in end():
- The end() method is still called for parallel cubes, but at least once rather than exactly once; in practice it is called many times. This means that any side effect of calling process() or end() needs to be idempotent.
- Parallel cubes cannot be used to hold global state, as the state would not actually be global (just local to some subset of the input).
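For instance, a parallel cube can accumulate results per batch and emit them from end(); a minimal sketch, assuming transform is a hypothetical per-item computation:
from floe.api import ComputeCube, ParallelMixin, BinaryInputPort, BinaryOutputPort

class BatchEmitCube(ParallelMixin, ComputeCube):
    intake = BinaryInputPort()
    output = BinaryOutputPort()

    def begin(self):
        # Per-batch state; begin()/end() bracket each batch of work
        self.results = []

    def process(self, data, port):
        self.results.append(transform(data))  # hypothetical per-item work

    def end(self):
        # Emit this batch's results, then clear the buffer so a repeated
        # end() call does not re-emit them
        for item in self.results:
            self.output.emit(item)
        self.results = []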
File I/O:
Parallel cubes are executed by worker processes distributed across many machines. Any files written by a parallel cube may or may not already have been written by another instance of that cube, and so race conditions may exist.
Making a Serial/Parallel Cube
Added in version 0.2.181.
It is often desirable to make parallel and serial cubes from a common base class. The example below shows how to do this using the ParallelMixin class.
from floe.api import ComputeCube, ParallelMixin

class CustomCube(ComputeCube):
    title = "My Custom Cube"

    def process(self, data, port):
        do_computation(data)

class MyCustomParallelCube(ParallelMixin, CustomCube):
    # Append to the serial cube's title
    title = CustomCube.title + " (Parallel)"
Parallel Cube Parameters
Parallel cubes have a set of parameters that determine how the parallelization of work occurs, and their values can have a large impact on performance. This section describes those parameters and how to choose reasonable default values for them when creating a parallel cube.
item_count
Cubes process items in a stream, one at a time, as discussed in the WorkFloes section. However, if items are sent over the network between cubes one at a time, the cost of that I/O can be greater than the cost of the calculation within the cube. The overall efficiency of the WorkFloe can be improved by sending items in batches. The item_count parameter specifies the maximum number of work items that should be sent at a time. As a rule of thumb, set item_count to a value such that each batch of work is processed in roughly 1 minute.
Note
A value for item_count that is too large can cause a minority of CPUs to receive a majority of work items, leading to a slower overall execution time.
Note
The value of item_count is a maximum. If workers request work faster than the internal work queues are refilled, messages will contain fewer than item_count items.
Example of overriding a parallel cube parameter:
from floe.api import ComputeCube, ParallelMixin

class MyParallelCube(ParallelMixin, ComputeCube):
    title = "My Parallel Cube"

    # Overrides the default value of the item_count parameter
    parameter_overrides = {
        "item_count": {"default": 10}
    }

    def process(self, data, port):
        do_computation(data)
max_failures
Added in version 0.2.181.
In Orion, if a work item in a parallel cube fails to process correctly, the work will automatically be retried. The number of times that the work will be retried, before calling the method process_failed on the cube, can be set through the max_failures parameter. By default, process_failed prints a log message, but otherwise skips the work. Cube authors may add custom behavior in their own process_failed method.
Caution
A work item that fails more than max_failures times will be skipped!
To address this issue and prevent data loss, a cube author can:
- Re-emit the work in a cycle or to a specialized cube for further processing.
- Re-emit to a serial cube that records failed work, or fail the job by raising an exception.
The example below demonstrates overriding the process_failed method to send work items to a specific port.
from floe.api import ComputeCube, ParallelMixin, BinaryOutputPort

class MyParallelCube(ParallelMixin, ComputeCube):
    title = "My Parallel Cube"

    failure_port = BinaryOutputPort()

    # Overrides the default values of these parameters
    parameter_overrides = {
        "item_count": {"default": 10},
        "max_failures": {"default": 2},
    }

    def process(self, data, port):
        do_computation(data)

    def process_failed(self, data, port, last_error):
        self.log.error("Failed to process item: {}".format(last_error))
        self.failure_port.emit(data)
Packaging Cubes
In order to use a cube, it must be importable so that Floe can load it for use in WorkFloes. To be importable, a cube must be defined inside of a module. A module is simply a file that contains Python code.
Below is an example cube module that contains two cubes:
# my_cubes.py
import random

from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class PrintDataCube(ComputeCube):
    """A cube that prints the raw bytes"""

    title = "Print Data"

    intake = BinaryInputPort()
    success = BinaryOutputPort()

    def process(self, data, port):
        print(data)
        self.emit(data)

class JitterCube(ComputeCube):
    """A cube that passes or fails data randomly"""

    title = "Jitter Cube"

    success = BinaryOutputPort()
    failure = BinaryOutputPort()

    def process(self, data, port):
        if random.randint(0, 5) < 3:
            self.success.emit(data)
        else:
            self.failure.emit(data)
Now these cubes can be imported and used in a WorkFloe, as demonstrated by the example below:
# example_floe.py
from my_cubes import PrintDataCube
from floe.api import SinkCube, WorkFloe, SourceCube

job = WorkFloe("Workfloe Example")

ifs = SourceCube()
printer = PrintDataCube()
ofs = SinkCube()

job.add_cubes(ifs, printer, ofs)

ifs.success.connect(printer.intake)
printer.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()
Cube Ports
Cubes have ports that are used for connecting cubes together. These ports are defined on the cube’s class:
from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class MyCube(ComputeCube):
    # A success port
    success = BinaryOutputPort(title="Successfully processed items")

    # A failure port
    failure = BinaryOutputPort(title="Items with errors")

    # An alternate input port
    alt_input = BinaryInputPort(title="A specific type of input")
Added in version 0.2.181: Support for port ‘titles’.
Custom Ports
Cubes may be concurrently run on separate computers that are connected by a network. The Floe Python API can be customized for serializing custom data types to a representation that is suitable for network transmission.
Correct serialization and deserialization is the responsibility of a Port. The Floe runtime will enforce that connections are only allowed between matching port types, with the expectation that the deserialized object returned from the port will be usable by the cube.
Writing a custom port
To customize how an object is serialized for a particular port, two methods must be defined on the port’s class:
- encode(data): Takes the input object and returns a byte representation of it.
- decode(data): Takes a sequence of bytes and reconstructs a valid object from it.
Here is an example which uses Python’s pickle module:
import pickle

class PickleEncoder:
    def encode(self, item):
        return pickle.dumps(item)

    def decode(self, data):
        return pickle.loads(data)
To use this custom encoder, define port classes that inherit from it as well as from InputPort and OutputPort. These newly defined ports will need to define PORT_TYPES as a tuple of string values. To connect ports within a WorkFloe, they must share one of the string values; otherwise the WorkFloe will be invalid.
from floe.api import InputPort, OutputPort

class MyInputPort(PickleEncoder, InputPort):
    # Define the types of ports these can connect to
    PORT_TYPES = ("pickle",)

class MyOutputPort(PickleEncoder, OutputPort):
    # Define the types of ports these can connect to
    PORT_TYPES = ("pickle",)
Initializer Ports
It is often useful to be able to send initialization data from an upstream cube to one or more downstream cubes, including parallel cubes. This can be achieved by designating a port as an initializer as shown in the example below. Like typical ports, initialization port types must match. Unlike typical ports, initialization ports imply that an upstream cube is finished before a downstream cube begins processing any regular work items. Also, instead of passing work items as an argument to a cube method, initializer data may be consumed only once from any cube method as a Python generator.
from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class ExampleInitializer(ComputeCube):
    title = "Example with initializer"

    intake = BinaryInputPort()
    output = BinaryOutputPort()

    # This port is only for initializer data
    setup = BinaryInputPort(initializer=True)

    def begin(self):
        self.count = 0
        for data in self.setup:
            print(data)
            self.count += 1

    def process(self, data, port):
        self.emit(data)

    def end(self):
        print(self.count)
Caution
It is possible to deadlock when sending data to a downstream initializer port. When an upstream cube’s total output on a port exceeds 2GB, it will block, waiting for output to be consumed by a downstream cube. The downstream cube also blocks while its initializer port waits for the upstream cube to finish. To avoid such deadlocks, the total output of an upstream cube on a port feeding an initializer should be kept under 2GB.
Note
Initializer ports cannot be used within a cycle, or a cube group.
Added in version 2018.2.2.
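To show how initializer data reaches the setup port, here is a wiring sketch that reuses MyCustomSource and ExampleInitializer from the examples above (the WorkFloe title is arbitrary, and the assumption is that initializer ports are connected like regular ports):
from floe.api import WorkFloe

job = WorkFloe("Initializer Example")

producer = MyCustomSource()      # source cube defined earlier
consumer = ExampleInitializer()  # initializer cube defined above
job.add_cubes(producer, consumer)

# Assumption: initializer ports connect like regular ports; the consumer will
# not begin processing regular intake items until the producer has finished
producer.success.connect(consumer.setup)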
Testing Cubes
The Floe Python API comes with built-in utilities for testing cubes. The example below demonstrates the use of the Floe Python test API, along with the unittest package that is built into Python. The details of using unittest and testing Python in general are beyond the scope of this document; more information may be found in the official documentation and the Hitchhiker’s Guide to Python. For testing Floes, see Artemis.
Suppose we have the following cube to test:
# example_cubes.py
from floe.api import ComputeCube, BinaryOutputPort, BinaryInputPort

class CounterCube(ComputeCube):
    """
    Counts the number of messages received
    """

    intake = BinaryInputPort()
    success = BinaryOutputPort()

    def begin(self):
        self.count = 0

    def process(self, data, port):
        self.count += 1
        self.success.emit(data)
A convenience context manager is provided to help write concise tests.
Once inside the context provided by CubeTestRunner, the cube will be ready for testing. The cube’s parameters will be prepared, taking into account their default values and any optional parameters provided to the context manager. Test data may be sent to the cube using the send_inputs method on the context manager. The send_inputs method accepts keyword arguments where the keys are port names and the values are lists of input data. The example below demonstrates how to use the CubeTestRunner context manager to write unit tests.
import unittest

from floe.api import ComputeCube
from floe.test import CubeTestRunner
from floe.api.parameters import StringParameter
from floe.api.ports import BinaryInputPort, BinaryOutputPort

class ExampleCube(ComputeCube):
    example_param = StringParameter()

    intake = BinaryInputPort()
    setup = BinaryInputPort(initializer=True)
    success = BinaryOutputPort()

    def begin(self):
        # example: read initialization data
        self.init_data = list(self.setup)

    def process(self, data, port):
        self.success.emit(data)

class CounterTester(unittest.TestCase):
    def test_cube(self):
        # test data for the intake port
        test_data = [b"example bytes"]
        # test data for the setup port
        initializer_data = [b"example init data"]
        # test values for parameters
        parameters = {"example_param": "example value"}
        # define the cube to be tested
        cube = ExampleCube()
        # creates the context manager, providing the cube and parameters to be tested
        with CubeTestRunner(cube, parameters=parameters) as runner:
            # send the test data to the cube
            # process() is called as a side effect
            runner.send_inputs(setup=initializer_data, intake=test_data)
            # verify the value of the parameter
            self.assertEqual(cube.args.example_param, "example value")
            # check the number of times data is emitted to the success port
            self.assertEqual(runner.outputs["success"].emit_count, 1)
            # verify the data sent to the success port
            self.assertEqual(list(runner.outputs["success"]), test_data)

if __name__ == "__main__":
    unittest.main()
Tests may be written without using a context manager, but more steps are required.
Writing a test without a context manager consists of the following:
1. Create an instance of a CubeTestRunner and provide it with the cube to be tested.
2. Call the start() method on the test runner. This sets up the cube and calls the begin() method on the cube.
3. Call the process() method on the cube, providing the correct data type and the name of the input port.
4. Call the finalize() method on the test runner. This calls the end() method on the cube.
Note
The start() method must be called for the test runner to work properly.
Any data emitted to output ports by the tested cube is made available as a Python list. This allows tests to check the size of each output stream, and read the items that were output.
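A minimal sketch of this manual flow, reusing CounterCube from the earlier example and assuming the runner’s outputs mapping behaves as in the context-manager example:
import unittest

from floe.test import CubeTestRunner
from example_cubes import CounterCube

class ManualCounterTest(unittest.TestCase):
    def test_manual(self):
        cube = CounterCube()
        runner = CubeTestRunner(cube)
        runner.start()  # sets up the cube and calls begin()
        cube.process(b"example bytes", "intake")
        runner.finalize()  # calls end()
        self.assertEqual(cube.count, 1)
        # Emitted output is available as a Python list
        self.assertEqual(list(runner.outputs["success"]), [b"example bytes"])

if __name__ == "__main__":
    unittest.main()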
Other methods that CubeTestRunner provides:
- set_initializer_input: Sets an iterable as the initializer input the cube should receive within the test. Must be called prior to start().
- set_parameters: A utility method that calls prepare on the values prior to setting them on the cube. Must be called prior to start().
Debugging Cubes
Floe enables the Python faulthandler, which dumps tracebacks for segmentation faults.
Floe defines signal handlers for USR1 and USR2 which:
- print a traceback and start a function profile upon an initial USR1 signal
- collect and stop the function profile upon a second USR1 signal more than 5 seconds later
- print garbage collection stats and start GC leak debugging upon an initial USR2 signal
- print stats, collect, and stop garbage collection leak debugging upon a second USR2 signal more than 5 seconds later
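For example, the signals can be sent to a running cube process from Python (a sketch; the PID is a placeholder, and the sleep satisfies the more-than-5-second spacing):
import os
import signal
import time

cube_pid = 12345  # placeholder: PID of the running cube process

os.kill(cube_pid, signal.SIGUSR1)  # prints a traceback, starts the function profile
time.sleep(6)  # wait more than 5 seconds
os.kill(cube_pid, signal.SIGUSR1)  # collects and stops the function profile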
Cube Metrics
Beginning in Orion version 2020.1, cube metrics are available. Metrics may be enabled on a per-cube basis by setting the cube_metrics parameter. The parameter value should contain a list of desired metrics. The periodicity of metric collection may be specified through the metric_period parameter as an integer (seconds).
The following metrics are available:
Name | Description
---|---
cpu | Total CPU utilization, as a percentage, as computed between two subsequent samples. Data is read from the Linux CPU accounting controller.
memory | Total memory usage, in bytes, as reported by the Linux process information pseudo-filesystem.
disk | Total filesystem usage, in bytes, for the filesystem available to the cube.
network | Total network I/O, in bytes, for the network device available to the cube.
gpu | GPU utilization, as a percentage, as reported by the NVIDIA driver.
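A sketch of enabling metrics on a cube, assuming the built-in cube_metrics and metric_period parameters can be overridden through parameter_overrides the same way as item_count above:
from floe.api import ComputeCube, ParallelMixin

class MeteredCube(ParallelMixin, ComputeCube):
    title = "Metered Cube"

    # Assumption: the built-in metric parameters accept overrides like item_count
    parameter_overrides = {
        "cube_metrics": {"default": ["cpu", "memory", "disk"]},
        "metric_period": {"default": 60},  # collect once every 60 seconds
    }

    def process(self, data, port):
        do_computation(data)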
Tuning Cube Communication
Running cubes in Orion provides access to far more compute resources than are typically available to an individual, and to enable this, certain assumptions are made about how data passed between cubes is handled. It is important to understand the parameters that give the developer the ability to tune these assumptions to best suit their scenario.
It is important to note that every cube uses buffers for sending and receiving data. A single downstream cube which is slow to process items can cause buffers on upstream cubes to fill up. Once those buffers are full, cubes will be blocked until items are consumed from them.
Serial Cubes
Serial cubes have only one system parameter that impacts their ability to stream results: buffer_size. The value of buffer_size specifies the size of the buffer, in bytes, that Floe keeps for messages that have been processed and are to be sent to a downstream cube. If items can be processed faster than they can be sent to downstream cubes, then the cube has to wait until the buffer has space before processing more messages.
The default buffer_size value is a reasonable starting point for writing cubes, and reducing it is not recommended. Increasing the buffer_size value can increase the amount of work that can be done before waiting for a downstream cube to retrieve the data, as well as increase the amount that can be transferred to the downstream cube. It is worth noting that this will consume more memory, so when running in Orion there may be a need to increase the cube’s memory_mb requirements as well. Often, picking the value of buffer_size comes down to the specific use case, and may require some trial and error.
Note
Often the largest performance gains are seen in serial cubes that handle large numbers of small messages. However, in these scenarios it is often advisable to switch to a batch method such as Files or ShardCollections.
Parallel Cubes
Parallel cubes have a system parameter that affects how many instances of the cube can run simultaneously and the effectiveness of each instance: item_count. The parameter value is the maximum number of work items that are bundled together into each message provided to an instance.
Note
Prior to the 2019.5 release, the parameter prefetch_count was also used to control scaling. prefetch_count has been deprecated and is no longer honored.
When programs (such as cubes) exchange data over a network, there is a performance cost for each transfer. The total cost of all such transfers can dominate the cost of a calculation if item_count is chosen incorrectly, as the following examples illustrate.

Suppose that a parallel cube performs a fast computation (e.g. molecular weight) where each item takes less than 1ms, and assume the overhead of sending a message (a set of items) is 1ms. Given an item_count of 1 and 1000 items, each instance of the cube receives a message containing a single item, so roughly 1ms is spent moving each message for roughly 1ms of work: message passing consumes as much time as the computation itself. By increasing item_count to a value that reduces the impact of the message passing overhead, the floe becomes more efficient. For example, setting item_count=1000 would require sending only a single message, taking roughly 1001ms rather than potentially 2000ms. As the number of items increases, the overhead of passing messages only becomes more noticeable, so it is important to set values that provide enough work to each instance of a cube.
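A back-of-the-envelope sketch of the arithmetic above (1ms assumed overhead per message, 1ms of work per item, 1000 items):
OVERHEAD_MS = 1  # assumed cost of sending one message
WORK_MS = 1      # assumed cost of processing one item
ITEMS = 1000

def total_time_ms(item_count):
    messages = ITEMS // item_count
    return messages * OVERHEAD_MS + ITEMS * WORK_MS

print(total_time_ms(1))     # 2000: overhead equals the compute time
print(total_time_ms(1000))  # 1001: one message amortizes the overhead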
The second example is a parallel cube performing a long and computationally expensive task (e.g. molecular dynamics) that takes an hour per item. Given an item_count of 10 and 1000 items, each instance of the cube receives 10 items to work on. Assuming that there are enough resources, this results in a maximum of 100 parallel instances. This scenario is not ideal: each instance processes its 10 items sequentially at an hour apiece, so the minimum time to perform the work is 10 hours. If item_count is changed to 1, the maximum number of parallel instances rises to 1000, with each instance only having to operate for an hour, reducing the minimum time to one hour. This assumes perfect scaling, but it shows how much elapsed time can potentially be saved.
Exceptions
Floe contains a module for exceptions which trigger special behaviors in Orion. For now, there is only one: floe.api.exceptions.HardwareError. When a cube running in Orion raises this error, the instance it is running on will stop (after finishing all existing work) and terminate.
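A sketch of raising this exception from a cube, where run_on_gpu and GPUNotAvailable are hypothetical placeholders for a hardware-dependent computation and its failure mode:
from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort
from floe.api.exceptions import HardwareError

class GPUComputeCube(ComputeCube):
    intake = BinaryInputPort()
    success = BinaryOutputPort()

    def process(self, data, port):
        try:
            result = run_on_gpu(data)  # hypothetical GPU computation
        except GPUNotAvailable:        # hypothetical hardware failure
            # Ask Orion to retire this instance after in-flight work finishes
            raise HardwareError("GPU unavailable on this instance")
        self.success.emit(result)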