Orion Client Python API Examples

Basics

Interactions with Orion resources are done through an OrionSession, which handles the authentication and network requests necessary to communicate with the Orion API.
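
Most of the examples below use the pre-constructed APISession, which is configured from the default profile or from the profile named in the ORION_PROFILE environment variable. A session can also be built from an explicit profile configuration, as in the following minimal sketch (it reuses the DEFAULT_PROFILE helper shown in the sharing example at the end of this page):

from orionclient.session import (
    APISession,
    DEFAULT_PROFILE,
    OrionSession,
    get_profile_config,
)

# Pre-constructed session; uses the default profile, or the profile
# named in the ORION_PROFILE environment variable
session = APISession

# Alternatively, construct a session from an explicit profile configuration
config = get_profile_config(profile=DEFAULT_PROFILE)
custom_session = OrionSession(config=config)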

Retrieving Resources

OrionSession provides two ways of retrieving resources: list_resources() and get_resource(). Objects returned by list_resources() may be incomplete, and refresh_resource() must be called on an object to ensure it is complete; a short get_resource() example appears at the end of this section. The following example demonstrates how objects can be incomplete.

# Import the pre-constructed session
from orionclient.session import APISession
from orionclient.types import WorkFloeSpec

for workfloe in APISession.list_resources(WorkFloeSpec):
    # Verify that the specification is an empty dictionary
    assert len(workfloe.specification) == 0
    # Refresh the resource to ensure that the object has complete information
    APISession.refresh_resource(workfloe)
    assert len(workfloe.specification) != 0
    break

When listing resources, you can provide filters to reduce the results that you get back.

# Import the pre-constructed session
from orionclient.session import APISession
from orionclient.types import WorkFloeSpec

filters = {"name": "omega"}
# Get all workfloes where the name contains "omega"
for workfloe in APISession.list_resources(WorkFloeSpec, filters=filters):
    print(workfloe.name)
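
When the ID of a resource is already known, get_resource() retrieves a single object directly and, as in the workfloe examples further down this page, the returned object is complete, so refresh_resource() is not required. A minimal sketch (the workfloe ID is only an illustration):

from orionclient.session import APISession
from orionclient.types import WorkFloeSpec

# Retrieve a single workfloe by ID (123 is an example ID)
workfloe = APISession.get_resource(WorkFloeSpec, 123)
# Objects from get_resource() are complete, so the specification is
# populated without a call to refresh_resource()
assert len(workfloe.specification) != 0
print(workfloe.name)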

Miscellaneous

Converting File to Dataset using ETL Floe

The following example uses the Python API to upload a file, run it through one of the ETL floes, and retrieve the output datasets.

from time import sleep

from orionclient.session import APISession
from orionclient.types import File, Dataset, WorkFloeJob, WorkFloeSpec

# APISession uses the default profile, or the profile name defined in
# the ORION_PROFILE environment variable.
session = APISession

# Upload the file to run an ETL floe against
file_upload = File.upload(session, "File Name", "/path/to/file.sdf")

# Filter for workfloes tagged with ETL
workfloe_filters = {"tags": "ETL"}
etl_floes = []
for resource in session.list_resources(WorkFloeSpec, filters=workfloe_filters):
    etl_floes.append(resource)

if len(etl_floes) < 1:
    raise RuntimeError("Unable to find an ETL floe")

# Start a job on the first ETL floe found
job = WorkFloeJob.start(
    session,
    etl_floes[0],
    "Example Floe Name",
    {"promoted": {"file": file_upload.id, "dataset": "Output Dataset Name"}},
)
# Poll the job to check when it is complete
while job.state != "complete":
    # Refresh the job from the API
    session.refresh_resource(job)
    sleep(1)

if len(job.reason):
    raise RuntimeError("Job failed with reason {}".format(job.reason))

# Retrieve Dataset from Job by tag
dataset_filters = {"tags": "Job {}".format(job.id)}
datasets = []
for resource in session.list_resources(Dataset, filters=dataset_filters):
    datasets.append(resource)

Download code

run-etl-floe.py

Upload File in parts

Uploading a file in parts is best practice when disk space is limited and the file can be written as a stream.

Warning

Each part except the final part must be at least 5 MB.

import os
from argparse import ArgumentParser

from orionclient.types import File
from orionclient.session import APISession

# Part size must be at least 5 MB, except for the final part
PART_SIZE = 5 * 1024 * 1024


def upload_multi_part_file(path: str):
    file_name = os.path.basename(path)
    multi_part = File.create_multipart_file(APISession, file_name)
    with open(path, "rb") as ifs:
        data = ifs.read(PART_SIZE)
        while data:
            # Upload parts
            multi_part.upload_part(data)
            data = ifs.read(PART_SIZE)
    # Complete the multi part upload
    multi_part.complete()
    print(f"Uploaded {path} to File {multi_part.id}")


if __name__ == "__main__":
    parser = ArgumentParser(description="Upload file in parts")
    parser.add_argument("filepath")
    args = parser.parse_args()
    upload_multi_part_file(args.filepath)

Download code

multi_part_file.py

Triggering Workfloes from local molecular files

The following example demonstrates parsing a workfloe specification to determine the promoted input and output parameters needed to start the workfloe from local molecular files.

import os
import sys
from argparse import ArgumentParser

from orionclient.session import APISession
from orionclient.types import Dataset, WorkFloeJob, WorkFloeSpec
from floe.constants import PROMOTED, SINK_CUBE, SOURCE_CUBE, PARAMETER_TYPE
from orionplatform.constants import (  # DATASET_IN only exists in orion-platform>=1.0.1
    DATASET_IN,
    DATASET_OUT,
)


def run_workfloe(workfloe, output_name, input_param, output_params, datasets):
    orion_datasets = []
    for dataset in datasets:
        orion_datasets.append(
            Dataset.upload(APISession, os.path.basename(dataset), dataset)
        )
    kwargs = {
        "promoted": {
            input_param["promoted_name"]: {
                "datasets": [{"id": ds.id} for ds in orion_datasets]
            }
        }
    }
    for cube, param in output_params:
        kwargs["promoted"][param["promoted_name"]] = "{} - {}".format(
            output_name, cube["name"]
        )
    job = WorkFloeJob.start(APISession, workfloe, "{}".format(workfloe.name), kwargs)
    return job


def parse_workfloe_specification(specification):
    """Parses out the input dataset parameter and the output dataset parameters

    :param dict specification: Workfloe Specification
    :return: (dict, [(cube, dict)])
    """
    params = [
        (cube, param)
        for cube in specification["cubes"]
        if cube["type"] in (SOURCE_CUBE, SINK_CUBE)
        for param in cube["parameters"]
    ]
    dataset_input = None
    dataset_output = []
    for cube, param in params:
        if param[PARAMETER_TYPE] == DATASET_IN and param[PROMOTED]:
            if dataset_input is not None:
                raise RuntimeError(
                    "Found more than one dataset input parameter, unable to proceed."
                )
            dataset_input = param
        elif param[PARAMETER_TYPE] == DATASET_OUT and param[PROMOTED]:
            dataset_output.append((cube, param))
    if dataset_input is None:
        raise RuntimeError(
            "Unable to find a promoted dataset input parameter, exiting.."
        )
    if not len(dataset_output):
        raise RuntimeError(
            "Unable to find a promoted dataset output parameter, exiting.."
        )
    return dataset_input, dataset_output


def main():
    parser = ArgumentParser(description="Start a floe from local datasets")
    parser.add_argument("workfloe", type=int, help="ID of the workfloe to run")
    parser.add_argument("output_name", type=str, help="Name of the dataset to output")
    parser.add_argument(
        "molecule_files",
        nargs="+",
        help="List of molecular files to use as input to the floe",
    )
    parser.add_argument(
        "--split",
        action="store_true",
        help="Indicates to run one floe per molecular file, rather than one floe with multiple molecular files as input",
    )
    args = parser.parse_args()
    workfloe = APISession.get_resource(WorkFloeSpec, args.workfloe)
    try:
        input_param, output_param = parse_workfloe_specification(workfloe.specification)
    except RuntimeError as e:
        print(str(e))
        sys.exit(1)
    if args.split:
        for molecule_file in args.molecule_files:
            job = run_workfloe(
                workfloe, args.output_name, input_param, output_param, [molecule_file]
            )
            print("Started job {}".format(job.id))
    else:
        job = run_workfloe(
            workfloe, args.output_name, input_param, output_param, args.molecule_files
        )
        print("Started job {}".format(job.id))


if __name__ == "__main__":
    main()

Download code

run-dataset-floe.py

Using ShardCollections

See also

Orion Types Collections and Shards section

The following example uses the Python API to create a collection, upload files as shards, mark the shards ready, and close the collection.

import os
from tempfile import NamedTemporaryFile

from orionclient.session import APISession
from orionclient.types import Shard, ShardCollection

# APISession uses the default profile, or the profile name defined in
# the ORION_PROFILE environment variable.
session = APISession

# create a new collection
collection = ShardCollection.create(
    session, "Descriptive name of new shard collection", metadata={"size_bytes": 0}
)
print(
    "Collection '{}' can be referenced later using id={}".format(
        collection.name, collection.id
    )
)

for i in range(10):
    with NamedTemporaryFile("w+t") as temp:
        temp.write("data{}".format(i))
        temp.flush()

        # create a new shard
        shard = Shard.create(
            collection,
            "Descriptive name of shard {}".format(i),
            metadata={"size_bytes": os.stat(temp.name).st_size},
        )
        print(
            "Shard {} in collection {} can be referenced later using id={}".format(
                i, collection.id, shard.id
            )
        )

        # upload a file to the newly created shard
        # file size of individual shard is currently limited to 5 GB
        shard.upload_file(temp.name)

        # by default, a shard is in an open state
        # a shard changes to the temporary state when data has been uploaded
        # shards can be marked ready immediately after upload in a serial cube,
        # however shards should be marked ready downstream of upload in a parallel cube
        shard.close()

        metadata = {
            "size_bytes": collection.metadata["size_bytes"]
            + shard.metadata["size_bytes"]
        }
        # Updates collection in place
        collection.update(metadata=metadata)

# prevent any additional shards from being added to the collection
collection.close()

The following example uses the Python API to open an existing collection, download all shards in the collection to local files, and delete the collection and its shards.

import os
import sys

from orionclient.session import APISession
from orionclient.types import ShardCollection

# APISession uses the default profile, or the profile name defined in
# the ORION_PROFILE environment variable.
session = APISession

# a collection can be retrieved directly by id
# collection = session.get_resource(ShardCollection, collection_id)

# retrieving a shard by id is a little more complicated
# shard = session.get_resource(Shard(collection=collection), shard_id)
# or
# shard = Shard.from_msg({"id": shard_id, "collection": collection_id}, session=APISession)
# session.refresh_resource(shard)

collection = None

# find a collection by name
# note that names are not guaranteed to be unique
for c in session.list_resources(ShardCollection):
    if c.name == "Descriptive name of new shard collection":
        collection = c
        break

if collection is None:
    print("Unable to find a collection to delete")
    sys.exit()

for shard in collection.list_shards():
    filename = "collection_{}_{}_shard_{}".format(
        collection.name, collection.id, shard.id
    )
    shard.download_to_file(filename)

    # a shard's data can also be streamed
    bytes_read = 0
    for chunk in shard.download():
        bytes_read += len(chunk)
        print("file '{}' bytes_read={}".format(filename, bytes_read))

    os.remove(filename)

# delete a collection of shards
if session.delete_resource(collection):
    print("collection '{}' deleted".format(collection.name))

Benchmarking Workfloes

See also

Parameterization Helpers section

The following example demonstrates how to use the Python API helper to benchmark a workfloe. It benchmarks the Classic OMEGA floe in Snowball using two parameters: item_count, which affects parallel cubes, and buffer_size, which affects serial cubes and their ability to stream data quickly. Other floes may have cube-specific parameters that determine run time and cost, and these should be benchmarked accordingly.

import sys
import json
from datetime import datetime
from argparse import ArgumentParser

from orionclient.session import APISession
from orionclient.exceptions import OrionError
from orionclient.types import Dataset, WorkFloeSpec
from orionclient.helpers.parameterize import parameterize_workfloe

WORKFLOE_TITLE = "Classic OMEGA"


def run_benchmarks(args):
    try:
        dataset = APISession.get_resource(Dataset, args.dataset_id)
    except OrionError as e:
        print(e)
        sys.exit(1)
    filters = {"title": WORKFLOE_TITLE}
    workfloes = list(APISession.list_resources(WorkFloeSpec, filters=filters))
    if not len(workfloes):
        print("Unable to find a workfloe named {}".format(WORKFLOE_TITLE))
        sys.exit(1)
    workfloe = workfloes[0]
    # Refresh to get the specification
    APISession.refresh_resource(workfloe)
    print("Benchmarking Workfloe {}, id:{:d}".format(workfloe.title, workfloe.id))
    default_params = {"promoted": {"in": dataset.id, "out": args.output_dataset}}
    # Get the cubes in the specification and get default values for parameter
    # being benchmarked
    cubes = workfloe.specification["cubes"]
    default_values = {}
    for cube in cubes:
        for param in cube["parameters"]:
            if param["name"] == args.benchmark:
                default_values[cube["name"]] = {
                    "default": param["default"],
                    "max_value": param["max_value"],
                }
    parameters = []
    for x in range(args.steps):
        cube_params = {}
        for key, item in default_values.items():
            cube_params[key] = {args.benchmark: item["default"] * (x + 1)}
            if item["max_value"] is not None:
                cube_params[key][args.benchmark] = min(
                    cube_params[key][args.benchmark], item["max_value"]
                )
        parameters.append({"cube": cube_params})
    jobs = parameterize_workfloe(
        workfloe,
        "{} Benchmark".format(WORKFLOE_TITLE),
        default_params,
        parameters,
        parallel=args.parallel,
    )
    for job in jobs:
        header = "Job {}".format(job.id)
        print(header)
        print("-" * len(header))
        print(json.dumps(job.parameters, indent=2))
        started = datetime.strptime(job.started, "%Y-%m-%d %H:%M")
        ended = datetime.strptime(job.finished, "%Y-%m-%d %H:%M")
        print("Took {}".format(ended - started))
        if not job.success:
            print("Failure - {}".format(job.reason))
        else:
            print("Success")


def main():
    parser = ArgumentParser(description="Benchmark {}".format(WORKFLOE_TITLE))
    parser.add_argument("dataset_id")
    parser.add_argument("output_dataset")
    parser.add_argument(
        "--benchmark",
        default="buffer_size",
        choices=["buffer_size", "item_count"],
        help="Cube parameter to benchmark",
    )
    parser.add_argument(
        "--steps",
        default=5,
        type=int,
        help="Number of steps to run, each step is default times the step number",
    )
    parser.add_argument(
        "--parallel", action="store_true", help="Run benchmarking floes in parallel"
    )
    args = parser.parse_args()
    run_benchmarks(args)


if __name__ == "__main__":
    main()

Download code

benchmark_workfloe.py

Share Jobs with Project Members

import sys

from orionclient.types import WorkFloeJob
from orionclient.exceptions import OrionError
from orionclient.session import DEFAULT_PROFILE, OrionSession, get_profile_config


def share_job_to_project(session, job):
    project = session.get_current_project()
    try:
        session.share_resource(job, project)
    except OrionError:
        print(
            f"Unable to share job {job.id} with project (perhaps you already shared it?)"
        )


if __name__ == "__main__":
    job_id = int(sys.argv[1])
    project_id = int(sys.argv[2])

    config = get_profile_config(profile=DEFAULT_PROFILE)
    config.project = project_id

    curr_session = OrionSession(config=config)
    job = curr_session.get_resource(WorkFloeJob, job_id)

    share_job_to_project(curr_session, job)

Here share_job_to_project() takes a configured Orion session and a WorkFloeJob, and shares the job with the members of the session's current project.
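
To share a job with the project already configured in your profile, the pre-constructed APISession can be used instead of building a session with an explicit project ID, assuming it exposes the same get_current_project() and share_resource() methods used above. A minimal sketch (the job ID is only an illustration):

from orionclient.session import APISession
from orionclient.types import WorkFloeJob

# Retrieve the job by ID (123 is an example ID)
job = APISession.get_resource(WorkFloeJob, 123)
# Share it with the project configured in the current profile
project = APISession.get_current_project()
APISession.share_resource(job, project)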