Port Basics

I/O Ports

Ports are used to pass records into and out of a cube. The mixin orionplatform.mixins.RecordPortsMixin automatically includes an input port ‘intake’ and two output ports, ‘success’ and ‘failure’. Cubes can have additional input and output ports, or completely different port configurations. To add ports to a cube, use the following two classes: orionplatform.ports.RecordInputPort and orionplatform.ports.RecordOutputPort.

To add a port to a cube, assign an instance of one of these classes as a class attribute of the cube. As with parameters, the first argument passed to the constructor must always be the name of the class attribute the port is assigned to.
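The naming rule can be illustrated with plain Python. In the hypothetical sketch below, `ToyPort` is not a Floe class. It uses the standard `__set_name__` hook, which Python calls with the attribute name when the class body is executed, to reject a port whose name differs from its attribute. The real port classes may enforce or rely on the rule differently.

```python
class ToyPort:
    """Hypothetical stand-in for a port class (not part of the Floe API)."""

    def __init__(self, name):
        self.name = name

    def __set_name__(self, owner, attr_name):
        # Python calls this with the attribute name the port is assigned to.
        if self.name != attr_name:
            raise ValueError(
                f"port name {self.name!r} does not match attribute {attr_name!r}"
            )


class GoodCube:
    intake = ToyPort("intake")
    missing = ToyPort("missing")


def define_bad_cube():
    class BadCube:
        missing = ToyPort("absent")  # name differs from the attribute
    return BadCube


try:
    define_bad_cube()
    mismatch_detected = False
except (ValueError, RuntimeError):
    # Python < 3.12 wraps errors raised in __set_name__ in a RuntimeError.
    mismatch_detected = True

print(GoodCube.missing.name, mismatch_detected)  # missing True
```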

In the following example, we add a ‘missing’ port to a cube that computes explicit hydrogens. Records that are missing a molecule to process are emitted on this port.

from floe.api import ComputeCube
from orionplatform.ports import RecordOutputPort
from orionplatform.mixins import RecordPortsMixin
from orionplatform.parameters import PrimaryMolFieldParameter

from openeye.oechem import OEAddExplicitHydrogens


class AddExplicitHydrogensCube(RecordPortsMixin, ComputeCube):
    # Extra output port; the constructor argument matches the attribute name
    missing = RecordOutputPort("missing")

    in_mol_field = PrimaryMolFieldParameter("in_mol_field")
    out_mol_field = PrimaryMolFieldParameter("out_mol_field")

    def process(self, record, port):
        if record.has_value(self.args.in_mol_field):
            mol = record.get_value(self.args.in_mol_field)
            OEAddExplicitHydrogens(mol)  # modifies the molecule in place
            record.set_value(self.args.out_mol_field, mol)
            self.success.emit(record)
        else:
            # No molecule to process: route the record to the custom port
            self.missing.emit(record)
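The branching in `process` can be exercised without the Orion runtime. In this hypothetical sketch, `ToyRecord` is a dict-backed stand-in that mimics only the `has_value`/`get_value`/`set_value` calls used above. `route` collects records per output port instead of emitting them, and `str.upper` stands in for the real chemistry step.

```python
class ToyRecord:
    """Hypothetical dict-backed stand-in for an Orion record."""

    def __init__(self, **fields):
        self._fields = dict(fields)

    def has_value(self, field):
        return field in self._fields

    def get_value(self, field):
        return self._fields[field]

    def set_value(self, field, value):
        self._fields[field] = value


def route(records, in_field, out_field, transform):
    """Mimic the process() branching, collecting records per output port."""
    emitted = {"success": [], "missing": []}
    for record in records:
        if record.has_value(in_field):
            record.set_value(out_field, transform(record.get_value(in_field)))
            emitted["success"].append(record)
        else:
            emitted["missing"].append(record)
    return emitted


records = [ToyRecord(mol="c1ccccc1"), ToyRecord(title="no molecule")]
# str.upper is only a placeholder for the real molecule transformation
ports = route(records, "mol", "mol_h", str.upper)
print(len(ports["success"]), len(ports["missing"]))  # 1 1
```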

In the explicit-hydrogens example above, the cube still has a ‘failure’ port that is never used. To create a cube with a completely custom port configuration, derive only from floe.api.cubes.ComputeCube, which defines no ports. The following example demonstrates this for a simple switch cube.

from floe.api import ComputeCube, BooleanParameter
from orionplatform.ports import RecordInputPort, RecordOutputPort


class SwitchCube(ComputeCube):
    # Custom port configuration: one input port and two output ports
    intake = RecordInputPort("intake")
    true = RecordOutputPort("true")
    false = RecordOutputPort("false")

    switch = BooleanParameter(
        "switch",
        required=True,
        title="Switch",
        description="Controls whether the record is sent to the 'true' or 'false' port",
    )

    def process(self, record, port):
        if self.args.switch:
            self.true.emit(record)
        else:
            self.false.emit(record)

Initialization Ports

Initialization ports are specialized input ports for passing setup data to a cube. Data sent to an initialization port is guaranteed to reach the cube before data sent to a non-initialization port. Additionally, data sent to an initialization port is not passed to the cube’s process method. Instead, it is made available exactly once, as a Python generator, from any cube method. Generally, initialization data is consumed in the begin method.
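The "exactly once" behavior is consistent with ordinary Python generator semantics: once a generator is exhausted, iterating over it again yields nothing. The hypothetical sketch below shows the usual pattern of materializing the data in `begin` and reading the cached copy afterwards. `init_port_data` and `ToyCube` are illustrative names, not Floe APIs.

```python
def init_port_data():
    """Hypothetical stand-in for records arriving on an initialization port."""
    yield "config-a"
    yield "config-b"


class ToyCube:
    """Hypothetical cube-like class; not the Floe ComputeCube."""

    def __init__(self, init_generator):
        self.init = init_generator

    def begin(self):
        # Materialize the generator once; it cannot be rewound later.
        self.settings = list(self.init)

    def process(self, record):
        # Later methods read the cached list; the generator is now exhausted.
        leftover = list(self.init)
        return record, self.settings, leftover


cube = ToyCube(init_port_data())
cube.begin()
result = cube.process("r1")
print(result)  # ('r1', ['config-a', 'config-b'], [])
```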

The following cube counts the number of records sent to its initialization port and to its normal input port. It demonstrates how to access initialization records in the begin method.

from floe.api import ComputeCube
from orionplatform.ports import RecordInputPort, RecordOutputPort


class InitCountCube(ComputeCube):
    init = RecordInputPort("init", initializer=True)
    intake = RecordInputPort("intake")
    success = RecordOutputPort("success")

    def begin(self):
        # Initialize the intake counter
        self.intake_count = 0

        # Loop over the records sent to the initialization port
        init_count = 0
        for record in self.init:
            init_count += 1
        self.log.info("{} records sent to the init port".format(init_count))

    def process(self, record, port):
        if port != "intake":
            raise RuntimeError(
                "This will never happen: records sent to the init port are not sent to process"
            )

        self.intake_count += 1
        self.success.emit(record)

    def end(self):
        self.log.info("{} records sent to the intake port".format(self.intake_count))
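The counting logic can be checked with a pure-Python analogue. In this hypothetical sketch, `CountingCube` and `run` are illustrative names, not Floe APIs. A one-shot iterator stands in for the initialization port, and the toy driver assumes the begin, then per-record process, then end ordering described above. The real Floe runtime schedules cubes itself.

```python
class CountingCube:
    """Hypothetical pure-Python analogue of InitCountCube (no Floe imports)."""

    def __init__(self, init_records):
        # A one-shot iterator stands in for the initialization port.
        self.init = iter(init_records)
        self.emitted = []

    def begin(self):
        self.intake_count = 0
        self.init_count = sum(1 for _ in self.init)

    def process(self, record, port):
        if port != "intake":
            raise RuntimeError("only intake records should reach process")
        self.intake_count += 1
        self.emitted.append(record)  # stands in for self.success.emit(record)

    def end(self):
        return self.init_count, self.intake_count


def run(cube, intake_records):
    """Toy driver assuming begin -> process (per record) -> end ordering."""
    cube.begin()
    for record in intake_records:
        cube.process(record, "intake")
    return cube.end()


cube = CountingCube(["setup-1", "setup-2"])
print(run(cube, ["r1", "r2", "r3"]))  # (2, 3)
```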

Note

Data sent to initialization ports may be consumed exactly once, generally in the begin method.

The following cube and floe demonstrate how to send an entire file through an initialization port.

from os import remove

from floe.api.floes import WorkFloe
from floe.api.cubes import ComputeCube
from floe.api.ports import BinaryInputPort

from orionclient.utils import TemporaryPath

from orionplatform.mixins import RecordPortsMixin
from orionplatform.cubes import BinaryFileReaderCube, DatasetReaderCube


class InitFileCube(RecordPortsMixin, ComputeCube):

    setup = BinaryInputPort("setup", required=True, initializer=True)

    def begin(self):
        # Write every chunk of the initialization data to a temporary file
        self.init_data = TemporaryPath(suffix=".initdata")
        with open(self.init_data.path, "wb") as ofs:
            for data in self.setup:
                ofs.write(data)

    def process(self, record, port):
        # Placeholder: run an application against self.init_data.path here
        self.log.info("Launch app with data from initialization port for each record")
        self.success.emit(record)

    def end(self):
        # Clean up the temporary file
        remove(self.init_data.path)


job = WorkFloe("Example File Initializer")
setup_source = BinaryFileReaderCube("setup_source")
data_source = DatasetReaderCube("data_source")
init_file = InitFileCube("init_file")

job.add_cubes(setup_source, data_source, init_file)

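# Stream the setup file into the initialization port
setup_source.success.connect(init_file.setup)
setup_source.promote_parameter("file")

# Stream the dataset records into the normal intake port
data_source.success.connect(init_file.intake)
data_source.promote_parameter("data_in")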


if __name__ == "__main__":
    job.run()
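The begin/end file handling can be sketched with the standard library alone. In this sketch, `tempfile.mkstemp` is swapped in for orionclient's `TemporaryPath`, and `binary_chunks` is a hypothetical generator standing in for the `setup` port. The data is spooled to disk once, read back, and removed, mirroring what the cube does in `begin` and `end`.

```python
import os
import tempfile


def binary_chunks():
    """Hypothetical stand-in for the payloads arriving on a BinaryInputPort."""
    yield b"line 1\n"
    yield b"line 2\n"


def spool_to_file(chunks, suffix=".initdata"):
    """Write every chunk to a new temporary file and return its path."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    with os.fdopen(fd, "wb") as ofs:
        for data in chunks:
            ofs.write(data)
    return path


path = spool_to_file(binary_chunks())  # corresponds to begin()
try:
    with open(path, "rb") as ifs:  # an app would read the file per record
        contents = ifs.read()
finally:
    os.remove(path)  # corresponds to end()

print(contents)  # b'line 1\nline 2\n'
```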