Shard Collections

Shard Collections, or simply collections, are made up of shards, which are thin wrappers around files (not related to Orion Files). Each shard can store up to 5 GB of arbitrary binary data, plus up to 256 kB of user-defined metadata in JSON form.
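The two size limits above can be checked before any upload is attempted. The following is a minimal sketch in plain Python, not part of orionclient. The function name check_shard_limits is hypothetical, and the byte values assume that "5 GB" and "256 kB" are decimal units, which the text above does not specify.

```python
import json

# Assumption: decimal units. The documented limits are "5 GB" and "256 kB".
MAX_SHARD_BYTES = 5 * 1000**3
MAX_METADATA_BYTES = 256 * 1000


def check_shard_limits(data_size_bytes, metadata):
    """Return a list of problems; an empty list means both limits are met."""
    problems = []
    if data_size_bytes > MAX_SHARD_BYTES:
        problems.append(
            f"data is {data_size_bytes} bytes, limit is {MAX_SHARD_BYTES}"
        )
    # Metadata is JSON, so measure the size of its serialized form.
    encoded = json.dumps(metadata).encode("utf-8")
    if len(encoded) > MAX_METADATA_BYTES:
        problems.append(
            f"metadata is {len(encoded)} bytes, limit is {MAX_METADATA_BYTES}"
        )
    return problems


if __name__ == "__main__":
    print(check_shard_limits(1024, {"source": "example"}))
```

Running a check like this in the cube that produces the data lets it split oversized payloads or trim metadata instead of failing at upload time.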

Example Use Cases

  • Perform a virtual screen on large numbers of molecules (\(>10^6\)).

    • Accumulate data that constitutes 10 minutes to an hour’s worth of work within each shard.

    • The number of shards in the collection will determine the maximum number of parallel workers that can run at one time. It is recommended that collections contain no more than 500,000 shards.

  • Store Molecular Dynamics trajectory data.

    • Collect numerous trajectories from a job into a collection.

    • Start a new simulation from a trajectory stored in a shard.

    • Output records can reference the collection/shard ID containing trajectory data from which the record was generated.

  • Store components of a directory structure.

    • Use a tool like tar to store subdirectories.

    • Recreate the complete file structure by untarring all shards into the same location.

    • Start operations from state contained within the directory structure.
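The directory-structure use case can be sketched with the Python standard library alone. This sketch assumes one archive per immediate subdirectory, with each archive intended to become the file behind one shard. The function names pack_subdirectories and unpack_all are hypothetical, and uploading the archives to shards is not shown.

```python
import tarfile
from pathlib import Path


def pack_subdirectories(root, out_dir):
    """Write one .tar per immediate subdirectory of root; return the paths."""
    root, out_dir = Path(root), Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    archives = []
    for sub in sorted(p for p in root.iterdir() if p.is_dir()):
        archive = out_dir / f"{sub.name}.tar"
        with tarfile.open(archive, "w") as tar:
            # arcname keeps member paths relative to root, so every
            # archive can be unpacked into the same destination.
            tar.add(sub, arcname=sub.name)
        archives.append(archive)
    return archives


def unpack_all(archives, destination):
    """Extract every archive into destination, recreating the structure."""
    destination = Path(destination)
    destination.mkdir(parents=True, exist_ok=True)
    for archive in archives:
        # Only extract archives that you created yourself.
        with tarfile.open(archive, "r") as tar:
            tar.extractall(destination)
```

Files that sit directly in the root directory are not captured by this sketch; they would need an archive of their own.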

Lifecycle

Collections and shards each have a set of states that determine what a developer can expect of a collection or shard at any given time. The following diagrams summarize these states.

Collections

                                      ┌────────┐
                                      │Creation│
                                      └────┬───┘
                                           │
                                           │
                                           ▼
             ┌────────┐       ┌─────────────────────────┐            ┌──────┐
             │  Upon  │       │       State: Open       │            │ Upon │
             │  open  │       │ Shards can be added and │            │close │
             └────────┘       │     deleted from a      │            └──────┘
               ┌─────────────▶│ collection. Can contain │────────────────┐
               │              │shards that are not ready│                │
               │              │     for retrieval.      │                │
               │              └─────────────────────────┘                │
               │                                                         │
               │                                                         ▼
┌─────────────────────────────┐                         ┌────────────────────────────────┐
│        State: Ready         │                         │    State: Queued/Processing    │
│Shards are immutable in this │                         │Collections are processed in the│
│state. All shards will be in │                         │   background and cannot be     │
│  the `ready` state and be   │                         │   modified while processing.   │
│   available for download.   │◀──────┬──────────┬──────│ Processing removes all shards  │
│                             │       │Processing│      │ not in the ready state and     │
│                             │       │completed │      │   calculates the total size.   │
└─────────────────────────────┘       └──────────┘      └────────────────────────────────┘

Note

Collections are an Orion-specific data structure. While it is possible to download collections with ocli (that is, with ocli collections download), this converts the collection data to a folder with files. Such a folder cannot be used as input for a floe. This is in contrast to files and datasets, which when downloaded can be used as input for floes that are run locally.

Shards

            ┌────────┐
            │Creation│
            └────────┘
                 │
                 ▼
┌────────────────────────────────┐
│          State: Open           │
│Shard has been created and ready│
│   for a file to be uploaded.   │
│                                │
│                                │
│                                │
└────────────────────────────────┘
          ┌─────┐│
          │Upon ││
          │close││
          └─────┘▼
┌────────────────────────────────┐
│          State: Ready          │
│    Indicates that the shard    │
│contains data and the associated│
│   file data is available for   │
│ retrieval. Cannot be modified  │
│           once ready.          │
│                                │
└────────────────────────────────┘
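The two diagrams can be read as a small state machine. The toy model below is plain Python written only to illustrate the transitions. ToyShard and ToyCollection are hypothetical classes, not the orionclient types, and the background Queued/Processing step is collapsed into a single synchronous close() call.

```python
from enum import Enum


class ShardState(Enum):
    OPEN = "open"
    READY = "ready"


class ToyShard:
    def __init__(self, name):
        self.name = name
        self.state = ShardState.OPEN
        self.data = b""

    def upload(self, data):
        # A shard accepts data only while it is open.
        if self.state is not ShardState.OPEN:
            raise RuntimeError("a ready shard cannot be modified")
        self.data = data

    def close(self):
        self.state = ShardState.READY


class ToyCollection:
    def __init__(self):
        self.state = "open"
        self.shards = []
        self.total_size = 0

    def add_shard(self, name):
        if self.state != "open":
            raise RuntimeError("shards can only be added to an open collection")
        shard = ToyShard(name)
        self.shards.append(shard)
        return shard

    def close(self):
        # Stand-in for Queued/Processing: drop shards that are not ready,
        # compute the total size, then mark the collection ready.
        self.shards = [s for s in self.shards if s.state is ShardState.READY]
        self.total_size = sum(len(s.data) for s in self.shards)
        self.state = "ready"

    def open(self):
        # The "Upon open" transition from Ready back to Open.
        self.state = "open"
```

In this model, a shard that was created but never closed disappears when the collection is closed, which mirrors the "removes all shards not in the ready state" step in the collection diagram.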

Guidelines

  • Batch work items.

    • Choose shard sizes balancing parallelism and overhead.

    • Measure memory usage and computation time.

    • Each shard is limited to 5 GB.

    • See Batching Work.

  • Close shards downstream from creation.

    • By default, newly-created shards are open.

    • Parallel cubes may retry work, which will result in multiple shards with the same output data.

    • A shard must be closed in order to be marked as good and be kept as part of the collection.

    • Only close shards in a downstream cube outside of any cube group, or you may get duplicate shards marked as good.

    • Minimize use of temporary shards, because they will create more work for CloseShardsCube, orionclient.types.ShardCollection.close(), and orionclient.types.ShardCollection.list_shards().

Warning

Shards in the open and error states will be DELETED when a collection is closed.

  • Avoid unnecessary work while streaming.

    • Shards can be downloaded as a stream, but the longer the network connection is open, the more likely an error will occur.

    • Download to local files to avoid connection resets or running out of memory.
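The streaming guideline above amounts to copying the stream to disk first and working from the local copy. Here is a minimal sketch using only the standard library. It accepts any readable binary stream, the function name spool_to_local_file is hypothetical, and obtaining the stream from a shard is not shown.

```python
import io
import shutil
import tempfile


def spool_to_local_file(stream, chunk_size=1024 * 1024):
    """Copy a readable binary stream to a temporary file; return its path."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".shard") as tmp:
        # Copy in fixed-size chunks so memory use stays bounded and the
        # network connection is held open only for the duration of the copy.
        shutil.copyfileobj(stream, tmp, chunk_size)
        return tmp.name


if __name__ == "__main__":
    # io.BytesIO stands in for a shard download stream in this sketch.
    path = spool_to_local_file(io.BytesIO(b"example shard payload"))
    with open(path, "rb") as handle:
        print(len(handle.read()))
```

The caller is responsible for deleting the temporary file once the slow processing step has finished.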

Batching Work

When a dataset has grown so large that operating on individual records no longer scales well (that is, \(>10^6\) records), it is time to transition to working on batches of records. In other words, the floe should be using orionclient.types.ShardCollection. The first step in this transition is to transform the data. For this sort of floe, use a DatasetBatcherCube to emit sets of records into a parallel cube group consisting of a ParallelDatasetBatchReaderCube and a ParallelRecordsToCollectionCube.

Parallel cubes cannot accumulate data predictably, due to dynamic scaling, so they must be given each complete set of records up front. If they are fed individual records, then they cannot be expected to emit shards consisting of a set of records. The item_count parameter will not affect this.
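Choosing a batch size is mostly arithmetic on two recommendations made elsewhere in this document: 10 minutes to an hour of work per shard, and no more than 500,000 shards per collection. The sketch below is one way to combine them. The function name suggest_batch_size, the 30-minute default target, and the idea of measuring an average time per record are assumptions for illustration.

```python
import math

MAX_RECOMMENDED_SHARDS = 500_000  # recommendation stated in this document


def suggest_batch_size(total_records, seconds_per_record, target_seconds=1800):
    """Return (batch_size, shard_count) for a measured per-record cost."""
    # Records that fit into the target amount of work per shard.
    by_time = max(1, int(target_seconds // seconds_per_record))
    # Smallest batch size that keeps the shard count within the recommendation.
    by_limit = math.ceil(total_records / MAX_RECOMMENDED_SHARDS)
    batch_size = max(by_time, by_limit)
    shard_count = math.ceil(total_records / batch_size)
    return batch_size, shard_count


if __name__ == "__main__":
    # 10 million records at a measured 0.5 s each.
    print(suggest_batch_size(10_000_000, 0.5))  # (3600, 2778)
```

When the shard-count limit wins over the time target, each shard holds more than the target amount of work, which is a signal to check the result against the 5 GB per-shard limit as well.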

Here is a simple example that reads a dataset and writes a collection. Additional record processing can be put in the parallel cube group between the ParallelDatasetBatchReaderCube and ParallelRecordsToCollectionCube.

# CONFIDENTIAL. (C) 2020 OpenEye Scientific Software Inc.
# All rights reserved.
# ALL SOFTWARE BELOW IS PROPRIETARY AND CONFIDENTIAL TO OPENEYE
# SCIENTIFIC SOFTWARE INC., AND IS SUBJECT TO THE FULL PROTECTION OF
# COPYRIGHT AND TRADESECRET LAW.
# Copying or modifying this software is strictly prohibited and illegal.
# Using this software without a valid license from OpenEye is strictly
# prohibited and illegal.  De-obfuscating, de-minimizing, or any other
# attempt to reverse engineer or discover the workings of this software,
# is strictly prohibited. Even if software is exposed or visible, it is
# still proprietary and confidential and the above prohibitions apply.
# This software is NOT "Sample Code". For purposes of customization or
# interoperation with other software you must use Sample Code
# specifically labeled as such.
# Please contact OpenEye at eyesopen.com/legal if you have any questions
# about this warning.
from floe.api import WorkFloe, ParallelCubeGroup
from orionplatform.cubes import (
    DatasetBatcherCube,
    CloseCollectionCube,
    CreateCollectionCube,
    ParallelDatasetBatchReaderCube,
    ParallelRecordsToCollectionCube,
)

job = WorkFloe(
    "Test Dataset to Record Collection",
    title="Test Dataset to Collection Export",
)
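# Serial cubes that create the collection and split the dataset into batches.
setup = CreateCollectionCube("setup")
batcher = DatasetBatcherCube("batcher")
# Parallel cubes that read each batch and write its records to the collection.
reader = ParallelDatasetBatchReaderCube("reader")
writer = ParallelRecordsToCollectionCube("writer")
# Serial cube that closes the collection, downstream of the parallel group.
close = CloseCollectionCube("close")

# Promote the parameters that should be settable when the floe is launched.
setup.modify_parameter(setup.collection_name, promoted=True)
batcher.modify_parameter(batcher.data_in, promoted=True)
batcher.modify_parameter(batcher.batch_size, promoted=True)
writer.modify_parameter(writer.collection_type, promoted=True)

# Place the reader and writer in one parallel cube group.
group = ParallelCubeGroup(cubes=[reader, writer])
job.add_group(group)

job.add_cubes(setup, batcher, reader, writer, close)

# Connect the cubes: collection setup and batches flow into the group,
# and the writer's output flows to the closing cube.
setup.success.connect(writer.init)
batcher.success.connect(reader.intake)

reader.success.connect(writer.intake)

writer.success.connect(close.intake)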


if __name__ == "__main__":
    job.run()

If work cannot be batched from the beginning, then aggregate work items in serial cubes. Consider, for example, molecule database preparation and searching. Molecule preparation needs relatively small shards because the computation time per shard is large, whereas a FastROCS search requires larger shards because the computation time per shard is low. References to the small shards created during preparation can therefore be accumulated in a serial AccumulateShardsCube, which emits lists to parallel cubes so that they can operate on batches of small shards to create larger shards.
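The accumulation step can be pictured as a serial loop that groups shard references until a target size is reached. The generator below is a plain-Python illustration of that idea and does not show how AccumulateShardsCube is implemented. The function name accumulate, the (shard_id, size_bytes) tuples, and the size-based threshold are all assumptions.

```python
def accumulate(shard_refs, target_bytes):
    """Yield lists of (shard_id, size_bytes) totalling at least target_bytes.

    The final list may be smaller than target_bytes.
    """
    batch, batch_bytes = [], 0
    for shard_id, size in shard_refs:
        batch.append((shard_id, size))
        batch_bytes += size
        if batch_bytes >= target_bytes:
            # Emit a full batch and start a new one.
            yield batch
            batch, batch_bytes = [], 0
    if batch:
        # Emit whatever is left over at the end of the input.
        yield batch


if __name__ == "__main__":
    refs = [(i, 40) for i in range(5)]
    for group in accumulate(refs, target_bytes=100):
        print(group)
```

Because the loop runs in one place and sees every reference in order, the batches it emits are predictable, which is the property that parallel cubes cannot provide on their own.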

Collections in Local WorkFloe Development

Collections are an Orion-specific data structure, and there are limitations on how a collection may be used with a WorkFloe object that is running locally on a developer machine. Like datasets and files, a collection can be downloaded, with ocli collection download <id>, but it is converted to a folder of files that cannot serve as an input to a WorkFloe object.

However, a collection located in Orion can be used as an input to a WorkFloe object running locally. Specify it by the ID of the collection in Orion, which can be found using ocli:

(myvirtualenv) > ocli collection list