Orion Client Python API Examples
Basics
Interactions with Orion resources are done through an OrionSession, which handles the authentication and network requests necessary to communicate with the Orion API.
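All of the following examples use the pre-constructed APISession, which is configured from your default profile or from the profile named in the ORION_PROFILE environment variable:
# Import the pre-constructed session; it uses the default profile, or the
# profile named in the ORION_PROFILE environment variable
from orionclient.session import APISession
# The session performs the authenticated requests against the Orion API
session = APISession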
Retrieving Resources
OrionSession provides two ways of retrieving resources: list_resources() and get_resource().
Objects returned by list_resources() may be incomplete, and refresh_resource() must be called on an object to ensure it is complete.
The following example demonstrates how listed objects can be incomplete.
# Import the pre-constructed session
from orionclient.session import APISession
from orionclient.types import WorkFloeSpec
for workfloe in APISession.list_resources(WorkFloeSpec):
# Verify that the specification is an empty dictionary
assert len(workfloe.specification) == 0
# Refresh the resource to ensure that the object has complete information
APISession.refresh_resource(workfloe)
assert len(workfloe.specification) != 0
break
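The second method, get_resource(), retrieves a single resource directly by its id, as several of the later examples do. A minimal sketch, using a hypothetical workfloe id:
from orionclient.session import APISession
from orionclient.types import WorkFloeSpec
# Retrieve a single resource by id (1234 is a hypothetical id)
workfloe = APISession.get_resource(WorkFloeSpec, 1234)
print(workfloe.name)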
When listing resources, you can provide filters to narrow the results returned.
# Import the pre-constructed session
from orionclient.session import APISession
from orionclient.types import WorkFloeSpec
filters = {"name": "omega"}
# Get all workfloes where the name contains "omega"
for workfloe in APISession.list_resources(WorkFloeSpec, filters=filters):
print(workfloe.name)
Links
Links associate Orion resources with Data Records, and Orion Client implements the retrieval of these link objects. Links can be created to point at several Orion Client types (Datasets, Files, Collections, Shards, and Secrets); create_link()
constructs a link object that can then be stored in a link field.
The following example creates a link to a record field in a docking workflow.
import time
from orionclient.types import Dataset
from drconvert import MolFileConverter
from orionclient.links import RecordLink
from orionclient.session import APISession
# Note: oechem must be imported before the other OpenEye toolkits
from openeye.oechem import OEMol, OEGraphMol
from datarecord import Types, OEField, OEPrimaryMolField, create_link
from openeye.oedocking import (
OEDock,
OEHybrid,
OEIsReceptor,
OEReceptorHasBoundLigand,
OEDockingReturnCode_Success,
)
# Upload the receptor dataset, as linked records must exist in
# Orion prior to linking
receptor_ds = Dataset.upload(APISession, "receptors", "rec1a28.oeb.gz")
# Wait until the receptor dataset is finalized
while receptor_ds.dirty:
APISession.refresh_resource(receptor_ds)
time.sleep(1)
# Retrieve the records for the ligands, in this case just one
records = []
for rec in MolFileConverter("1a28_flynn_refined_lig.sdf"):
records.append(rec)
mol_field = OEPrimaryMolField()
score_field = OEField("docking score", Types.Float)
# Field that will link to the receptor record
link_field = OEField("receptor link", Types.Link)
# Iterate through the receptors and ligands and
# set the score and link field.
for receptor_rec in receptor_ds.records():
receptor = receptor_rec.get_value(mol_field)
if receptor is None or not OEIsReceptor(receptor):
continue
receptor = OEGraphMol(receptor)
if OEReceptorHasBoundLigand(receptor):
dock = OEHybrid()
else:
dock = OEDock()
dock.Initialize(receptor)
for lig_rec in records:
ligand = lig_rec.get_value(mol_field)
if ligand is None:
continue
out_mol = OEMol()
if (
dock.DockMultiConformerMolecule(out_mol, ligand)
!= OEDockingReturnCode_Success
):
continue
score = out_mol.GetEnergy()
prev_score = lig_rec.get_value(score_field)
if prev_score is None or prev_score < score:
lig_rec.set_value(score_field, score)
# Set the link_field to point to the receptor field
lig_rec.set_value(
link_field, create_link(record=receptor_rec, field=mol_field)
)
for lig_rec in records:
link = lig_rec.get_value(link_field)
if link is None:
continue
# Use OrionClient to retrieve the link object
link_obj = APISession.get_link(link)
if not isinstance(link_obj, RecordLink):
continue
# Get the associated record
receptor_rec = link_obj.get_record()
# Retrieve the receptor from the linked record
receptor = receptor_rec.get_value(link_obj.get_field())
Miscellaneous
Converting File to Dataset using ETL Floe
The following example uses the Python API to upload a file, run it through one of the ETL floes, and retrieve the output datasets.
from time import sleep
from orionclient.session import APISession
from orionclient.types import File, Dataset, WorkFloeJob, WorkFloeSpec
# APISession uses the default profile, or the profile name defined in
# the ORION_PROFILE environment variable.
session = APISession
# Upload the file to run an ETL floe against
file_upload = File.upload(session, "File Name", "/path/to/file.sdf")
# Filter for workfloes tagged with ETL
workfloe_filters = {"tags": "ETL"}
etl_floes = []
for resource in session.list_resources(WorkFloeSpec, filters=workfloe_filters):
etl_floes.append(resource)
if len(etl_floes) < 1:
raise RuntimeError("Unable to find an ETL floe")
# Start a job on the first etl floe found
job = WorkFloeJob.start(
session,
etl_floes[0],
"Example Floe Name",
{"promoted": {"file": file_upload.id, "dataset": "Output Dataset Name"}},
)
# Poll the job to check when it is complete
while job.state != "complete":
# Refresh the job from the API
session.refresh_resource(job)
sleep(1)
if len(job.reason):
raise RuntimeError(f"Job failed with reason {job.reason}")
# Retrieve Dataset from Job by tag
dataset_filters = {"tags": f"Job {job.id}"}
datasets = []
for resource in session.list_resources(Dataset, filters=dataset_filters):
datasets.append(resource)
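Once an output dataset has been located, its records can be iterated directly, as in the docking example above; a short sketch continuing from the code above:
# Print the records of the first output dataset found, if any
if datasets:
    for record in datasets[0].records():
        print(record)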
Upload File in parts
Uploading a file in parts is best practice when disk space is limited and the file can be written as a stream.
Warning
Each part except the final one must be at least 5 MB.
import os
from argparse import ArgumentParser
from orionclient.types import File
from orionclient.session import APISession
# Part size must be at least 5 MB, except for the final part
PART_SIZE = 5 * 1024 * 1024
def upload_multi_part_file(path: str):
file_name = os.path.basename(path)
multi_part = File.create_multipart_file(APISession, file_name)
with open(path, "rb") as ifs:
data = ifs.read(PART_SIZE)
while data:
# Upload parts
multi_part.upload_part(data)
data = ifs.read(PART_SIZE)
# Complete the multi part upload
multi_part.complete()
print(f"Uploaded {path} to File {multi_part.id}")
if __name__ == "__main__":
parser = ArgumentParser(description="Upload file in parts")
parser.add_argument("filepath")
args = parser.parse_args()
upload_multi_part_file(args.filepath)
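After the upload completes, the resulting File can be retrieved again by id with get_resource(); a minimal sketch, using a hypothetical file id:
from orionclient.session import APISession
from orionclient.types import File
# Retrieve the uploaded file later by its id (1234 is a hypothetical id)
uploaded = APISession.get_resource(File, 1234)
print(uploaded.name)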
Triggering Workfloes from local molecular files
The following example demonstrates parsing a floe's workfloe specification to determine the inputs and outputs necessary for starting a workfloe from local molecular files.
import os
import sys
from argparse import ArgumentParser
from orionclient.session import APISession
from orionclient.types import Dataset, WorkFloeJob, WorkFloeSpec
from floe.constants import PROMOTED, SINK_CUBE, SOURCE_CUBE, PARAMETER_TYPE
from orionplatform.constants import ( # DATASET_IN only exists in orion-platform>=1.0.1
DATASET_IN,
DATASET_OUT,
)
def run_workfloe(workfloe, output_name, input_param, output_params, datasets):
orion_datasets = []
for dataset in datasets:
orion_datasets.append(
Dataset.upload(APISession, os.path.basename(dataset), dataset)
)
kwargs = {
"promoted": {
input_param["promoted_name"]: {
"datasets": [{"id": ds.id} for ds in orion_datasets]
}
}
}
for cube, param in output_params:
kwargs["promoted"][param["promoted_name"]] = f"{output_name} - {cube['name']}"
job = WorkFloeJob.start(APISession, workfloe, f"{workfloe.name}", kwargs)
return job
def parse_workfloe_specification(specification):
"""Parses out the input dataset parameter and the output dataset parameters
:param dict specification: Workfloe Specification
:return: (dict, [(cube, dict)])
"""
params = [
(cube, parm)
for cube in specification["cubes"]
for parm in cube["parameters"]
if cube["type"] in (SOURCE_CUBE, SINK_CUBE)
]
dataset_input = None
dataset_output = []
for cube, param in params:
if param[PARAMETER_TYPE] == DATASET_IN and param[PROMOTED]:
if dataset_input is not None:
raise RuntimeError(
"Found more than one dataset input parameter, unable to proceed."
)
dataset_input = param
elif param[PARAMETER_TYPE] == DATASET_OUT and param[PROMOTED]:
dataset_output.append((cube, param))
if dataset_input is None:
raise RuntimeError(
"Unable to find a promoted dataset input parameter, exiting.."
)
if not len(dataset_output):
raise RuntimeError(
"Unable to find a promoted dataset output parameter, exiting.."
)
return dataset_input, dataset_output
def main():
parser = ArgumentParser(description="Start a floe from local datasets")
parser.add_argument("workfloe", type=int, help="ID of the workfloe to run")
parser.add_argument("output_name", type=str, help="Name of the dataset to output")
parser.add_argument(
"molecule_files",
nargs="+",
help="List of molecular files to use as input to the floe",
)
parser.add_argument(
"--split",
action="store_true",
help="Indicates to run one floe per molecular file, rather than one floe with multiple molecular files as input",
)
args = parser.parse_args()
workfloe = APISession.get_resource(WorkFloeSpec, args.workfloe)
try:
input_param, output_param = parse_workfloe_specification(workfloe.specification)
except RuntimeError as e:
print(str(e))
sys.exit(1)
if args.split:
        for dataset in args.molecule_files:
job = run_workfloe(
workfloe, args.output_name, input_param, output_param, [dataset]
)
print(f"Started job {job.id}")
else:
job = run_workfloe(
            workfloe, args.output_name, input_param, output_param, args.molecule_files
)
print(f"Started job {job.id}")
if __name__ == "__main__":
main()
Using ShardCollections
See also
Orion Types Collections and Shards section
The following example uses the Python API to create a collection, upload a file as a shard, mark the shard ready, and close the collection.
import os
from tempfile import NamedTemporaryFile
from orionclient.session import APISession
from orionclient.types import Shard, ShardCollection
# APISession uses the default profile, or the profile name defined in
# the ORION_PROFILE environment variable.
session = APISession
# create a new collection
collection = ShardCollection.create(
session, "Descriptive name of new shard collection", metadata={"size_bytes": 0}
)
print(
f"Collection '{collection.name}' can be referenced later using id={collection.id}"
)
for i in range(10):
with NamedTemporaryFile("w+t") as temp:
temp.write(f"data{i}")
temp.flush()
# create a new shard
shard = Shard.create(
collection,
f"Descriptive name of shard {i}",
metadata={"size_bytes": os.stat(temp.name).st_size},
)
print(
f"Shard {i} in collection {collection.id} can be referenced later using id={shard.id}"
)
# upload a file to the newly created shard
# file size of individual shard is currently limited to 5 GB
shard.upload_file(temp.name)
# by default, a shard is in an open state
# a shard changes to the temporary state when data has been uploaded
# shards can be marked ready immediately after upload in a serial cube,
# however shards should be marked ready downstream of upload in a parallel cube
shard.close()
metadata = {
"size_bytes": collection.metadata["size_bytes"]
+ shard.metadata["size_bytes"]
}
# Updates collection in place
collection.update(metadata=metadata)
# prevent any additional shards from being added to the collection
collection.close()
The following example uses the Python API to open an existing collection, download all shards in the collection to local files, and delete the collection and its shards.
import os
import sys
from orionclient.session import APISession
from orionclient.types import ShardCollection
# APISession uses the default profile, or the profile name defined in
# the ORION_PROFILE environment variable.
session = APISession
# a collection can be retrieved directly by id
# collection = session.get_resource(ShardCollection, collection_id)
# retrieving a shard by id is a little more complicated
# shard = session.get_resource(Shard(collection=collection), shard_id)
# or
# shard = Shard.from_msg({"id": shard_id, "collection": collection_id}, session=APISession)
# session.refresh_resource(shard)
collection = None
# find a collection by name
# note that names are not guaranteed to be unique
for c in session.list_resources(ShardCollection):
if c.name == "Descriptive name of new shard collection":
collection = c
break
if collection is None:
print("Unable to find a collection to delete")
sys.exit()
for shard in collection.list_shards():
filename = f"collection_{collection.name}_{collection.id}_shard_{shard.id}"
shard.download_to_file(filename)
# a shard's data can also be streamed
bytes_read = 0
for chunk in shard.download():
bytes_read += len(chunk)
print(f"file '{filename}' bytes_read={bytes_read}")
os.remove(filename)
# delete a collection of shards
if session.delete_resource(collection):
print(f"collection '{collection.name}' deleted")
Benchmarking Workfloes
See also
Parameterization Helpers section
The following example demonstrates how to use the Python API helper to benchmark
a workfloe. It benchmarks the Classic OMEGA floe in Snowball using two parameters:
item_count, which affects parallel cubes, and buffer_size, which affects serial
cubes and their ability to stream data quickly. Other floes may have parameters
specific to individual cubes that determine their time and cost, and those
should be benchmarked accordingly.
import sys
import json
from datetime import datetime
from argparse import ArgumentParser
from orionclient.session import APISession
from orionclient.exceptions import OrionError
from orionclient.types import Dataset, WorkFloeSpec
from orionclient.helpers.parameterize import parameterize_workfloe
WORKFLOE_TITLE = "Classic OMEGA"
def run_benchmarks(args):
try:
dataset = APISession.get_resource(Dataset, args.dataset_id)
except OrionError as e:
print(e)
sys.exit(1)
filters = {"title": WORKFLOE_TITLE}
workfloes = list(APISession.list_resources(WorkFloeSpec, filters=filters))
if not len(workfloes):
print(f"Unable to find a workfloe named {WORKFLOE_TITLE}")
sys.exit(1)
workfloe = workfloes[0]
# Refresh to get the specification
APISession.refresh_resource(workfloe)
print(f"Benchmarking Workfloe {workfloe.title}, id:{workfloe.id:d}")
default_params = {"promoted": {"in": dataset.id, "out": args.output_dataset}}
    # Get the cubes in the specification and get the default values for the
    # parameter being benchmarked
cubes = workfloe.specification["cubes"]
default_values = {}
for cube in cubes:
for param in cube["parameters"]:
if param["name"] == args.benchmark:
default_values[cube["name"]] = {
"default": param["default"],
"max_value": param["max_value"],
}
parameters = []
for x in range(args.steps):
cube_params = {}
for key, item in default_values.items():
cube_params[key] = {args.benchmark: item["default"] * (x + 1)}
if item["max_value"] is not None:
cube_params[key][args.benchmark] = min(
cube_params[key][args.benchmark], item["max_value"]
)
parameters.append({"cube": cube_params})
jobs = parameterize_workfloe(
workfloe,
f"{WORKFLOE_TITLE} Benchmark",
default_params,
parameters,
parallel=args.parallel,
)
for job in jobs:
header = f"Job {job.id}"
print(header)
print("-" * len(header))
print(json.dumps(job.parameters, indent=2))
started = datetime.strptime(job.started, "%Y-%m-%d %H:%M")
ended = datetime.strptime(job.finished, "%Y-%m-%d %H:%M")
print(f"Took {ended - started}")
if not job.success:
print(f"Failure - {job.reason}")
else:
print("Success")
def main():
parser = ArgumentParser(description=f"Benchmark {WORKFLOE_TITLE}")
parser.add_argument("dataset_id")
parser.add_argument("output_dataset")
parser.add_argument(
"--benchmark",
default="buffer_size",
choices=["buffer_size", "item_count"],
help="Cube parameter to benchmark",
)
parser.add_argument(
"--steps",
default=5,
type=int,
help="Number of steps to run, each step is default times the step number",
)
parser.add_argument(
"--parallel", action="store_true", help="Run benchmarking floes in parallel"
)
args = parser.parse_args()
run_benchmarks(args)
if __name__ == "__main__":
main()