Cubes

Cubes are the fundamental building blocks for WorkFloes. Cubes are defined by both their source code and their metadata. A cube should be a small piece of reusable logic, that makes few assumptions about what cubes are upstream or downstream.

Cube Types

There are five fundamental cube types in Floe:

  • Source cubes: Cubes that put data into a WorkFloe, which inherit from SourceCube.

  • Compute cubes: Cubes that perform computation on a stream of data, which inherit from ComputeCube.

  • Parallel compute cubes: A special case of compute cubes that perform computation on a stream in parallel, which inherit from ParallelMixin.

  • Sink cubes: Cubes that take a stream of data from a WorkFloe and sends it to an external resource, which inherit from SinkCube.

  • HyperCubes: Encapsulate a set of connected cubes and connect it within a WorkFloe with the same API as a cube.

Cube Methods

The behavior and logic of a cube is customized by implementing methods on the cube’s class. The following methods may be implemented by cube authors for any cube:

Cube.begin()

The begin() method is called for cube initialization, before process() is ever called. It is provided to allow for customized cube initialization. This method is only called when a WorkFloe is being run, and by the time begin() is called, data may be emitted.

Warning

begin should be used in place of overriding __init__ which can cause unexpected cube failures.

Cube.process()

The process(self, data, port) method is called for each input item received. Computation and decision logic should be defined inside of this method.

Cube.end()

The end() method is called after all input has been consumed, and is provided so that cubes may perform any necessary clean up, or emit any accumulated data.

Additionally, special methods are provided for source and sink cubes:

  • __iter__: A method on source cubes, which yields items to be sent down stream

  • write(data): A method on sink cubes, to which items from the WorkFloe are written

Warning

For Parallel cubes running in Orion, end() is called after processing a batch of work. A single batch may contain up to item_count items of work.

Note

Items may be emitted to output ports from within any of the above methods.

Emitting Items

In order to send work items downstream to other cubes, they must be emitted. Items can be emitted to a port by calling emit on the port itself. A common use case for multiple ports is for controlling the flow of data to different downstream cubes.

Warning

A single call to emit is limited to 4GB. Each call allocates memory on both the sender and receiver, so it is best to emit smaller chunks (typically, 64MB) to avoid exhausting memory. Alternatively, consider storing large amounts of data in Orion and only emit a reference to the data.

A binary filter cube can be written in the following way:

from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class FilterCube(ComputeCube):

    success = BinaryOutputPort()
    failure = BinaryOutputPort()

    def process(self, item, data):
        if item_should_pass(item):
            # Send passing items downstream on the success port
            self.success.emit(item)
        else:
            # Send failed items downstream on the failure port
            self.failure.emit(item)

In some scenarios, it is useful to emit an item to all output ports (a broadcast). To accomplish this, call the emit method defined on the cube class:

from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class BroadCastCube(ComputeCube):

    first_output = BinaryOutputPort()
    second_output = BinaryOutputPort()

    def process(self, item, data):
        do_something(item)
        # Send item to all outputs
        self.emit(item)

Cube Parameters

Runtime behavior of cubes can be controlled with built-in and custom Parameters.

All Cubes have a set of built-in parameters whose defaults are appropriate for general floes. The buffer_size parameter specifies the amount of data buffered by Floe before sending it downstream. The parameters in the Hardware Requirements group do not affect a locally running Floe, and are used to determine what type of machine the cube may use when run in Orion. Additional Parallel Cube Parameters are described below.

Writing Cubes

Cubes are defined in Python using classes. You can define a new cube by subclassing one of the built-in base classes. Floe will handle all of the network communication, deserialization, and scheduling so that cube authors only need to be concerned with the internal logic of their cubes.

For any stream connecting two cubes, the following guarantees are provided:

  • Any item emitted to a connected port will be delivered exactly once to the corresponding downstream port.

  • All items for a given stream arrive in order (excluding parallel cubes).

Writing a compute cube

A compute cube performs computation on a stream of data. The data may arrive on any input port defined for the cube, and can be emitted for downstream consumption to one or more output ports. To define a compute cube, a class is created that subclasses ComputeCube and defines a process method. Floe provides the name of the input port, and the deserialized data to the process function. The process function will be called for every piece of data from each input stream. From within the process method, data may be emitted to any output port.

Example:

from floe.api import ComputeCube, BinaryOutputPort, BinaryInputPort

class MyComputeCube(ComputeCube):

    title = "My Compute Cube"

    intake = BinaryInputPort()
    success = BinaryOutputPort()

    def begin(self):
        # Any cube setup may be done here
        pass

    def process(self, data, port):
        # Emit the data on the success port to the next cube
        self.success.emit(data)

    def end(self):
        # Any cube tear down may be done here
        pass

Writing a source cube

A source cube can be created by subclassing SourceCube, or subclassing a class that inherits from SourceCube, and overriding the __iter__ method. The __iter__ method must be a generator that yields items to be consumed downstream. All items yielded as part of the iterator will be emitted on all output ports on the cube.

Example:

from floe.api import SourceCube, BinaryOutputPort

class MyCustomSource(SourceCube):

    success = BinaryOutputPort()

    def __iter__(self):
        for data in my_custom_source():
            yield data  # equivalent to self.emit(data), which sends data to all output ports

You can define source cubes that emit any data type, such as text:

from floe.api import SourceCube

class MyCustomTextSource(SourceCube):
    def __iter__(self):
        for data in my_custom_text:
            yield data

If further control is needed to specify which port to emit items on, it can be done within begin:

from floe.api import SourceCube
from floe.api.ports import JsonOutputPort

class SpecificPortSource(SourceCube):

    port_one = JsonOutputPort()
    port_two = JsonOutputPort()

    def begin(self):
        i = 0
        for data in my_custom_source():
            if i % 2 == 0:
                self.port_one.emit(data)
            else:
                self.port_two.emit(data)
            i += 1

    def __iter__(self):
        """Construct an empty iterator"""
        return iter([])

Writing a sink cube

A sink cube is a cube which stores items from the WorkFloe to an external resource. Examples include writing items to a file, storing them in a database, or sending them to a service. To define a sink cube, a class is created that subclasses SinkCube and defines a write method which takes as parameters the item to be written, and the port on which it arrived.

Here is a conceptual example of a cube which writes items to a relational database:

from floe.api import SinkCube

class DatabaseSink(SinkCube):

    def begin():
        self.conn = get_db_connection()

    def write(self, data, port):
        self.conn.insert("...example sql here...")

Stdout, Stderr, and Logging

All output is captured and stored when cubes run within Orion. When run locally, output is only captured when specified by command line arguments. Any output captured will be prepended with the cube’s name.

Cubes at runtime have access to a log attribute on the class, which has debug, info, warning, and error methods.

Parallel Cubes

Floe is designed to enable large scale computation to be expressed succinctly and executed efficiently. A compute cube manifests within a WorkFloe as a single process that sequentially processes items of work in a data stream. If a compute cube can process items of work without considering the results of all previously processed data, that is, the computation is embarrassingly parallel, a compute cube can be modified such that Floe will dynamically spawn duplicate cube processes in proportion to the amount of available data in order to parallelize data processing and achieve greater throughput.

Parallel compute cubes can be written by subclassing from a parallel base class:

from floe.api import ParallelComputeCube

class MyParallelCube(ParallelComputeCube):

    def process(data, port):
        do_computation(data)

The semantics of a parallel compute cube are identical to those of a regular compute cube, with the following exceptions:

  • The order in which items are processed, and subsequently their transmission downstream, is undefined (as opposed to in order delivery).

  • The end() method will be called at least once (as opposed to exactly once).

  • Because of how failed items of work are retried, emitting must wait until an item is fully processed.

  • Because of how failed items of work are retried, writing to an external medium can result in duplicated data. See Collections.

The last two bullets are related and have to do with the inevitability of partial and duplicate operations that can happen when a failure occurs. A single item of work may cause several results to be emitted out ports or data objects to be written, and the worker could fail in the middle of that process. So, in order to prevent partial and duplicate emissions, the parallel cube broker waits for an item of work to be fully processed before emitting resulting items to output ports. And that ties in to why shards must be closed downstream of a parallel cube.

Writing a Parallel Cube

Parallel cubes are special cases of compute cubes and behave similarly with some exceptions. Because the behavior is so similar, it is easy to write parallel cubes that have unexpected behavior or side effects.

Side effects and state in end():

  • The end() methods are still called for parallel cubes, but is called at least once. In fact, it is called many times. This means that any side effect of calling process or end needs to be idempotent.

  • Additionally, parallel cubes cannot be used to hold global state, as the state would not actually be global (just local to some subset of input).

File I/O:

  • Parallel cubes are executed by worker processes distributed across many machines. Any files written by a parallel cube may or may not already have been written by another instance of that cube, and so race conditions may exist.

Making a Serial/Parallel Cube

New in version 0.2.181.

It is often desirable to make parallel and serial cubes from a common base class. The example below shows how to do this using the ParallelMixin class.

from floe.api import ComputeCube, ParallelMixin


class CustomCube(ComputeCube):
    title = "My Custom Cube"

    def process(self, data, port):
        do_computation(data)


class MyCustomParallelCube(ParallelMixin, CustomCube):
     # Append to title
     title = CustomCube.title + "(Parallel)"

Parallel Cube Parameters

Parallel cubes have a set of parameters that determine the how the parallelization of work occurs, the values of which can have a large impact on performance. This section describes those parameters, and how to choose reasonable default values for them when creating a parallel cube.

item_count

Cubes process items in a stream, one at a time, as discussed in the WorkFloes section. However, if items are sent over the network between cubes one at a time, the cost of that I/O can be greater than the cost of the calculation within the cube. The overall efficiency of the WorkFloe can be improved by sending items in batches. The item_count parameter specifies the maximum number of work items that should be sent at a time. As a rule of thumb, set item_count to a value such that each batch of work is processed in roughly 1 minute.

Note

A value for item_count that is too large can cause a minority of CPUs to receive a majority of work items, leading to a slower overall execution time.

Note

The value item_count is a maximum. If workers request work faster than the internal work queues are refilled, messages will container fewer than item_count items.

Example of overriding a parallel cube parameter.

from floe.api import ComputeCube, ParallelMixin


class MyParallelCube(ParallelMixin, ComputeCube):
    title = "My Parallel Cube"

    # Overrides the default value of the item_count parameter
    parameter_overrides = {
        "item_count": {"default": 10}
    }
    def process(self, data, port):
        do_computation(data)

max_failures

New in version 0.2.181.

In Orion, if a work item in a parallel cube fails to process correctly, the work will automatically be retried. The number of times that the work will be retried before calling the method process_failed on the cube, can be set through the max_failures parameter. By default, process_failed prints a log message, but otherwise skips the work. Cube authors may add custom behavior in their own process_failed method.

Warning

A work item that fails more than max_failures times will be skipped!

To address this issue and prevent data loss, a cube author can:

  • Re-emit the work in a cycle or to a specialized cube for further processing.

  • Re-emit to a serial cube for recording failed work or failing the job by raising an exception.

Warning

Setting max_failures below the default value may cause uncompleted work to be skipped!

To address this issue and prevent data loss, a cube author can take one of these actions:

  • Set tracking for the work items that were not completed.

  • Fail the floe outright with an exception.

  • Reroute failed work items through process_failed, thus making it an input stream to another cube.

The example below demonstrates overriding the process_failed method to send work items to a specific port.

from floe.api import ComputeCube, ParallelMixin, BinaryOutputPort


class MyParallelCube(ParallelMixin, ComputeCube):
    title = "My Parallel Cube"

    failure_port = BinaryOutputPort()

    # Overrides the default values of these parameters
    parameter_overrides = {
        "item_count": {"default": 10},
        "max_failures": {"default": 2},
    }
    def process(self, data, port):
        do_computation(data)

    def process_failed(self, data, port, last_error):
        self.log.error("Failed to process item: {}".format(last_error))
        self.failure_port.emit(data)

Packaging Cubes

In order to use a cube, it must be importable so that Floe can import it to be used in WorkFloes. To be importable, a cube must be defined inside of a module. A module is simply a file that contains Python code.

Below is an example cube module that contains two cubes:

# my_cubes.py
import random

from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class PrintDataCube(ComputeCube):
    """ Cube that prints the raw bytes """
    title = "Print Data"

    intake = BinaryInputPort()

    def process(self, data, port):
        print(data)
        self.emit(data)

class JitterCube(ComputeCube):
    """ A cube that pass/fails data randomly """
    success = BinaryOutputPort()
    failure = BinaryOutputPort()
    title = "Jitter Cube"

    def process(self, data, port):
        if random.randint(0, 5) < 3:
            self.success.emit(data)
        else:
            self.failure.emit(data)

Now these cubes can be imported and used in a WorkFloe, as demonstrated by the example below:

# example_floe.py
from my_cubes import PrintDataCube

from floe.api import SinkCube, WorkFloe, SourceCube

job = WorkFloe("Workfloe Example")
ifs = SourceCube()
printer = PrintDataCube()
ofs = SinkCube()

job.add_cubes(ifs, printer, ofs)

ifs.success.connect(printer.intake)
printer.success.connect(ofs.intake)

if __name__ == "__main__":
    job.run()

Cube Ports

Cubes have ports that are used for connecting cubes together. These ports are defined on the cube’s class:

from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class MyCube(ComputeCube):
    # A success port
    success = BinaryOutputPort(title="Successfully processed items")
    # A failure port
    failure = BinaryOutputPort(title="Items with errors")

    # An alternate input port
    alt_input = BinaryInputPort(title="A specific type of input")

New in version 0.2.181: Support for port ‘titles’.

Custom Ports

Cubes may be concurrently run on separate computers that are connected by a network. The Floe Python API can be customized for serializing custom data types to a representation that is suitable for network transmission.

Correct serialization and deserialization is the responsibility of a Port. The Floe runtime will enforce that connections are only allowed between matching Port types, with the expectation that the deserialized object returned from the Port will be usable by the cube.

Writing a custom port

To customize how an object is serialized for a particular port, two methods must be defined on the port’s class:

  • encode(data): Takes the input object and returns a byte representation of it

  • decode(data): Takes a sequence of bytes and reconstructs a valid object from it

Here is an example which uses Python’s pickle module:

import pickle

class PickleEncoder:
    def encode(item):
        return pickle.dumps(item)

    def decode(data):
        return pickle.loads(data)

To use this custom encoder, define port classes that inherit from it as well as from InputPort and OutputPort. These newly defined ports will need to define PORT_TYPES as a tuple of string values. To connect ports within a WorkFloe they must share one of the string values, otherwise the WorkFloe will be invalid.

from floe.api import InputPort, OutputPort

class MyInputPort(PickleEncoder, InputPort):
    # Define types of ports these can connect to
    PORT_TYPES = ("pickle",)

class MyOutputPort(PickleEncoder, OutputPort):
    # Define types of ports these can connect to
    PORT_TYPES = ("pickle",)

Initializer Ports

It is often useful to be able to send initialization data from an upstream cube to one or more downstream cubes, including parallel cubes. This can be achieved by designating a port as an initializer as shown in the example below. Like typical ports, initialization port types must match. Unlike typical ports, initialization ports imply that an upstream cube is finished before a downstream cube begins processing any regular work items. Also, instead of passing work items as an argument to a cube method, initializer data may be consumed only once from any cube method as a Python generator.

from floe.api import ComputeCube, BinaryInputPort, BinaryOutputPort

class ExampleInitializer(ComputeCube):

    title = "Example with initializer"
    intake = BinaryInputPort()
    output = BinaryOutputPort()

    # This port is only for initializer data
    setup = BinaryInputPort(initializer=True)

    def begin(self):
        self.count = 0
        for data in self.setup:
            print(data)
            self.count += 1

    def process(self, data, port):
        self.emit(data)

    def end(self):
        print(self.count)

Warning

It is possible to deadlock when sending data to an initializer port downstream. When an upstream cube’s total output on a port exceeds 2GB, it will block, waiting for output to be consumed by a downstream cube. The downstream cube also blocks while its initializer port waits for the upstream cube to finish. In similar cases, to avoid deadlocks, the total output for an upstream cube on a port should be less than 2GB.

Note

Initializer ports cannot be used within a cycle, or a cube group.

New in version 2018.2.2.

Testing Cubes

The Floe Python API comes with built-in utilities for testing cubes. The example below demonstrates the use of the Floe Python test API, along with the unittest package that is built into Python. The details of using unittest and testing Python in general are beyond the scope of this document, and more information may be found in the official documentation and the Hitchhiker’s Guide to Python. For testing Floes, see Artemis.

Suppose we have the following cube to test:

# example_cubes.py

from floe.api import ComputeCube, BinaryOutputPort, BinaryInputPort


class CounterCube(ComputeCube):
    """
    Counts the number of messages received
    """

    intake = BinaryInputPort()
    success = BinaryOutputPort()

    def begin(self):
        self.count = 0

    def process(self, data, port):
        self.count += 1
        self.success.emit(data)

A convenience context manager is provided to help write concise tests.

Once inside the context provided by CubeTestRunner, the cube will be ready for testing. The cube’s parameters will be prepared, taking into account their default values and any optional parameters provided to the context manager.

Test data may be sent to the cube using the send_inputs method on the context manager. The send_inputs method accepts keyword arguments where the keys are port names and the values are lists of input data.

The example below demonstrates how to use the CubeTestRunner context manager to write unit tests.

import unittest

from floe.api import ComputeCube
from floe.test import CubeTestRunner
from floe.api.parameters import StringParameter
from floe.api.ports import BinaryInputPort, BinaryOutputPort


class ExampleCube(ComputeCube):

    example_param = StringParameter()

    intake = BinaryInputPort()
    setup = BinaryInputPort(initializer=True)
    success = BinaryOutputPort()

    def begin(self):
        # example: read initialization data
        self.init_data = list(self.setup)

    def process(self, data, port):
        self.success.emit(data)


class CounterTester(unittest.TestCase):
    def test_cube(self):

        # test data for the intake port
        test_data = [b"example bytes"]

        # test data for the setup port
        initializer_data = [b"example init data"]

        # test values for parameters
        parameters = {"example_param": "example value"}

        # define the cube to be tested
        cube = ExampleCube()

        # creates the context manager, providing the cube and parameters to be tested
        with CubeTestRunner(cube, parameters=parameters) as runner:

            # send the test data to the cube
            # process() is called as a side effect
            runner.send_inputs(setup=initializer_data, intake=test_data)

            # verify the value of the parameter
            self.assertEqual(cube.args.example_param, "example value")

            # check the number of times data is emitted to the success port
            self.assertEqual(runner.outputs["success"].emit_count, 1)

            # verify the data sent to the success port
            self.assertEqual(list(runner.outputs["success"]), test_data)


if __name__ == "__main__":
    unittest.main()

Tests may be written without using a context manager, but more steps are required.

Writing a test without a context manager consists of the following:

  • Create an instance of a CubeTestRunner and provide it with the cube to be tested.

  • Call the start() method on the test runner. This sets up the cube and calls the begin() method on the cube.

  • Call the process() method on the cube, providing the correct data type and the name of the input port.

  • Call the finalize() method on the test runner. This calls the end() method on the cube.

Note

The start() method must be called for the test runner to work properly.

Any data emitted to output ports by the tested cube is made available as a Python list. This allows tests to check the size of each output stream, and read the items that were output.

Other methods that CubeTestRunner provides:

  • set_initializer_input to set an iterable as the input the cube should receive within the test. Must be called prior to start().

  • set_parameters a utility method that calls prepare on the values prior to setting the values on the cubes. Must be called prior to start().

Debugging Cubes

Floe enables the Python faulthandler which dumps tracebacks for segmentation faults.

Floe defines signal handlers for USR1 and USR2 which:

  • prints a traceback and starts a function profile upon an initial USR1 signal

  • collects/stops function profile after second USR1 >5 second later

  • prints garbage collection stats and starts gc leak debugging upon a single USR2 signal

  • prints stats, collects, and stops garbage collection leak debugging after second USR2 >5 second later

Cube Metrics

Beginning in Orion version 2020.1, cube metrics are available. Metrics for a cube may be enabled on a per cube basis by setting the cube_metrics parameter. The parameter value should contain a list of desired metrics. The periodicity of metric collection may be specified through the metric_period parameter as an integer (seconds).

The following metrics are available:

Cube Metrics

Name

Description

cpu

Total CPU utilization, as a percentage, as computed between two subsequent samples. Data is read from the Linux CPU accounting controller.

memory

Total memory usage, in bytes, as reported by the Linux process information pseudo-filesystem.

disk

Total filesystem usage, in bytes, for the filesystem available to the cube.

network

Total network I/O, in bytes, for the network device available to the cube.

gpu

GPU utilization, as a percentage, as reported by the NVIDIA driver.

Tuning Cube Communication

Running Cubes in Orion provides access to far more compute resources than typically available to an individual, and to enable this there are certain assumptions made about how to handle data passed between cubes. It is important to understand the parameters that provide the developer with the ability to tune these assumptions to best suit their scenario.

It is important to note that every cube uses buffers for sending and receiving data. A single downstream cube which is slow to process items can cause buffers on upstream cubes to fill up. Once those buffers are full, cubes will be blocked until items are consumed from them.

Serial Cubes

Serial cubes only have one system parameter that impacts their ability to stream results, which is the buffer_size parameter. The value of buffer_size specifies the size of the buffer, in bytes, that Floe keeps for messages that are processed and are to be sent to a downstream cube. If items can be processed faster than they can be sent to downstream cubes then the cube has to wait until the buffer has space to process more messages.

The default buffer_size value is a reasonable starting point for writing Cubes and it is not recommended to reduce it. Increasing the buffer_size value can increase the amount of work that can be done before waiting for a downstream cube to retrieve the data as well as increase the amount that can be transferred to the downstream cube. It is worth noting that this will consume more memory, so when running in Orion there may be a need to increase the cube’s memory_mb requirements as well. Often picking the value of buffer_size comes down to the specific use case, and may require some trial and error.

Note

Often the most performance gain is to be seen by serial cubes that handle large numbers of small messages. However, often it is advised to switch to a batch method like Files or ShardCollections in these scenarios.

Parallel Cubes

Parallel cubes have a system parameter that affects how many instances of the cube can run simultaneously and the effectiveness of each instance: item_count. The parameter value is the maximum of messages that are bundled up together and provided to each instance.

Note

Prior to the 2019.5 release the parameter prefetch_count was also used to control scaling. prefetch_count has been deprecated and is no longer honored.

When programs (such as cubes) exchange data over a network, there is a performance cost for each transfer. The total cost of all such transfers can dominate the cost of a calculation if item_count is chosen incorrectly. This is summarized with the following examples.

Suppose that a parallel cube performs a fast computation (ie Molecular weight) where each items is less than 1ms. If the overhead of sending a message (a set of items) is assumed to be 1ms, the overhead of sending messages can quickly become noticeable. Given the default values of item_count=1 and 1000 messages, each instance of the cube will receive 1 message to perform work on. This will result in there being 1ms spent moving the message, for an instance to spend ~1ms on the work. In this scenario up to 100% of the time can be consumed by passing messages. By increasing the item_count to a value that reduces the impact of the message passing overhead, the floe will become more responsive. For example, setting item_count=1000 would require sending only a single message, only taking 1001ms rather than potentially 2000ms. As the number of messages increase the overhead of passing messages will only become more noticeable and it is important to set values that provide enough work to each instance of a cube.

The second example is of a parallel cube performing a long and computationally expensive task (i.e. Molecular Dynamics) that takes an hour per item. Given the default values and 1000 messages, each instance of the cube will receive 10 messages to perform work on. Assuming that there are enough resources this would result in the max number of parallel cubes to be 100. This scenario is not ideal as each item delivered will require an hour and will run sequentially on a single worker, resulting in the minimal amount of time to perform the work to be 10 hours (each instance working for 10 hours). If the item_count is changed to a value of 1 then the max number of parallel cubes raises to 1000, with each instance only having to operate for an hour. This reduces the minimal time to an hour. This assumes perfect scaling, but it can easily be seen that less elapsed time can potentially be spent.

Exceptions

Floe contains a module for exceptions which trigger special behaviors in Orion. For now, there is only one, namely, floe.api.exceptions.HardwareError. When a cube running in Orion raises this error, it will cause the instance it is running on to stop(after finishing all existing work) and terminate.