Tutorial: Creating a Tensorflow Package

This tutorial will walk you through creating a complex Orion package with conda dependencies and GPU requirements.

Creating the initial package

Begin by creating a basic package using the init command; for more details, follow the instructions here.

This tutorial assumes that you are using ocli. However, if you already created a package manually, the information should transfer relatively easily.

$ ocli packages init
Author's full name [John Doe]:
Email [john.doe@my_org.com]:
Project name eg. My Orion Package [My Orion Package]: My Tensorflow Tutorial
Project slug [my-orion-package]: my-tensorflow-package
Module name [my_orion_package]: my_tensorflow_package
Description [Floe package]:
Version [0.1.0]:
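
After the prompts complete, the generated project should contain roughly the following layout (abbreviated; the file names match the ones referenced in the steps below):

my-tensorflow-package/
├── manifest.json
├── setup.py
├── orion-requirements.txt
├── my_tensorflow_package/
│   ├── __init__.py
│   └── mycube.py
├── floes/
│   └── myfloe.py
└── tests/
    ├── test_mycube.py
    └── floe_tests/
        └── test_myfloe.py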

Configuring Conda Requirements

The configuration of Orion packages, including conda dependencies, is controlled through a manifest.json file at the top level of the package directory. In this tutorial, our package requires tensorflow-gpu==2.4.1 from the anaconda channel.

Note

Tensorflow can be installed with pip, but conda is preferred: the conda build can perform up to 8x faster.

To add the anaconda channel and tensorflow-gpu==2.4.1 as a conda dependency, edit the manifest.json file. If you tried uploading the package with only that dependency, you would find that tensorflow-gpu pulls in the Cuda Toolkit, defaulting to Cuda 10.1. To ensure that your package reliably resolves the same environment, the Cuda Toolkit version is explicitly specified (“pinned”) here. The dependency that requires a specific version of Cuda is the cudnn library, whose version and build we also pin, in this case cudnn==7.6.5. After making these changes, your manifest.json should look like the following:

{
    "name": "My Tensorflow Tutorial",
    "python_version": 3.9,
    "requirements": "orion-requirements.txt",
    "version": "0.1.0",
    "conda_channels": [
        "anaconda"
    ],
    "conda_dependencies": [
        "tensorflow-gpu==2.4.1",
        "cudatoolkit==10.1.243",
        "cudnn==7.6.5"
    ],
    "base_os": "amazonlinux2",
    "build_requirements": {}
}
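
If you want to confirm what these pins resolve to before uploading, an optional local dry-run solve with conda can help (this step is not required by Orion):

$ conda create --dry-run -n pin-check -c anaconda tensorflow-gpu==2.4.1 cudatoolkit==10.1.243 cudnn==7.6.5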

At this point, you should have a package that you can build and upload with the following commands:

Note

The first command writes a tarball to the dist/ directory within your package; upload it either via the UI or with the Orion client command line (ocli), as shown below.

$ python setup.py package
# Upload via the CLI
$ ocli packages upload dist/my-tensorflow-package-0.1.0.tar.gz
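
If the upload succeeds, the package should appear in your package listing, for example:

# Confirm the upload (assuming the standard ocli listing subcommand)
$ ocli packages list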

Writing a Training Workfloe

Now that we have configured a package with all of the necessary requirements, we can build a workfloe that performs training on data. In this example, we use the MNIST dataset for simplicity, as it is built into Tensorflow.

To begin, we will write a cube that trains a model on chunks of the MNIST training data. We use shards here because they reduce data transfer between cubes, which becomes invaluable as you train on more data.

We can first delete the existing cubes, floes, and tests, as we will be writing new ones. Delete my_tensorflow_package/mycube.py, floes/myfloe.py, tests/test_mycube.py, and tests/floe_tests/test_myfloe.py, then remove the import line in my_tensorflow_package/__init__.py.

Next, create a file training.py in the my_tensorflow_package directory. In it we will write a SinkCube that trains a model, then saves the model either to a local file or to an Orion File.

import pickle

from floe.api import SinkCube
from floe.api.orion import in_orion
from floe.api.parameters import IntegerParameter

from orionclient.utils import TemporaryPath

from orionplatform.ports import ShardInputPort
from orionplatform.parameters import FileOutputParameter
from orionplatform.constants import FORMAT

from orionclient.types import File
from orionclient.session import APISession

import tensorflow as tf


class TensorFlowTrainingCube(SinkCube):
    title = "Tensorflow Model Trainer"
    classification = [["Examples", "Tensorflow"]]
    tags = ["Example", "Tensorflow"]
    description = "A cube that trains a tensorflow model and writes it to a file"

    # Will be handling shards
    intake = ShardInputPort(required=True)

    data_out = FileOutputParameter(required=True)

    epochs = IntegerParameter(
        default=3,
        min_value=1,
        max_value=100,
        description="Number of epoches, larger number of epoches will take longer",
    )

    # Ensure that the cube requests a GPU
    parameter_overrides = {
        "gpu_count": {"default": 1},
    }

    def begin(self):
        # Using model provided in tutorial here: https://www.tensorflow.org/tutorials
        self.model = tf.keras.models.Sequential(
            [
                tf.keras.layers.Flatten(input_shape=(28, 28)),
                tf.keras.layers.Dense(512, activation=tf.nn.relu),
                tf.keras.layers.Dropout(0.2),
                tf.keras.layers.Dense(10, activation=tf.nn.softmax),
            ]
        )
        # Compile the model to begin training
        self.model.compile(
            optimizer="adam",
            loss="sparse_categorical_crossentropy",
            metrics=["accuracy"],
        )

    # Sink cubes have a write method instead of a process method
    def write(self, shard, port):
        # Parse the data out of the shard
        input_data, expected_results = self.parse_shard(shard)
        for x in range(self.args.epochs):
            print("Epoch {}".format(x))
            # Train the model on the batch
            self.model.train_on_batch(input_data, expected_results)

    def parse_shard(self, shard):
        # Create a temporary file to avoid using up disk space over a
        # large number of items
        with TemporaryPath(suffix=".temp") as temp:
            shard.download_to_file(temp)
            shard_format = shard.metadata.get(FORMAT, "")
            # In our toy example, the format will be pickle files
            if shard_format == "pickle":
                with open(temp, "rb") as ifs:
                    input_data, expected_results = pickle.load(ifs)
            else:
                raise NotImplementedError(
                    "Unable to handle alternative format {}".format(shard_format)
                )
        return input_data, expected_results

    def end(self):
        if in_orion():
            # If we are running in Orion, upload the model as an Orion File
            with TemporaryPath(suffix=".h5") as temp:
                self.model.save(temp)
                File.upload(APISession, self.args.data_out, temp)
        else:
            self.model.save(self.args.data_out)

This cube takes shards whose metadata indicates that the format is a pickled batch of training data, trains the model on each shard's contents, and saves the model to a file. In a more complex case, it may be important to handle shards of different formats, as sketched below.
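
A minimal sketch of one way to do that, replacing the single if/else in parse_shard with a dispatch table (the "npz" handler is hypothetical and only illustrates the pattern):

import pickle

import numpy as np

from orionclient.utils import TemporaryPath
from orionplatform.constants import FORMAT


def parse_pickle(path):
    with open(path, "rb") as ifs:
        return pickle.load(ifs)


def parse_npz(path):
    # Hypothetical handler for shards stored as numpy .npz archives
    data = np.load(path)
    return data["x"], data["y"]


PARSERS = {"pickle": parse_pickle, "npz": parse_npz}


# Drop-in replacement for TensorFlowTrainingCube.parse_shard
def parse_shard(self, shard):
    with TemporaryPath(suffix=".temp") as temp:
        shard.download_to_file(temp)
        shard_format = shard.metadata.get(FORMAT, "")
        parser = PARSERS.get(shard_format)
        if parser is None:
            raise NotImplementedError(
                "Unable to handle format {}".format(shard_format)
            )
        return parser(temp)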

At this point, we can create a floe that performs the training by reading shards. Using the ShardReaderCube provided by Orion Platform (orionplatform.cubes.collections.ShardReaderCube) as our reader, we can connect it to the training cube we just defined.

Create a training_floe.py file in the floes/ directory. If you have defined a floe before, there is nothing unusual about this one.

from floe.api import WorkFloe
from my_tensorflow_package.training import TensorFlowTrainingCube
from orionplatform.cubes.collections import ShardReaderCube


# Declare and document floe
job = WorkFloe("Train Tensorflow Model", title="Train Model")
job.description = (
    "Trains a tensorflow model and outputs a model in the form of an Orion File"
)
job.classification = [["Examples", "Tensorflow"]]
job.tags = ["Examples", "Tensorflow"]

# Declare Cubes
reader_cube = ShardReaderCube()
training_cube = TensorFlowTrainingCube()

# Add cubes to floe
job.add_cubes(reader_cube, training_cube)

# Promote parameters
training_cube.promote_parameter(
    "data_out", promoted_name="out", title="Output File of Tensorflow model"
)
reader_cube.promote_parameter(
    "collection", title="Collection containing shards with training data"
)

reader_cube.success.connect(training_cube.intake)

if __name__ == "__main__":
    job.run()

Finally, add a new import line to the module __init__.py.

from .training import *

__version__ = "0.1.0"
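
Before writing tests, you can sanity-check the floe definition locally: a floe script run through job.run() exposes its parameters on the command line, so listing them is a quick smoke test (assuming the default floe CLI behavior):

$ python floes/training_floe.py --help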

At this point, we have enough to run a floe that trains a Tensorflow model, so it would be useful to test that our logic works. In this example, we are going to skip unit testing the cube we wrote, though we suggest reading the documentation on cube unit testing; a minimal sketch of what such a test might look like follows below. Instead of testing the cube directly, we will write tests that verify that our floe works as expected.
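
For reference, such a cube unit test would look roughly like the following sketch, assuming the CubeTestRunner helper from the cube unit testing documentation behaves as described there:

from floe.test import CubeTestRunner

from my_tensorflow_package.training import TensorFlowTrainingCube


def test_training_cube_compiles_model():
    cube = TensorFlowTrainingCube()
    runner = CubeTestRunner(cube)
    # start() invokes begin(), which builds and compiles the Keras model;
    # required parameters such as data_out may need to be supplied first
    runner.start()
    assert cube.model is not None
    # runner.finalize() is skipped here: end() would try to save the model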

Testing the Training Workfloe

We can use Artemis to write tests for the training Workfloe. In tests/, create a file named test_floes.py. We will put tests for both floes into the same file, so they can share the package used for testing. You could separate the tests into different files, but this would also require duplicating the Artemis package code.

At this point, we need to add a function to the test file that will generate the data that we want to test against. In this case, we will simply extract the MNIST dataset and convert the training data into a collection of shards that will be passed to the ShardReaderCube. The following example shows what this function might look like.

def generate_collection_for_training(collection_name):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / 255.0
    collection = ShardCollection.create(APISession, collection_name)
    step = 1000
    # For each chunk of data, pickle it. This is not a sophisticated way to
    # chunk the data, but it illustrates uploading training data into shards
    for x in range(0, len(x_train), step):
        with TemporaryPath(suffix="pickle") as temp:
            with open(temp, "wb") as ofs:
                pickle.dump((x_train[x : x + step], y_train[x : x + step]), ofs)
            # Create a shard with metadata that indicates that the format is a pickled object
            shard = Shard.create(collection, metadata={FORMAT: "pickle"})
            # Upload the file
            shard.upload_file(temp)
            # Close the shard to indicate that it is valid
            shard.close()
    # Close the collection to indicate that it is no longer being modified
    collection.close()
    return collection

Next, we can define the test using Artemis syntax. In our case the test is simple: generate the collection as shown above, pass it as input to our floe, verify that the output file was written, and then delete the collection. The complete file you will have is shown below.

import os
import json
import pickle
import pytest
from subprocess import check_output

from artemis.wrappers import (
    WorkFloeWrapper,
    OutputFileWrapper,
    CollectionWrapper,
    using_orion,
)
from artemis.test import FloeTestCase
from artemis.decorators import package
from artemis.packaging import OrionTestPackage

from orionclient.utils import TemporaryPath

from orionclient.session import APISession
from orionclient.types import ShardCollection, Shard, File

from orionplatform.constants import FORMAT

import tensorflow as tf

import my_tensorflow_package

PACKAGE_DIR = os.path.dirname(os.path.dirname(my_tensorflow_package.__file__))
FILE_DIR = os.path.join(PACKAGE_DIR, "tests", "test_data")
FLOES_DIR = os.path.join(PACKAGE_DIR, "floes")


temp_req = TemporaryPath(suffix=".txt")
results = check_output(["python", "setup.py", "--requires"], cwd=PACKAGE_DIR)
requirements = json.loads(results)
with open(temp_req.path, "w") as ofs:
    for result in requirements:
        # Create a file with orion requirements
        ofs.write("{}\n".format(result))

test_package = OrionTestPackage(manifest=dict(requirements="requirements.txt"))
# Add the contents of the regular package
test_package.add_directory(PACKAGE_DIR)
# Remove the tests as they have different requirements
test_package.remove_directory("tests/")
# Add the Orion requirements file
test_package.add_file(temp_req.path, dest="requirements.txt")

def generate_collection_for_training(collection_name):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / 255.0
    collection = ShardCollection.create(APISession, collection_name)
    step = 1000
    # For each chunk of data, pickle it. This is not a sophisticated way to
    # chunk the data, but it illustrates uploading training data into shards
    for x in range(0, len(x_train), step):
        with TemporaryPath(suffix="pickle") as temp:
            with open(temp, "wb") as ofs:
                pickle.dump((x_train[x : x + step], y_train[x : x + step]), ofs)
            # Create a shard with metadata that indicates that the format is a pickled object
            shard = Shard.create(collection, metadata={FORMAT: "pickle"})
            # Upload the file
            shard.upload_file(temp)
            # Close the shard to indicate that it is valid
            shard.close()
    # Close the collection to indicate that it is no longer being modified
    collection.close()
    return collection


@pytest.mark.floetest
@package(test_package)
class TestTensorflowFloes(FloeTestCase):
    def test_training_floe(self):
        # Wrappers automatically trigger cleanup of the resources they create
        output_file = OutputFileWrapper(extension=".h5")
        collection = generate_collection_for_training("training-collection")
        collection_wrapper = CollectionWrapper.from_collection(collection)
        workfloe = WorkFloeWrapper.get_workfloe(
            os.path.join(FLOES_DIR, "training_floe.py"), run_timeout=2400
        )
        workfloe.start(
            {
                "promoted": {
                    "out": output_file.identifier,
                    "collection": collection_wrapper.identifier,
                }
            }
        )
        self.assertWorkFloeComplete(workfloe)
        # Accessing output_file.path downloads the file to a temporary
        # local file if it is not already local
        result = os.stat(output_file.path)
        self.assertTrue(result.st_size > 0)

To run the test locally:

$ pytest -k test_training_floe

To run the test against Orion:

$ pytest --orion -k test_training_floe

Note

Testing against Orion can take a while, as the environment must be built first. This is especially time-consuming for GPU-dependent packages.

Note

If you get a ModuleNotFoundError, you may need to add your source code directory to the PYTHONPATH environment variable.
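
For example, from the top-level directory of your package:

$ export PYTHONPATH=$(pwd)
$ pytest -k test_training_floe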

Writing a Prediction Workfloe

At this point, we can train a Tensorflow model using shards that contain training data, and save the result to a file that another cube can use for prediction.

To do this, we will create a parallel cube that loads the model and handles shards containing the data to predict on. Define a parallel cube using ParallelMixin and ComputeCube, then add a ShardInputPort and a JsonOutputPort. In a more realistic scenario, the JsonOutputPort would likely be replaced with a port better suited to your output. Lastly, we need a FileInputParameter to load the model that the previous floe created.

Create a file my_tensorflow_package/prediction.py and add the line from .prediction import * to __init__.py.

import pickle

from floe.api import ComputeCube, ParallelMixin
from floe.api.ports import JsonOutputPort, JsonInputPort

from orionclient.utils import TemporaryPath

from orionplatform.ports import ShardInputPort
from orionplatform.parameters import FileInputParameter
from orionplatform.constants import FORMAT

import tensorflow as tf


class ParallelTensorPredictionCube(ParallelMixin, ComputeCube):
    title = "Tensorflow Model Predictor"
    classification = [["Examples", "Tensorflow"]]
    tags = ["Example", "Tensorflow"]
    description = """A cube that handles shards and generates predictions
    based on the data that is pickled within the shards"""

    intake = ShardInputPort()

    success = JsonOutputPort()

    model = FileInputParameter(required=True)

    # Ensure we have a gpu for the prediction
    # Shards generally do best with an item_count of 1
    parameter_overrides = {
        "gpu_count": {"default": 1},
        "item_count": {"default": 1},
    }

    def begin(self):
        files = list(self.args.model)
        if len(files) > 1:
            self.log.info("Dropping extra files, using first")
        model_file = files[0]
        self.log.info("Using model {} for evaluation".format(model_file.name))
        self.model_object = TemporaryPath(suffix=".h5")
        # Copy the file to the temporary path; in Orion this downloads
        # the file to the local disk
        model_file.copy_to(self.model_object.path)
        self.tf_model = tf.keras.models.load_model(self.model_object.path)

    def process(self, shard, port):
        input_data = self.parse_shard(shard)
        predictions = self.tf_model.predict(input_data)
        self.success.emit({"output": predictions.tolist(), "input": shard.json()})

    def parse_shard(self, shard):
        # Create a temporary file to avoid using up disk space over a
        # large number of work items
        with TemporaryPath(suffix=".temp") as temp:
            shard.download_to_file(temp)
            shard_format = shard.metadata.get(FORMAT, "")
            # In our toy example, the format will be pickle files
            if shard_format == "pickle":
                with open(temp, "rb") as ifs:
                    input_data = pickle.load(ifs)
            else:
                raise NotImplementedError(
                    "Unable to handle alternative format {}".format(shard_format)
                )
        return input_data
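
Each message emitted on the success port pairs the prediction matrix with the shard's JSON representation. For MNIST, predict returns one row of ten class probabilities per image, so a payload looks roughly like the following (the shard fields shown are illustrative only and depend on what shard.json() returns):

{
    "output": [[0.02, 0.01, 0.0, 0.0, 0.01, 0.0, 0.0, 0.91, 0.02, 0.03]],
    "input": {"id": 123, "collection": 456}
}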

As this tutorial covers a basic example of Tensorflow, we will present the results simply by printing the predictions. In a more complete example, it may be valuable to create a floe report with the images. For this example, we create a very simple cube that prints the prediction JSON; you can add this code directly after the prediction cube we just defined. It is set up to fail if no messages are received, which makes the floe easier to test.

class PredictionPrinter(ComputeCube):
    title = "Prediction Printer"
    classification = [["Examples", "Tensorflow"]]
    tags = ["Example", "Tensorflow"]
    description = "Prints out the predictions from a trained model"

    intake = JsonInputPort()

    def begin(self):
        # Set a flag to ensure we have seen a prediction
        self.seen = False

    def process(self, data, port):
        print(data)
        # Set seen to true if printed a prediction
        self.seen = True

    def end(self):
        # Assert that the cube saw at least one item of work
        assert self.seen

With the prediction cube written, we need to write the prediction floe, which is quite similar to the training floe, with the addition of the cube that prints out the predictions. Create a file prediction_floe.py in the floes/ directory and insert the following code.

from floe.api import WorkFloe
from my_tensorflow_package.prediction import (
    ParallelTensorPredictionCube,
    PredictionPrinter,
)
from orionplatform.cubes.collections import ShardReaderCube


# Declare and document floe
job = WorkFloe("Predict from Model", title="Predict from Model")
job.description = "Uses a trained model to evaluate some data"
job.classification = [["Examples", "Tensorflow"]]
job.tags = ["Examples", "Tensorflow"]

# Declare Cubes
reader_cube = ShardReaderCube()
prediction_cube = ParallelTensorPredictionCube()
printer = PredictionPrinter()


# Add cubes to floe
job.add_cubes(reader_cube, prediction_cube, printer)

# Promote parameters
reader_cube.promote_parameter(
    "collection", title="Collection with shards containing data suitable to the model"
)
prediction_cube.promote_parameter(
    "model",
    promoted_name="model",
    title="Input file that contains tensorflow keras model",
)

reader_cube.success.connect(prediction_cube.intake)
prediction_cube.success.connect(printer.intake)

if __name__ == "__main__":
    job.run()

Testing the Prediction Workfloe

To finish off, we will write a test verifying that predictions behave as expected. Like the last test, we need to generate a collection to test against. We will again use the MNIST dataset, but this time with the evaluation data as input rather than the training data.

We will add the following function to the same test file as before, tests/test_floes.py.

def generate_collection_for_prediction(collection_name):
    collection = ShardCollection.create(APISession, collection_name)
    _, (x_data, _) = tf.keras.datasets.mnist.load_data()
    x_data = x_data / 255.0
    step = 1000
    # For each chunk of data, pickle it. This is not a sophisticated way to
    # chunk the data, but it illustrates uploading prediction data into shards
    for x in range(0, len(x_data), step):
        with TemporaryPath(suffix="pickle") as temp:
            with open(temp, "wb") as ofs:
                pickle.dump(x_data[x : x + step], ofs)
            # Create a shard with metadata that indicates that the format is a pickled object
            shard = Shard.create(collection, metadata={FORMAT: "pickle"})
            # Upload the file
            shard.upload_file(temp)
            # Close the shard to indicate that it is valid
            shard.close()
    # Close the collection to indicate that it is no longer being modified
    collection.close()
    return collection

The prediction workfloe test is very similar to the training test we wrote, but with slightly different parameters and assertions. Since we can't test prediction without a trained model, we refactor TestTensorflowFloes so that a single test_floes() entry point first runs the training floe and then feeds the resulting model to a _test_prediction_floe() helper.

@pytest.mark.floetest
@package(test_package)
class TestTensorflowFloes(FloeTestCase):

    def test_floes(self):
        # alternatively, the trained model could be saved in test_data as part of the package
        model = OutputFileWrapper(extension=".h5")
        self._test_training_floe(model)
        self._test_prediction_floe(model)

    def _test_training_floe(self, output_file):
        # Use CollectionWrapper and WorkFloeWrapper to automatically trigger cleanup
        collection = generate_collection_for_training("training-collection")
        collection_wrapper = CollectionWrapper.from_collection(collection)
        workfloe = WorkFloeWrapper.get_workfloe(
            os.path.join(FLOES_DIR, "training_floe.py"), run_timeout=2400
        )
        workfloe.start(
            {
                "promoted": {
                    "out": output_file.identifier,
                    "collection": collection_wrapper.identifier,
                }
            }
        )
        self.assertWorkFloeComplete(workfloe)
        # Accessing output_file.path downloads the file to a temporary
        # local file if it is not already local
        result = os.stat(output_file.path)
        self.assertTrue(result.st_size > 0)

    def _test_prediction_floe(self, input_file):
        # Locally the model is referenced by its path; in Orion, look up the
        # uploaded File's id by name to pass as the promoted model parameter
        model_identifier = input_file.identifier
        if using_orion:
            files = APISession.list_resources(File, filters={"name": input_file.identifier})
            for file in files:
                model_identifier = file.id
                break
        collection = generate_collection_for_prediction("prediction-floe-collection")
        collection_wrapper = CollectionWrapper.from_collection(collection)
        workfloe = WorkFloeWrapper.get_workfloe(
            os.path.join(FLOES_DIR, "prediction_floe.py"), run_timeout=60*60*2
        )
        workfloe.start(
            {
                "promoted": {
                    "model": model_identifier,
                    "collection": collection_wrapper.identifier,
                }
            }
        )
        self.assertWorkFloeComplete(workfloe)

And there we have it. We have written training and prediction workfloes for Tensorflow using the Orion Platform: from creating a package, to writing cubes and floes, to writing tests that verify the logic we implemented.

To run all tests locally:

$ export PYTHONPATH=$(pwd)
$ pytest

To run all tests against Orion (because of the GPU requirement, this can be slow):

$ export PYTHONPATH=$(pwd)
$ pytest --orion