Tutorial: Creating a Tensorflow Package
This tutorial will walk you through creating a complex Orion package with conda dependencies and GPU requirements.
Creating the Initial Package
Begin by creating a basic package using the init command; for more details, follow the instructions here. This tutorial assumes that you are using ocli; however, if you already created a package manually, the information should transfer relatively easily.
$ ocli packages init
Author's full name [John Doe]:
Email [john.doe@my_org.com]:
Project name eg. My Orion Package [My Orion Package]: My Tensorflow Tutorial
Project slug [my-orion-package]: my-tensorflow-package
Module name [my_orion_package]: my_tensorflow_package
Description [Floe package]:
Version [0.1.0]:
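Based on the files this tutorial touches later, the generated layout should look roughly like the following (a sketch; any other scaffolding files are omitted):
my-tensorflow-package/
├── manifest.json
├── orion-requirements.txt
├── setup.py
├── my_tensorflow_package/
│   ├── __init__.py
│   └── mycube.py
├── floes/
│   └── myfloe.py
└── tests/
    ├── test_mycube.py
    └── floe_tests/
        └── test_myfloe.py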
Configuring Conda Requirements
The configuration of Orion packages, including conda dependencies, is controlled through a manifest.json file that lives at the top level of the package directory. In this tutorial, our package requires tensorflow-gpu==2.4.1 from the anaconda channel.
Note
Tensorflow can also be installed using pip, but conda is preferred: the difference in performance can be up to 8x.
This is what your manifest.json would look like after adding the anaconda channel and adding tensorflow-gpu==2.4.1 as a conda dependency.
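A sketch of that intermediate manifest, keeping the other fields that ocli generated; only the conda_channels and conda_dependencies entries are new:
{
  "name": "My Tensorflow Tutorial",
  "python_version": 3.9,
  "requirements": "orion-requirements.txt",
  "version": "0.1.0",
  "conda_channels": [
    "anaconda"
  ],
  "conda_dependencies": [
    "tensorflow-gpu==2.4.1"
  ],
  "base_os": "amazonlinux2",
  "build_requirements": {}
}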
If you tried uploading this package, you would find that among the requirements of tensorflow-gpu is the CUDA toolkit, defaulting to CUDA 10.1. To ensure that your package reliably gets the same environment, the CUDA toolkit version has been explicitly specified ("pinned"). The dependency that requires a specific version of CUDA is the cudnn library, for which we can also pin the version, in this case cudnn==7.6.5.
After making these changes, your manifest.json should look like the following:
{
  "name": "My Tensorflow Tutorial",
  "python_version": 3.9,
  "requirements": "orion-requirements.txt",
  "version": "0.1.0",
  "conda_channels": [
    "anaconda"
  ],
  "conda_dependencies": [
    "tensorflow-gpu==2.4.1",
    "cudatoolkit==10.1.243",
    "cudnn==7.6.5"
  ],
  "base_os": "amazonlinux2",
  "build_requirements": {}
}
At this point you should have a package that you can upload using the following command:
Note
The following command writes a tarball to the dist/ directory within your package, which you can then upload either via the UI or the Orion client command line.
$ python setup.py package
# Upload via the CLI
$ ocli packages upload dist/my-tensorflow-package-0.1.0.tar.gz
Writing a Training Workfloe
Now that we have configured a package with all of the necessary requirements, we can build a workfloe that performs training on data. In this example, we use the MNIST dataset for simplicity, as it is built into Tensorflow.
To begin, we will write a cube that trains a model on chunks of the MNIST training data. We use shards in this scenario because they reduce data transfer between cubes, which becomes invaluable as you train on more data.
We can first delete the existing cubes, floes, and tests, as we will be writing new ones. Delete my_tensorflow_package/mycube.py, floes/myfloe.py, tests/test_mycube.py, and tests/floe_tests/test_myfloe.py, then remove the import line in my_tensorflow_package/__init__.py.
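From the top of the package, that cleanup amounts to:
$ rm my_tensorflow_package/mycube.py floes/myfloe.py
$ rm tests/test_mycube.py tests/floe_tests/test_myfloe.py
The import line in my_tensorflow_package/__init__.py must still be removed by hand.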
Next, create a file training.py in the my_tensorflow_package directory, where we will write a SinkCube that trains a model, then saves the model to either a local file or an Orion File.
import pickle

from floe.api import SinkCube
from floe.api.orion import in_orion
from floe.api.parameters import IntegerParameter
from orionclient.utils import TemporaryPath
from orionplatform.ports import ShardInputPort
from orionplatform.parameters import FileOutputParameter
from orionplatform.constants import FORMAT
from orionclient.types import File
from orionclient.session import APISession

import tensorflow as tf


class TensorFlowTrainingCube(SinkCube):
    title = "Tensorflow Model Trainer"
    classification = [["Examples", "Tensorflow"]]
    tags = ["Example", "Tensorflow"]
    description = "A cube that trains a tensorflow model and writes it to a file"

    # Will be handling shards
    intake = ShardInputPort(required=True)
    data_out = FileOutputParameter(required=True)
    epochs = IntegerParameter(
        default=3,
        min_value=1,
        max_value=100,
        description="Number of epochs; a larger number of epochs will take longer",
    )

    # Ensure that the cube requests a GPU
    parameter_overrides = {
        "gpu_count": {"default": 1},
    }

    def begin(self):
        # Using the model provided in the tutorial here: https://www.tensorflow.org/tutorials
        self.model = tf.keras.models.Sequential(
            [
                tf.keras.layers.Flatten(input_shape=(28, 28)),
                tf.keras.layers.Dense(512, activation=tf.nn.relu),
                tf.keras.layers.Dropout(0.2),
                tf.keras.layers.Dense(10, activation=tf.nn.softmax),
            ]
        )
        # Compile the model to begin training
        self.model.compile(
            optimizer="adam",
            loss="sparse_categorical_crossentropy",
            metrics=["accuracy"],
        )

    # Sink cubes have a write method instead of a process method
    def write(self, shard, port):
        # Parse the data out of the shard
        input_data, expected_results = self.parse_shard(shard)
        for x in range(self.args.epochs):
            print("Epoch {}".format(x))
            # Train the model on the batch
            self.model.train_on_batch(input_data, expected_results)

    def parse_shard(self, shard):
        # Create a temporary file to avoid using up disk space over
        # a large number of items
        with TemporaryPath(suffix=".temp") as temp:
            shard.download_to_file(temp)
            shard_format = shard.metadata.get(FORMAT, "")
            # In our toy example, the format will be pickle files
            if shard_format == "pickle":
                with open(temp, "rb") as ifs:
                    input_data, expected_results = pickle.load(ifs)
            else:
                raise NotImplementedError(
                    "Unable to handle alternative format {}".format(shard_format)
                )
        return input_data, expected_results

    def end(self):
        if in_orion():
            # If we are running in Orion, upload the model as an Orion File
            with TemporaryPath(suffix=".h5") as temp:
                self.model.save(temp)
                File.upload(APISession, self.args.data_out, temp)
        else:
            self.model.save(self.args.data_out)
This cube will take shards with metadata indicating that the format is a pickled Tensorflow batch object, train a model on the data in each shard, and save the model to a file. In a more complex case, it may be important to handle shards of different formats.
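If you needed that, parse_shard could dispatch on the format metadata. The sketch below is illustrative only: the "npz" branch, its x/y key names, and the numpy usage are assumptions, not part of the tutorial package.
import numpy as np  # assumption: numpy is available for the hypothetical "npz" branch


def parse_shard(self, shard):
    with TemporaryPath(suffix=".temp") as temp:
        shard.download_to_file(temp)
        shard_format = shard.metadata.get(FORMAT, "")
        if shard_format == "pickle":
            with open(temp, "rb") as ifs:
                input_data, expected_results = pickle.load(ifs)
        elif shard_format == "npz":
            # Hypothetical format: arrays written with numpy.savez(path, x=..., y=...)
            with np.load(temp) as data:
                input_data, expected_results = data["x"], data["y"]
        else:
            raise NotImplementedError(
                "Unable to handle alternative format {}".format(shard_format)
            )
    return input_data, expected_results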
At this point, we can create a floe that performs the training by reading shards. Using the ShardReaderCube provided by Orion Platform as our reader, we can connect it to the training cube we just defined.
Create a training_floe.py file in the floes/ directory. If you have defined a floe before, there is nothing special about how we define this one.
from floe.api import WorkFloe
from my_tensorflow_package.training import TensorFlowTrainingCube
from orionplatform.cubes.collections import ShardReaderCube

# Declare and document floe
job = WorkFloe("Train Tensorflow Model", title="Train Model")
job.description = (
    "Trains a tensorflow model and outputs a model in the form of an Orion File"
)
job.classification = [["Examples", "Tensorflow"]]
job.tags = ["Examples", "Tensorflow"]

# Declare Cubes
reader_cube = ShardReaderCube()
training_cube = TensorFlowTrainingCube()

# Add cubes to floe
job.add_cubes(reader_cube, training_cube)

# Promote parameters
training_cube.promote_parameter(
    "data_out", promoted_name="out", title="Output File of Tensorflow model"
)
reader_cube.promote_parameter(
    "collection", title="Collection containing shards with training data"
)

reader_cube.success.connect(training_cube.intake)

if __name__ == "__main__":
    job.run()
Finally, add a new import line to the module __init__.py.
from .training import *
__version__ = "0.1.0"
At this point, we have enough to run a floe that will perform training of a Tensorflow model, so it would be useful to test that our logic works. In this example, we are going to skip testing the cube we wrote, though it is suggested to read the documentation on cube unit testing. Instead of testing the cube, we will write tests that verify that our floe works as expected.
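To give a flavor of what a cube unit test can look like, here is a minimal sketch that exercises only parse_shard, using a hypothetical StubShard that implements just the two shard methods the cube actually calls; it does not touch any Orion machinery.
import pickle

from orionplatform.constants import FORMAT

from my_tensorflow_package.training import TensorFlowTrainingCube


class StubShard:
    # Hypothetical stand-in for an Orion Shard
    def __init__(self, payload):
        self.metadata = {FORMAT: "pickle"}
        self._payload = payload

    def download_to_file(self, path):
        # Write the pickled payload where the cube expects to download the shard
        with open(path, "wb") as ofs:
            pickle.dump(self._payload, ofs)


def test_parse_shard_roundtrip():
    cube = TensorFlowTrainingCube()
    input_data, expected_results = cube.parse_shard(StubShard(([[0.0] * 784], [3])))
    assert expected_results == [3]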
Testing the Training Workfloe
We can use Artemis to write tests for the training workfloe. In tests/, create a file named test_floes.py. We will put tests for both floes into the same file so they can share the package used for testing. You could separate the tests into different files, but that would require duplicating the Artemis package code.
At this point, we need to add a function to the test file that will generate the data that we want to test against. In this case, we will simply extract the MNIST dataset and convert the training data into a collection of shards that will be passed to the ShardReaderCube. The following example shows what this function might look like.
def generate_collection_for_training(collection_name):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / 255.0
    collection = ShardCollection.create(APISession, collection_name)
    step = 1000
    # For each chunk of data, pickle it up. Not a good way to chunk up data,
    # but it serves as an example of how to upload the training data into shards
    for x in range(0, len(x_train), step):
        with TemporaryPath(suffix="pickle") as temp:
            with open(temp, "wb") as ofs:
                pickle.dump((x_train[x : x + step], y_train[x : x + step]), ofs)
            # Create a shard with metadata that indicates that the format is a pickled object
            shard = Shard.create(collection, metadata={FORMAT: "pickle"})
            # Upload the file
            shard.upload_file(temp)
            # Close the shard to indicate that it is valid
            shard.close()
    # Close the collection to indicate that it is no longer being modified
    collection.close()
    return collection
Next we can define the test using Artemis syntax. In our case the test is simple: generate the collection as shown above, pass it as input to our floe, verify that the output file is a valid model, and then delete the collection. The complete file is shown below.
import os
import json
import pickle
import pytest
from subprocess import check_output

from artemis.wrappers import (
    WorkFloeWrapper,
    OutputFileWrapper,
    FileWrapper,
    CollectionWrapper,
    using_orion,
)
from artemis.test import FloeTestCase
from artemis.decorators import package
from artemis.packaging import OrionTestPackage

from orionclient.utils import TemporaryPath
from orionclient.session import APISession
from orionclient.types import ShardCollection, Shard, File
from orionplatform.constants import FORMAT

import tensorflow as tf

import my_tensorflow_package

PACKAGE_DIR = os.path.dirname(os.path.dirname(my_tensorflow_package.__file__))
FILE_DIR = os.path.join(PACKAGE_DIR, "tests", "test_data")
FLOES_DIR = os.path.join(PACKAGE_DIR, "floes")

temp_req = TemporaryPath(suffix=".txt")
results = check_output(["python", "setup.py", "--requires"], cwd=PACKAGE_DIR)
requirements = json.loads(results)
with open(temp_req.path, "w") as ofs:
    for result in requirements:
        # Create a file with orion requirements
        ofs.write("{}\n".format(result))

test_package = OrionTestPackage(manifest=dict(requirements="requirements.txt"))
# Add the contents of the regular package
test_package.add_directory(PACKAGE_DIR)
# Remove the tests as they have different requirements
test_package.remove_directory("tests/")
# Add the orion requirements file
test_package.add_file(temp_req.path, dest="requirements.txt")
def generate_collection_for_training(collection_name):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / 255.0
    collection = ShardCollection.create(APISession, collection_name)
    step = 1000
    # For each chunk of data, pickle it up. Not a good way to chunk up data,
    # but it serves as an example of how to upload the training data into shards
    for x in range(0, len(x_train), step):
        with TemporaryPath(suffix="pickle") as temp:
            with open(temp, "wb") as ofs:
                pickle.dump((x_train[x : x + step], y_train[x : x + step]), ofs)
            # Create a shard with metadata that indicates that the format is a pickled object
            shard = Shard.create(collection, metadata={FORMAT: "pickle"})
            # Upload the file
            shard.upload_file(temp)
            # Close the shard to indicate that it is valid
            shard.close()
    # Close the collection to indicate that it is no longer being modified
    collection.close()
    return collection
@pytest.mark.floetest
@package(test_package)
class TestTensorflowFloes(FloeTestCase):
    def test_training_floe(self):
        # Use CollectionWrapper and WorkFloeWrapper to automatically trigger cleanup
        collection = generate_collection_for_training("training-collection")
        collection_wrapper = CollectionWrapper.from_collection(collection)
        # Wrapper for the file that the floe will write the trained model to
        output_file = OutputFileWrapper(extension=".h5")
        workfloe = WorkFloeWrapper.get_workfloe(
            os.path.join(FLOES_DIR, "training_floe.py"), run_timeout=2400
        )
        workfloe.start(
            {
                "promoted": {
                    "out": output_file.identifier,
                    "collection": collection_wrapper.identifier,
                }
            }
        )
        self.assertWorkFloeComplete(workfloe)
        # Calling path will download the file to a temporary file
        # if it is not local
        result = os.stat(output_file.path)
        self.assertTrue(result.st_size > 0)
To run the test locally:
$ pytest -k test_training_floe
To run the test against Orion:
$ pytest --orion -k test_training_floe
Note
Testing against Orion can take a while, as it has to build the environment; this is especially time-consuming when using GPU-dependent packages.
Note
If you get a ModuleNotFoundError, you may need to add your source code directory to the $PYTHONPATH environment variable.
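For example, from the top of the package (the same export used to run the full test suite later):
$ export PYTHONPATH=$(pwd)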
Writing a Prediction Workfloe
At this point, we can train a Tensorflow model using shards that contain training data, and output the model to a file that can be used for prediction in another cube.
To do this, we will create a parallel cube that loads the model and handles shards that contain data to predict values for. Define a parallel cube using ParallelMixin and ComputeCube. Then we will add a ShardInputPort and a JsonOutputPort. In a more realistic scenario, the JsonOutputPort would likely be replaced with a port better suited to your use case. Lastly, we will need a FileInputParameter to load the model that the previous floe created.
Create a file my_tensorflow_package/prediction.py and add the line from .prediction import * to __init__.py.
import pickle

from floe.api import ComputeCube, ParallelMixin
from floe.api.ports import JsonOutputPort, JsonInputPort
from orionclient.utils import TemporaryPath
from orionplatform.ports import ShardInputPort
from orionplatform.parameters import FileInputParameter
from orionplatform.constants import FORMAT

import tensorflow as tf


class ParallelTensorPredictionCube(ParallelMixin, ComputeCube):
    title = "Tensorflow Model Predictor"
    classification = [["Examples", "Tensorflow"]]
    tags = ["Example", "Tensorflow"]
    description = """A cube that handles shards and generates predictions
    based on the data that is pickled within the shards"""

    intake = ShardInputPort()
    success = JsonOutputPort()
    model = FileInputParameter(required=True)

    # Ensure we have a GPU for the prediction.
    # Shards generally do best with an item_count of 1
    parameter_overrides = {
        "gpu_count": {"default": 1},
        "item_count": {"default": 1},
    }

    def begin(self):
        files = list(self.args.model)
        if len(files) > 1:
            self.log.info("Dropping extra files, using first")
        model_file = files[0]
        self.log.info("Using model {} for evaluation".format(model_file.name))
        self.model_object = TemporaryPath(suffix=".h5")
        # Copy the file to the temporary path; in Orion this will download the
        # file to the local disk
        model_file.copy_to(self.model_object.path)
        self.tf_model = tf.keras.models.load_model(self.model_object.path)

    def process(self, shard, port):
        input_data = self.parse_shard(shard)
        predictions = self.tf_model.predict(input_data)
        self.success.emit({"output": predictions.tolist(), "input": shard.json()})

    def parse_shard(self, shard):
        # Create a temporary file to avoid using up disk space over
        # a large number of work items
        with TemporaryPath(suffix=".temp") as temp:
            shard.download_to_file(temp)
            shard_format = shard.metadata.get(FORMAT, "")
            # In our toy example, the format will be pickle files
            if shard_format == "pickle":
                with open(temp, "rb") as ifs:
                    input_data = pickle.load(ifs)
            else:
                raise NotImplementedError(
                    "Unable to handle alternative format {}".format(shard_format)
                )
        return input_data
As this tutorial covers a basic example of Tensorflow, we will present the data simply by printing out the predictions. In a more complete example, it may be valuable to create a floe report with the images. For this example, we can create a very simple cube that prints the prediction JSON. You can add this code directly after the prediction cube we just defined. It is set up to fail if no messages are received, which makes the floe easier to test.
class PredictionPrinter(ComputeCube):
    title = "Prediction Printer"
    classification = [["Examples", "Tensorflow"]]
    tags = ["Example", "Tensorflow"]
    description = "Prints out the predictions from a trained model"

    intake = JsonInputPort()

    def begin(self):
        # Set a flag to ensure we have seen a prediction
        self.seen = False

    def process(self, data, port):
        print(data)
        # Set seen to true once a prediction has been printed
        self.seen = True

    def end(self):
        # Assert that the cube saw an item of work
        assert self.seen
With the prediction cube written, we need to write the prediction floe, which will be quite similar to the training floe, with the addition of the cube that prints out the predictions. Create a file prediction_floe.py in the floes/ directory and insert the following code.
from floe.api import WorkFloe
from my_tensorflow_package.prediction import (
    ParallelTensorPredictionCube,
    PredictionPrinter,
)
from orionplatform.cubes.collections import ShardReaderCube

# Declare and document floe
job = WorkFloe("Predict from Model", title="Predict from Model")
job.description = "Uses a trained model to evaluate some data"
job.classification = [["Examples", "Tensorflow"]]
job.tags = ["Examples", "Tensorflow"]

# Declare Cubes
reader_cube = ShardReaderCube()
prediction_cube = ParallelTensorPredictionCube()
printer = PredictionPrinter()

# Add cubes to floe
job.add_cubes(reader_cube, prediction_cube, printer)

# Promote parameters
reader_cube.promote_parameter(
    "collection", title="Collection with shards containing data suitable to the model"
)
prediction_cube.promote_parameter(
    "model",
    promoted_name="model",
    title="Input file that contains tensorflow keras model",
)

reader_cube.success.connect(prediction_cube.intake)
prediction_cube.success.connect(printer.intake)

if __name__ == "__main__":
    job.run()
Testing the Prediction Workfloe
To finish, we will write a test to verify that predictions behave as expected. Like the last test, we need to generate a collection to test against. We will again use the MNIST dataset, but this time we will use the evaluation data as the input rather than the training data.
We will add the following function to the same test file as before, tests/test_floes.py.
def generate_collection_for_prediction(collection_name):
    collection = ShardCollection.create(APISession, collection_name)
    _, (x_data, _) = tf.keras.datasets.mnist.load_data()
    x_data = x_data / 255.0
    step = 1000
    # For each chunk of data, pickle it up. Not a good way to chunk up data,
    # but it serves as an example of how to upload the prediction data into shards
    for x in range(0, len(x_data), step):
        with TemporaryPath(suffix="pickle") as temp:
            with open(temp, "wb") as ofs:
                pickle.dump(x_data[x : x + step], ofs)
            # Create a shard with metadata that indicates that the format is a pickled object
            shard = Shard.create(collection, metadata={FORMAT: "pickle"})
            # Upload the file
            shard.upload_file(temp)
            # Close the shard to indicate that it is valid
            shard.close()
    # Close the collection to indicate that it is no longer being modified
    collection.close()
    return collection
The test for the prediction workfloe is very similar to the training workfloe test that we wrote, but with slightly different parameters and assertions. We will add it to TestTensorflowFloes as _test_prediction_floe(). Since we can't test prediction without a trained model, we will refactor the tests so that a single test_floes() method first runs the training floe, then uses the generated model to run the prediction floe.
@pytest.mark.floetest
@package(test_package)
class TestTensorflowFloes(FloeTestCase):
    def test_floes(self):
        # Alternatively, the trained model could be saved in test_data as part of the package
        model = OutputFileWrapper(extension=".h5")
        self._test_training_floe(model)
        self._test_prediction_floe(model)

    def _test_training_floe(self, output_file):
        # Use CollectionWrapper and WorkFloeWrapper to automatically trigger cleanup
        collection = generate_collection_for_training("training-collection")
        collection_wrapper = CollectionWrapper.from_collection(collection)
        workfloe = WorkFloeWrapper.get_workfloe(
            os.path.join(FLOES_DIR, "training_floe.py"), run_timeout=2400
        )
        workfloe.start(
            {
                "promoted": {
                    "out": output_file.identifier,
                    "collection": collection_wrapper.identifier,
                }
            }
        )
        self.assertWorkFloeComplete(workfloe)
        # Calling path will download the file to a temporary file
        # if it is not local
        result = os.stat(output_file.path)
        self.assertTrue(result.st_size > 0)

    def _test_prediction_floe(self, input_file):
        model_identifier = input_file.identifier
        if using_orion:
            files = APISession.list_resources(
                File, filters={"name": input_file.identifier}
            )
            for file in files:
                model_identifier = file.id
                break
        collection = generate_collection_for_prediction("prediction-floe-collection")
        collection_wrapper = CollectionWrapper.from_collection(collection)
        workfloe = WorkFloeWrapper.get_workfloe(
            os.path.join(FLOES_DIR, "prediction_floe.py"), run_timeout=60 * 60 * 2
        )
        workfloe.start(
            {
                "promoted": {
                    "model": model_identifier,
                    "collection": collection_wrapper.identifier,
                }
            }
        )
        self.assertWorkFloeComplete(workfloe)
And there we have it: a training workfloe and a prediction workfloe for Tensorflow on the Orion Platform, from creating a package, to writing cubes and floes, to writing tests that verify the logic we implemented.
To run all tests locally:
$ export PYTHONPATH=$(pwd)
$ pytest
To run all tests against Orion (because of the GPU requirement, this can be slow):
$ export PYTHONPATH=$(pwd)
$ pytest --orion
Download code