Tutorial: Creating a Tensorflow package

This tutorial will walk you through creating a complex Orion package with Conda dependencies and GPU requirements.

Creating the initial package

To create a package, follow the instructions specified here: Creating a New Orion Package. This tutorial assumes that you used Cookiecutter; however, if you created your own package, the information should transfer relatively easily.

Configuring Conda Requirements

The configuration of Orion packages, including conda dependencies, is controlled through a manifest.json file that lives at the top level of the package directory. In this tutorial our package requires tensorflow-gpu==1.15.0 from the anaconda channel.

Note

Tensorflow can be installed using pip, but conda is preferred, as the difference in performance can be up to 8x.

This is what your manifest.json would look like after adding the anaconda channel and adding tensorflow-gpu==1.15.0 as a conda dependency. Note that we also pin the CUDA toolkit.

{
    "name": "My Tensorflow package",
    "version": "1.0.0",
    "python_version": "3.7.3",
    "conda_channels": [
      "anaconda"
    ],
    "conda_dependencies": [
      "tensorflow-gpu==1.15.0",
      "cudatoolkit==10.0.130"
    ],
    "requirements": "orion-requirements.txt"
}

Had you left the CUDA toolkit unpinned, you would find that tensorflow-gpu pulls it in as a dependency, defaulting to CUDA 10.1. To ensure that the package reliably gets the same environment, the CUDA toolkit version has been chosen explicitly. The dependency that requires a specific version of CUDA is the cudnn library, whose version and build we can also pin, in this case cudnn==7.6.5. After making these changes your manifest.json should look like the following:

{
  "name": "My Tensorflow package",
  "version": "0.1.0",
  "python_version": "3.7.3",
  "conda_channels": [
    "anaconda"
  ],
  "conda_dependencies": [
    "tensorflow-gpu==1.15.0",
    "cudatoolkit==10.0.130",
    "cudnn==7.6.5"
  ],
  "requirements": "orion-requirements.txt"
}

At this point you should have a package that you can upload using the following command:

Note

The following command will write out a tarball to the dist/ directory within your package, which you can then upload either via the UI or the Orion client command line (ocli).

$ invoke package
# Upload via the CLI
$ ocli packages upload dist/my-tensorflow-package-0.1.0.tar.gz

Writing a Training Workfloe

Now that we have configured a package that has all of the necessary requirements, we can build a Workfloe that will perform training on data. In this example we are using the mnist dataset for simplicity, as it is built into Tensorflow.
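If you have not worked with mnist before, the snippet below (a quick check you can run in a plain Python session, independent of Orion) shows the shape of the data we will be sharding: 60,000 training images of 28x28 grayscale digits with integer labels, plus 10,000 evaluation images.

import tensorflow as tf

# Load the built-in mnist dataset; Tensorflow downloads it on first use
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

print(x_train.shape)  # (60000, 28, 28) grayscale digit images
print(y_train.shape)  # (60000,) integer labels from 0 to 9
print(x_test.shape)   # (10000, 28, 28) evaluation images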

To begin we will write a cube that trains a model on chunks of the mnist training data. We use shards here because they reduce data transfer between cubes, which becomes invaluable as you train on more data.

We can delete mycube.py, as we will be writing new cubes. Create a file training.py in the directory that mycube.py was in, and start with a SinkCube that trains a model and then saves it to either a local file or an Orion File.

import pickle

from floe.api import IntegerParameter, SinkCube
from floe.api.orion import in_orion

from datarecord.utils import TemporaryPath

from orionplatform.ports import ShardInputPort
from orionplatform.parameters import FileOutputParameter
from orionplatform.constants import FORMAT

from orionclient.types import File
from orionclient.session import APISession

import tensorflow as tf


class TensorFlowTrainingCube(SinkCube):
    title = "Tensorflow Model Trainer"
    classification = [["Examples", "Tensorflow"]]
    tags = ["Example", "Tensorflow"]
    description = "A cube that trains a tensorflow model and writes it to a file"

    # Will be handling shards
    intake = ShardInputPort(required=True)

    data_out = FileOutputParameter(required=True)

    epochs = IntegerParameter(
        default=3,
        min_value=1,
        max_value=100,
        description="Number of epoches, larger number of epoches will take longer",
    )

    # Ensure that the cube requests a GPU
    parameter_overrides = {
        "gpu_count": {"default": 1},
    }

    def begin(self):
        # Using model provided in tutorial here: https://www.tensorflow.org/tutorials
        self.model = tf.keras.models.Sequential(
            [
                tf.keras.layers.Flatten(input_shape=(28, 28)),
                tf.keras.layers.Dense(512, activation=tf.nn.relu),
                tf.keras.layers.Dropout(0.2),
                tf.keras.layers.Dense(10, activation=tf.nn.softmax),
            ]
        )
        # Compile the model to begin training
        self.model.compile(
            optimizer="adam",
            loss="sparse_categorical_crossentropy",
            metrics=["accuracy"],
        )

    # Sink cubes have a write method instead of a process method
    def write(self, shard, port):
        # Parse the data out of the shard
        input_data, expected_results = self.parse_shard(shard)
        for x in range(self.args.epochs):
            print("Epoch {}".format(x))
            # Train the model on the batch
            self.model.train_on_batch(input_data, expected_results)

    def parse_shard(self, shard):
        # Create a temporary file to avoid using up disk space over
        # large number of items
        with TemporaryPath(suffix=".temp") as temp:
            shard.download_to_file(temp)
            shard_format = shard.metadata.get(FORMAT, "")
            # In our toy example, the format will be pickle files
            if shard_format == "pickle":
                with open(temp, "rb") as ifs:
                    input_data, expected_results = pickle.load(ifs)
            else:
                raise NotImplementedError(
                    "Unable to handle alternative format {}".format(shard_format)
                )
        return input_data, expected_results

    def end(self):
        if in_orion():
            # If we are running in Orion, upload it as an Orion File
            with TemporaryPath(suffix=".h5") as temp:
                self.model.save(temp)
                File.upload(APISession, self.args.data_out, temp)
        else:
            self.model.save(self.args.data_out)

This cube will take shards with metadata indicating that the format is a pickled Tensorflow batch object, train a model on the data in each shard, and save the model to a file. In a more complex case, it may be important to handle shards of different formats.
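As a rough sketch of what that might look like (the "npz" branch and the helper name here are hypothetical and not part of this tutorial's data; only the "pickle" format is actually produced later by our tests), the parsing logic could branch on the format stored in the shard metadata:

import pickle

import numpy as np


def parse_shard_payload(temp_path, shard_format):
    # Dispatch on the format recorded in the shard metadata
    if shard_format == "pickle":
        with open(temp_path, "rb") as ifs:
            input_data, expected_results = pickle.load(ifs)
    elif shard_format == "npz":
        # Hypothetical alternative: a compressed numpy archive with x/y arrays
        archive = np.load(temp_path)
        input_data, expected_results = archive["x"], archive["y"]
    else:
        raise NotImplementedError(
            "Unable to handle alternative format {}".format(shard_format)
        )
    return input_data, expected_results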

At this point we can create a floe that will perform the training by reading shards. Using the ShardReaderCube provided by Orion Platform as our reader, we can connect it up to the training cube we just defined.

Create a training_floe.py file in the floes/ directory. If you have defined a floe before, there is nothing special about how this one is defined.

from floe.api import WorkFloe
from my_tensorflow_package.training import TensorFlowTrainingCube
from orionplatform.cubes.collections import ShardReaderCube


# Declare and document floe
job = WorkFloe("Train Tensorflow Model", title="Train Model")
job.description = (
    "Trains a tensorflow model and outputs a model in the form of an Orion File"
)
job.classification = [["Examples", "Tensorflow"]]
job.tags = ["Examples", "Tensorflow"]

# Declare Cubes
reader_cube = ShardReaderCube()
training_cube = TensorFlowTrainingCube()

# Add cubes to floe
job.add_cubes(reader_cube, training_cube)

# Promote parameters
training_cube.promote_parameter(
    "data_out", promoted_name="out", title="Output File of Tensorflow model"
)
reader_cube.promote_parameter(
    "collection", title="Collection containing shards with training data"
)

reader_cube.success.connect(training_cube.intake)

if __name__ == "__main__":
    job.run()

At this point we have enough to run a floe that performs training of a Tensorflow model, so it is useful to test that our logic works. In this example we are going to skip testing the cube we wrote, though it is suggested that you read the docs on cube testing. Instead we will write tests that verify that our floe works as expected.
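For reference, here is a rough sketch of the kind of standalone sanity check you could write for the training logic. It does not use the cube testing framework from the docs and is not required for the rest of the tutorial; the test name and use of tempfile are illustrative only. It simply exercises the same pickle format and Keras calls the training cube relies on.

import pickle
import tempfile

import tensorflow as tf


def test_pickled_batch_trains():
    # Build the same small Keras model the training cube uses
    model = tf.keras.models.Sequential(
        [
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(512, activation=tf.nn.relu),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(10, activation=tf.nn.softmax),
        ]
    )
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    # Round-trip a small batch through the same pickle format the shards use
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    batch = (x_train[:100] / 255.0, y_train[:100])
    with tempfile.NamedTemporaryFile(suffix=".pickle") as temp:
        with open(temp.name, "wb") as ofs:
            pickle.dump(batch, ofs)
        with open(temp.name, "rb") as ifs:
            input_data, expected_results = pickle.load(ifs)
    # One training step should run without raising
    model.train_on_batch(input_data, expected_results)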

Testing the Training Workfloe

We can use Artemis to write tests for the training Workfloe. In tests/, rename test_myfloe.py to test_floes.py. We will put tests for both floes into the same file so they can share the package used for testing. You could separate the tests into different files, but you would then need to duplicate the Artemis package code.

At this point we need to add a function to the test file that will generate the data that we want to test against. In this case we will simply extract the mnist dataset and convert the training data into a collection of shards that will be passed to the ShardReaderCube. The following example shows what this function might look like.

def generate_collection_for_training(collection_name):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / 255.0
    collection = ShardCollection.create(APISession, collection_name)
    step = 1000
    # For each chunk of data, pickle it up. Not a good way to chunk up data,
    # but serves as an example of how to upload the training data into shards
    for x in range(0, len(x_train), step):
        with TemporaryPath(suffix="pickle") as temp:
            with open(temp, "wb") as ofs:
                pickle.dump((x_train[x : x + step], y_train[x : x + step]), ofs)
            # Create a shard with metadata that indicates that the format is a pickled object
            shard = Shard.create(collection, metadata={FORMAT: "pickle"})
            # Upload the file
            shard.upload_file(temp)
            # Close the shard to indicate that it is valid
            shard.close()
    # Close the collection to indicate that it is no longer being modified
    collection.close()
    return collection

Next we can define the test using Artemis syntax. In our case the test is simple: generate the collection as shown above, pass it as input to our floe, verify that the output file is a valid model, and then delete the collection. The complete file you will end up with is shown below.

import os
import json
import pickle
import pytest
from subprocess import check_output

from artemis.wrappers import WorkFloeWrapper, OutputFileWrapper, FileWrapper, CollectionWrapper
from artemis.test import FloeTestCase
from artemis.decorators import package
from artemis.packaging import OrionTestPackage

from datarecord.utils import TemporaryPath

from orionclient.session import APISession
from orionclient.types import ShardCollection, Shard

from orionplatform.constants import FORMAT

import tensorflow as tf

import my_tensorflow_package

PACKAGE_DIR = os.path.dirname(os.path.dirname(my_tensorflow_package.__file__))

FILE_DIR = os.path.join(PACKAGE_DIR, "tests", "test_data")
FLOES_DIR = os.path.join(PACKAGE_DIR, "floes")


temp_req = TemporaryPath(suffix=".txt")
results = check_output(["python", "setup.py", "--requires"], cwd=PACKAGE_DIR)
requirements = json.loads(results)
with open(temp_req.path, "w") as ofs:
    for result in requirements:
        # Create a file with orion requirements
        ofs.write("{}\n".format(result))

test_package = OrionTestPackage(manifest=dict(requirements="requirements.txt"))
# Add the contents of the regular package
test_package.add_directory(PACKAGE_DIR)
# Remove the tests as they have different requirements
test_package.remove_directory("tests/")
# Remove tasks.py as it requires invoke
test_package.remove_file("tasks.py")
# Add the Orion requirements file
test_package.add_file(temp_req.path, dest="requirements.txt")

def generate_collection_for_training(collection_name):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / 255.0
    collection = ShardCollection.create(APISession, collection_name)
    step = 1000
    # For each chunk of data, pickle it up. Not a good way to chunk up data,
    # but serves as an example of how to upload the training data into shards
    for x in range(0, len(x_train), step):
        with TemporaryPath(suffix="pickle") as temp:
            with open(temp, "wb") as ofs:
                pickle.dump((x_train[x : x + step], y_train[x : x + step]), ofs)
            # Create a shard with metadata that indicates that the format is a pickled object
            shard = Shard.create(collection, metadata={FORMAT: "pickle"})
            # Upload the file
            shard.upload_file(temp)
            # Close the shard to indicate that it is valid
            shard.close()
    # Close the collection to indicate that it is no longer being modified
    collection.close()
    return collection


@pytest.mark.floetest
@package(test_package)
class TestTensorflowFloes(FloeTestCase):
    def test_training_floe(self):
        collection = generate_collection_for_training("training-collection")
        # Add to the wrapper, to automatically trigger cleanup
        collection_wrapper = CollectionWrapper.from_collection(collection)
        workfloe = WorkFloeWrapper.get_workfloe(
            os.path.join(FLOES_DIR, "training_floe.py"), run_timeout=2400
        )
        # This will represent the output file that is the model
        output_file = OutputFileWrapper(extension=".h5")
        workfloe.start(
            {
                "promoted": {
                    "out": output_file.identifier,
                    "collection": collection_wrapper.identifier,
                }
            }
        )
        self.assertWorkFloeComplete(workfloe)
        # Accessing .path downloads the file to a temporary local file
        # if it is not already local
        result = os.stat(output_file.path)
        self.assertTrue(result.st_size > 0)
        tf.keras.models.load_model(output_file.path)

To run the test locally:

$ pytest -k test_training_floe

To run the test against Orion:

$ pytest --orion -k test_training_floe

Note

Testing against Orion can take a while, as it has to build the environment, which can be time consuming when using GPU-dependent packages.

Writing a Prediction Workfloe

At this point we can train a Tensorflow model using shards that contain training data and output it into a file that can be used for prediction in another cube.

To do this we will create a parallel cube that loads the model and handles shards containing the data to predict values for. Define a parallel cube using ParallelMixin and ComputeCube. Then we will add a ShardInputPort and a JsonOutputPort. In a more realistic scenario, the JsonOutputPort would likely be replaced with a port better suited to your use case. Lastly we will need a FileInputParameter to load the model that the previous floe created.

import pickle

from floe.api import ComputeCube, ParallelMixin
from floe.api.ports import JsonInputPort, JsonOutputPort

from datarecord.utils import TemporaryPath

from orionplatform.ports import ShardInputPort
from orionplatform.parameters import FileInputParameter
from orionplatform.constants import FORMAT

import tensorflow as tf


class ParallelTensorPredictionCube(ParallelMixin, ComputeCube):
    title = "Tensorflow Model Predictor"
    classification = [["Examples", "Tensorflow"]]
    tags = ["Example", "Tensorflow"]
    description = """A cube that handles shards and generates predictions
    based on the data that is pickled within the shards"""

    intake = ShardInputPort()

    success = JsonOutputPort()

    model = FileInputParameter(required=True)

    # Ensure we have a gpu for the prediction
    # Shards generally do best with an item_count of 1
    parameter_overrides = {
        "gpu_count": {"default": 1},
        "item_count": {"default": 1},
    }

    def begin(self):
        files = list(self.args.model)
        if len(files) > 1:
            self.log.info("Dropping extra files, using first")
        model_file = files[0]
        self.log.info("Using model {} for evaluation".format(model_file.name))
        self.model_object = TemporaryPath(suffix=".h5")
        # Copy the file to the temporary path; in Orion this downloads the
        # file to local disk
        model_file.copy_to(self.model_object.path)
        self.tf_model = tf.keras.models.load_model(self.model_object.path)

    def process(self, shard, port):
        input_data = self.parse_shard(shard)
        predictions = self.tf_model.predict(input_data)
        self.success.emit({"output": predictions.tolist(), "input": shard.json()})

    def parse_shard(self, shard):
        # Create a temporary file to avoid using up disk space over
        # large number of work items
        with TemporaryPath(suffix=".temp") as temp:
            shard.download_to_file(temp)
            shard_format = shard.metadata.get(FORMAT, "")
            # In our toy example, the format will be pickle files
            if shard_format == "pickle":
                with open(temp, "rb") as ifs:
                    input_data = pickle.load(ifs)
            else:
                raise NotImplementedError(
                    "Unable to handle alternative format {}".format(shard_format)
                )
        return input_data

As this tutorial covers a basic example of Tensorflow, we will present the data simply by printing out the predictions. In a more complete example, it may be valuable to create a floe report with the images. For this example we can create a very simple cube that prints the prediction JSON. You can add this code directly after the prediction cube we just defined. It is set up to fail if no messages are received, which makes the floe easier to test.

class PredictionPrinter(ComputeCube):
    title = "Prediction Printer"
    classification = [["Examples", "Tensorflow"]]
    tags = ["Example", "Tensorflow"]
    description = "Prints out the predictions from a trained model"

    intake = JsonInputPort()

    def begin(self):
        # Set a flag to ensure we have seen a prediction
        self.seen = False

    def process(self, data, port):
        print(data)
        # Set seen to true if printed a prediction
        self.seen = True

    def end(self):
        # Assert that cube saw an item of work
        assert self.seen

With the prediction cube written, we need to write the prediction floe, which is quite similar to the training floe, with the addition of the cube that prints out the predictions. Create a file prediction_floe.py in the floes/ directory and insert the following code.

from floe.api import WorkFloe
from my_tensorflow_package.prediction import (
    ParallelTensorPredictionCube,
    PredictionPrinter,
)
from orionplatform.cubes.collections import ShardReaderCube


# Declare and document floe
job = WorkFloe("Predict from Model", title="Predict from Model")
job.description = "Uses a trained model to evaluate some data"
job.classification = [["Examples", "Tensorflow"]]
job.tags = ["Examples", "Tensorflow"]

# Declare Cubes
reader_cube = ShardReaderCube()
prediction_cube = ParallelTensorPredictionCube()
printer = PredictionPrinter()


# Add cubes to floe
job.add_cubes(reader_cube, prediction_cube, printer)

# Promote parameters
reader_cube.promote_parameter(
    "collection", title="Collection with shards containing data suitable to the model"
)
prediction_cube.promote_parameter(
    "model",
    promoted_name="model",
    title="Input file that contains tensorflow keras model",
)

reader_cube.success.connect(prediction_cube.intake)
prediction_cube.success.connect(printer.intake)

if __name__ == "__main__":
    job.run()

Testing the Prediction Workfloe

To finish off, we will write a test to verify that predictions behave as we expect. Like the last test, we need to generate a collection to test against. We will again use the mnist dataset, but this time use the evaluation data as the input rather than the training data.

We will add the following function to the same test file as previously, tests/test_floes.py.

def generate_collection_for_prediction(collection_name):
    collection = ShardCollection.create(APISession, collection_name)
    _, (x_data, _) = tf.keras.datasets.mnist.load_data()
    x_data = x_data / 255.0
    step = 1000
    # For each chunk of data, pickle it up. Not a good way to chunk up data,
    # but serves as an example of how to upload the evaluation data into shards
    for x in range(0, len(x_data), step):
        with TemporaryPath(suffix="pickle") as temp:
            with open(temp, "wb") as ofs:
                pickle.dump(x_data[x : x + step], ofs)
            # Create a shard with metadata that indicates that the format is a pickled object
            shard = Shard.create(collection, metadata={FORMAT: "pickle"})
            # Upload the file
            shard.upload_file(temp)
            # Close the shard to indicate that it is valid
            shard.close()
    # Close the collection to indicate that it is no longer being modified
    collection.close()
    return collection

The definition of the prediction workfloe test is very similar to the previous workfloe test that we wrote, but with slightly different parameters and assertions. We can add it to the TestTensorflowFloes class as test_prediction_floe().

@pytest.mark.floetest
@package(test_package)
class TestTensorflowFloes(FloeTestCase):

    def test_prediction_floe(self):
        collection = generate_collection_for_prediction("prediction-floe-collection")
        # Add to the wrapper, to automatically trigger cleanup
        collection_wrapper = CollectionWrapper.from_collection(collection)
        # Get the prediction workfloe
        workfloe = WorkFloeWrapper.get_workfloe(
            os.path.join(FLOES_DIR, "prediction_floe.py"), run_timeout=2400
        )
        # This is the input file that contains the trained model
        input_file = FileWrapper.get_file(
            os.path.join(FILE_DIR, "example_tf_model.h5")
        )
        workfloe.start(
            {
                "promoted": {
                    "model": input_file.identifier,
                    "collection": collection_wrapper.identifier,
                }
            }
        )
        self.assertWorkFloeComplete(workfloe)

And there we have it: we have written a training workfloe and a prediction workfloe for Tensorflow using the Orion Platform, from creating a package, to writing cubes and floes, to writing tests that verify the logic we implemented.

To run all tests locally:

$ pytest

To run all tests against Orion:

$ pytest --orion