Port Basics
I/O Ports
Ports are used to pass records in and out of a cube. The mixin orionplatform.mixins.RecordPortsMixin automatically includes an input port ‘intake’, and two output ports ‘success’ and ‘failure’. Cubes can have additional input and output ports, or completely different port configurations. To add ports to a cube we use the following two classes.
orionplatform.ports.RecordInputPort: Input port for records
orionplatform.ports.RecordOutputPort: Output port for records
To add a port to a cube, one of the above classes must be added as a class member of the cube. As with parameters, the first argument passed to the constructor must always be the name of the class variable the object is assigned to.
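The naming rule above can be checked mechanically. The following is a minimal sketch that uses a stand-in class rather than the real orionplatform port classes, whose internals are not shown here. The names StandInPort, mismatched_port_names, GoodCube, and BadCube are hypothetical and exist only for this illustration.

```python
class StandInPort:
    """Hypothetical stand-in for a port object; it only stores its name."""

    def __init__(self, name):
        self.name = name


def mismatched_port_names(cube_cls):
    """Return (attribute, port name) pairs where the two names differ."""
    return [
        (attr, value.name)
        for attr, value in vars(cube_cls).items()
        if isinstance(value, StandInPort) and value.name != attr
    ]


class GoodCube:
    # Constructor arguments match the class variable names
    intake = StandInPort("intake")
    missing = StandInPort("missing")


class BadCube:
    # The constructor argument does not match the class variable name
    missing = StandInPort("mising")


print(mismatched_port_names(GoodCube))
print(mismatched_port_names(BadCube))
```

A check of this kind could run in a unit test so that a misspelled port name is caught before the cube is used in a floe.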
In the following example, we add a ‘missing’ port to a cube that adds explicit hydrogens. Records that have no molecule to process are sent to this port.
from floe.api import ComputeCube
from orionplatform.ports import RecordOutputPort
from orionplatform.mixins import RecordPortsMixin
from orionplatform.parameters import PrimaryMolFieldParameter

# Note: oechem must be imported before OpenEye toolkits
from openeye.oechem import OEAddExplicitHydrogens


class AddExplicitHydrogensCube(RecordPortsMixin, ComputeCube):
    # Extra output port, in addition to 'success' and 'failure' from the mixin
    missing = RecordOutputPort("missing")
    in_mol_field = PrimaryMolFieldParameter("in_mol_field")
    out_mol_field = PrimaryMolFieldParameter("out_mol_field")

    def process(self, record, port):
        if record.has_value(self.args.in_mol_field):
            mol = record.get_value(self.args.in_mol_field)
            OEAddExplicitHydrogens(mol)
            record.set_value(self.args.out_mol_field, mol)
            self.success.emit(record)
        else:
            # No molecule to process on this record
            self.missing.emit(record)
In the above example, the ‘failure’ port is still present but is never used. To create a cube with a completely custom port configuration, derive only from floe.api.cubes.ComputeCube, which defines no ports. The following example demonstrates this for a simple switch cube.
from floe.api import ComputeCube, BooleanParameter
from orionplatform.ports import RecordInputPort, RecordOutputPort


class SwitchCube(ComputeCube):
    # All ports are declared explicitly because no port mixin is used
    intake = RecordInputPort("intake")
    true = RecordOutputPort("true")
    false = RecordOutputPort("false")
    switch = BooleanParameter(
        "switch",
        required=True,
        title="Switch",
        description="Controls whether the record is sent to the 'true' or 'false' port",
    )

    def process(self, record, port):
        if self.args.switch:
            self.true.emit(record)
        else:
            self.false.emit(record)
Initialization Ports
Initialization ports are specialized input ports for passing setup data to the cube. Data sent to an initialization port is guaranteed to reach the cube before data sent to a non-initialization port. Additionally, data sent to initialization ports is not passed to the cube’s process function; instead, it is made available exactly once, as a Python generator, from any cube method. Generally, initialization data is consumed in the begin method.
Note
Initialization ports may be used only with ComputeCube objects, not with SourceCube objects.
The following cube, which counts the number of records sent to its initialization port and to its normal input port, demonstrates how to access initialization records in the begin method.
from floe.api import ComputeCube
from orionplatform.ports import RecordInputPort, RecordOutputPort


class InitCountCube(ComputeCube):
    init = RecordInputPort("init", initializer=True)
    intake = RecordInputPort("intake")
    success = RecordOutputPort("success")

    def begin(self):
        # Initialize counters
        self.init_count = 0
        self.intake_count = 0
        # Loop over records sent to the initialization port
        for record in self.init:
            self.init_count += 1
        self.log.info(f"{self.init_count} records sent to the init port")

    def process(self, record, port):
        if port != "intake":
            raise RuntimeError(
                "This will never happen. Records sent to the init port are not sent to process"
            )
        self.intake_count += 1
        self.success.emit(record)

    def end(self):
        self.log.info(f"{self.intake_count} records sent to the intake port")
Note
Data sent to initialization ports may be consumed exactly once, generally in the begin method.
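Because initialization data is exposed as a generator, a second loop over it yields nothing. The following plain-Python sketch illustrates that behavior without any Orion classes. The function init_records is a hypothetical stand-in for an initialization port, and the byte strings are made-up data. It shows why anything needed later in process or end should be stored on the cube during the first pass.

```python
def init_records():
    """Hypothetical stand-in for the data arriving on an initialization port."""
    yield from (b"chunk-1", b"chunk-2", b"chunk-3")


stream = init_records()

# First pass: consume the stream and keep what is needed for later
cached = list(stream)

# Second pass: the generator is already exhausted, so nothing is left
second_pass = list(stream)

print(len(cached), len(second_pass))
```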
The following cube and floe demonstrate how to send an entire file through an initialization port.
from os import remove
from utils import TemporaryPath
from floe.api.floes import WorkFloe
from floe.api.cubes import ComputeCube
from floe.api.ports import BinaryInputPort
from orionplatform.mixins import RecordPortsMixin
from orionplatform.cubes import DatasetReaderCube, BinaryFileReaderCube


class InitFileCube(RecordPortsMixin, ComputeCube):
    setup = BinaryInputPort("setup", required=True, initializer=True)

    def begin(self):
        # Write everything sent to the initialization port to a temporary file
        self.init_data = TemporaryPath(suffix=".initdata")
        with open(self.init_data.path, "wb") as ofs:
            for data in self.setup:
                ofs.write(data)

    def process(self, record, port):
        self.log.info("Launch app with data from initialization port for each record")

    def end(self):
        # Clean up the temporary file
        remove(self.init_data.path)


job = WorkFloe("Example File Initializer")
setup_source = BinaryFileReaderCube("setup_source")
data_source = DatasetReaderCube("data_source")
init_file = InitFileCube("init_file")
job.add_cubes(setup_source, data_source, init_file)

# The file contents go to the initialization port
setup_source.success.connect(init_file.setup)
setup_source.promote_parameter("file")

# The records go to the regular input port
data_source.success.connect(init_file.intake)
data_source.promote_parameter("data_in")

if __name__ == "__main__":
    job.run()
Warning
Parallel To Serial Initializer Cube Deadlock Error
When an initializer port on a downstream serial cube is directly connected to an upstream parallel cube, if there exists another path of ports between the parallel cube and the serial cube that does not pass through the aforementioned initializer port, then deadlock will occur.
Example 1: [Figure: Floe will deadlock. ─► means initializer port.]
In the above scenario, there exist two paths from Parallel Cube 2 to Serial Cube 4. Path A is a direct connection from Parallel Cube 2 to Serial Cube 4 via an initializer port, and Path B also connects Parallel Cube 2 to Serial Cube 4 without passing through that initializer port. Path B may consist of any number and type of ports, as long as it connects the upstream parallel cube to the downstream serial cube.
Solution
A simple fix to prevent deadlock from occurring in the above scenario is to insert a pass-through cube in Path A so that the initializer port does not directly connect to the upstream parallel cube. The pass-through cube may even use an initializer port of its own, as long as there is no additional path from the upstream parallel cube to the pass-through cube.
Example 2: [Figure: Floe will not deadlock. ─► means initializer port.]
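The deadlock condition described in this warning can be expressed as a small graph check. The following is a sketch in plain Python, not part of the floe API: find_deadlock_risks is a hypothetical helper, and the cube names and connections are an assumed topology that matches the description of Path A and Path B, since the original figures are not reproduced here. The pass-through cube is assumed to be serial.

```python
from collections import defaultdict


def find_deadlock_risks(parallel, connections):
    """Return (upstream, downstream) pairs that match the warning's pattern.

    parallel:    dict mapping cube name -> True for parallel, False for serial
    connections: list of (source cube, destination cube, is_initializer)
    """
    graph = defaultdict(list)
    for index, (src, dst, _) in enumerate(connections):
        graph[src].append((dst, index))

    def reachable(start, goal, skipped_edge):
        # Depth-first search that never follows the skipped connection
        stack, seen = [start], {start}
        while stack:
            cube = stack.pop()
            for nxt, index in graph[cube]:
                if index == skipped_edge:
                    continue
                if nxt == goal:
                    return True
                if nxt not in seen:
                    seen.add(nxt)
                    stack.append(nxt)
        return False

    risks = []
    for index, (src, dst, is_init) in enumerate(connections):
        # Initializer port on a serial cube fed directly by a parallel cube
        direct = is_init and parallel[src] and not parallel[dst]
        if direct and reachable(src, dst, index):
            risks.append((src, dst))
    return risks


# Assumed topology for the deadlocking case
parallel = {"source": False, "parallel_2": True, "serial_3": False, "serial_4": False}
example_1 = [
    ("source", "parallel_2", False),
    ("parallel_2", "serial_4", True),   # Path A: direct initializer connection
    ("parallel_2", "serial_3", False),  # Path B, first hop
    ("serial_3", "serial_4", False),    # Path B, second hop
]

# Same floe with a serial pass-through cube inserted in Path A
parallel_fixed = dict(parallel, pass_through=False)
example_2 = [
    ("source", "parallel_2", False),
    ("parallel_2", "pass_through", False),
    ("pass_through", "serial_4", True),
    ("parallel_2", "serial_3", False),
    ("serial_3", "serial_4", False),
]

print(find_deadlock_risks(parallel, example_1))
print(find_deadlock_risks(parallel_fixed, example_2))
```

The check only encodes the rule as stated in the warning. It does not model how the scheduler actually behaves, so it is best treated as a review aid when wiring floes.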