Shard Collections

Shard Collections, also known as collections, are made up of shards, which are thin wrappers around files (not related to Orion Files). Each shard can store up to 5 GB of arbitrary binary data as well as 256 KB of user-defined metadata in the form of JSON.
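
A minimal sketch of the basic API, assuming the orionclient calls below (in particular, the metadata keyword on Shard.create is an assumption; consult the orionclient API reference for the exact signature):

from orionclient.session import APISession
from orionclient.types import Shard, ShardCollection

# Create a collection, then a shard holding one file plus JSON metadata.
collection = ShardCollection.create(APISession, "my collection")
shard = Shard.create(
    collection,
    name="ligands-000",
    metadata={"count": 5000},  # assumed keyword; up to 256 KB of JSON
)
shard.upload_file("ligands-000.oeb.gz")  # up to 5 GB of arbitrary binary data
shard.close()       # mark the shard ready so it survives collection.close()
collection.close()  # queue background processing; the collection becomes ready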

Example Use Cases

  • Performing a virtual screen on large numbers of molecules (>10^6)
    • Accumulate enough data in each shard to represent 10 minutes to an hour of work.
    • The number of shards in the collection determines the maximum number of parallel workers that can run at one time. It is recommended that collections contain no more than 500,000 shards.
  • Storing Molecular Dynamics trajectory data
    • Collect numerous trajectories from a job into a collection.
    • Start a new simulation from a trajectory stored in a shard.
    • Output records can reference the collection/shard ID containing the trajectory data from which the record was generated.
  • Storing components of a directory structure (see the sketch after this list)
    • Use a tool like tar to store subdirectories.
    • Allows recreation of the complete file structure by untarring all shards into the same location.
    • Start operations from the state contained within the directory structure.
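
For the directory-structure use case, a hedged sketch (the directory names are illustrative; the orionclient calls mirror the sketch above):

import tarfile

from orionclient.session import APISession
from orionclient.types import Shard, ShardCollection

collection = ShardCollection.create(APISession, "project state")

# Store each subdirectory as one tarred shard.
for subdir in ("inputs", "checkpoints", "results"):
    archive = f"{subdir}.tar.gz"
    with tarfile.open(archive, "w:gz") as tar:
        tar.add(subdir)
    shard = Shard.create(collection, name=archive)
    shard.upload_file(archive)
    shard.close()

collection.close()

Downloading all of the shards and untarring the archives into the same location later recreates the original directory tree.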

Lifecycle

Collections and shards each have a set of states that determine what a developer can expect of a collection or shard at any given time. The following diagrams summarize these states.

Collections

                                      ┌────────┐
                                      │Creation│
                                      └────┬───┘
                                           │
                                           │
                                           ▼
             ┌────────┐       ┌─────────────────────────┐            ┌──────┐
             │  Upon  │       │       State: Open       │            │ Upon │
             │  open  │       │ Shards can be added and │            │close │
             └────────┘       │     deleted from a      │            └──────┘
               ┌─────────────▶│ collection. Can contain │────────────────┐
               │              │shards that are not ready│                │
               │              │     for retrieval.      │                │
               │              └─────────────────────────┘                │
               │                                                         │
               │                                                         ▼
┌─────────────────────────────┐                         ┌────────────────────────────────┐
│        State: Ready         │                         │    State: Queued/Processing    │
│Shards are immutable in this │                         │Collections are processed in the│
│state. All shards will be in │                         │   background and can not be    │
│  the `ready` state and be   │                         │   modified while processing.   │
│   available for download.   │◀──────┬──────────┬──────│ Processing removes all shards  │
│                             │       │Processing│      │ not in the ready state and     │
│                             │       │completed │      │   calculates the total size.   │
└─────────────────────────────┘       └──────────┘      └────────────────────────────────┘

Shards

            ┌────────┐
            │Creation│
            └────────┘
                 │
                 ▼
┌────────────────────────────────┐
│          State: Open           │
│Shard has been created and is   │
│ready for a file to be uploaded.│
│                                │
│                                │
│                                │
└────────────────────────────────┘
          ┌─────┐│
          │Upon ││
          │close││
          └─────┘▼
┌────────────────────────────────┐
│          State: Ready          │
│    Indicates that the shard    │
│contains data and the associated│
│   file data is available for   │
│ retrieval. Cannot be modified  │
│           once ready.          │
│                                │
└────────────────────────────────┘
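
In code, these transitions look roughly as follows (the open() call that returns a ready collection to the open state is assumed from the diagrams; consult the orionclient reference):

from orionclient.session import APISession
from orionclient.types import Shard, ShardCollection

collection = ShardCollection.create(APISession, "lifecycle demo")  # collection: open

shard = Shard.create(collection, name="payload")  # shard: open
shard.upload_file("payload.bin")
shard.close()       # shard: ready; now immutable and available for download

collection.close()  # collection: queued/processing, then ready;
                    # shards still open (or in error) are deleted

collection.open()   # assumed API: back to open, so shards can be added or deleted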

Guidelines

  • Batch work items
    • Choose shard sizes balancing parallelism and overhead
    • Measure memory usage and computation time
    • Each shard is limited to 5 GB
    • See Batching Work
  • Close shards downstream from creation (see the sketch after the warning below)
    • By default, newly created shards are open
    • Parallel cubes may retry work, which can result in multiple shards with the same output data
    • A shard must be closed in order to be marked as good and kept as part of the collection
    • Only close shards in a downstream cube outside of any cube group, or you may get duplicate shards marked as good
    • Minimize the use of temporary shards, since they create more work for CloseShardsCube, ShardCollection.close(), and ShardCollection.list_shards()

Warning

Shards in the open and error states will be DELETED when a collection is closed.
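
A wiring sketch of the close-downstream pattern (MyShardWriterCube is hypothetical, and CloseShardsCube is assumed to be importable from orionplatform.cubes with standard intake/success ports):

from floe.api import WorkFloe, ParallelCubeGroup
from orionplatform.cubes import CloseShardsCube

from mypackage.cubes import MyShardWriterCube  # hypothetical parallel cube that emits open shards

job = WorkFloe("Close shards downstream")

writer = MyShardWriterCube("writer")  # creates shards and leaves them open
closer = CloseShardsCube("closer")    # serial; closes each shard exactly once

group = ParallelCubeGroup(cubes=[writer])  # the closer stays OUTSIDE the group,
job.add_group(group)                       # so retried work is never marked good twice

job.add_cubes(writer, closer)
writer.success.connect(closer.intake)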

  • Avoid unnecessary work while streaming
    • Shards can be downloaded as a stream, but the longer the network connection stays open, the more likely an error becomes
    • Download to local files to avoid connection resets or running out of memory
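
For example, rather than computing while a download stream is open (get_resource and download_to_file are assumed orionclient calls, and the collection ID is illustrative):

import tempfile

from orionclient.session import APISession
from orionclient.types import ShardCollection

# Fetch a ready collection by ID, then process each shard from a local
# copy instead of holding a network stream open during the computation.
collection = APISession.get_resource(ShardCollection, 123)
for shard in collection.list_shards():
    with tempfile.NamedTemporaryFile() as temp:
        shard.download_to_file(temp.name)
        # ... operate on temp.name with the connection closed ...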

Batching Work

When a dataset has grown to the point that operating on individual records no longer scales well (e.g., >10^6 records), it is time to transition to working on batches of records; in other words, the data should move into an orionclient.types.ShardCollection. The first step in this transition is to transform the data. For this sort of floe, you will want to use a DatasetBatcherCube to emit sets of records into a parallel cube group consisting of a ParallelDatasetBatchReaderCube and a ParallelRecordsToCollectionCube.

Parallel cubes cannot accumulate data predictably due to dynamic scaling, so they must be given all of the records of a batch up front. If they are fed individual records, they cannot be expected to emit shards consisting of sets of records; the item_count parameter does not change this.

Here is a simple example that reads a dataset and writes a collection. Additional record processing can be placed in the parallel cube group between the ParallelDatasetBatchReaderCube and the ParallelRecordsToCollectionCube.

# (C) 2020 OpenEye Scientific Software Inc. All rights reserved.
from floe.api import WorkFloe, ParallelCubeGroup

from orionplatform.cubes import (
    DatasetBatcherCube,
    ParallelDatasetBatchReaderCube,
    ParallelRecordsToCollectionCube,
    CreateCollectionCube,
    CloseCollectionCube,
)

job = WorkFloe(
    "Test Dataset to Record Collection",
    title="Test Dataset to Collection Export",
)
setup = CreateCollectionCube("setup")
batcher = DatasetBatcherCube("batcher")
reader = ParallelDatasetBatchReaderCube("reader")
writer = ParallelRecordsToCollectionCube("writer")
close = CloseCollectionCube("close")

setup.modify_parameter(setup.collection_name, promoted=True)
batcher.modify_parameter(batcher.data_in, promoted=True)
batcher.modify_parameter(batcher.batch_size, promoted=True)
writer.modify_parameter(writer.collection_type, promoted=True)

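# Group the reader and writer so that each batch is read and written
# to the collection within a single parallel worker.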
group = ParallelCubeGroup(cubes=[reader, writer])
job.add_group(group)

job.add_cubes(setup, batcher, reader, writer, close)

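# The setup cube passes the new collection to the writer's init port;
# record batches flow batcher -> reader -> writer, and the writer
# notifies the close cube once its shards are written.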
setup.success.connect(writer.init)
batcher.success.connect(reader.intake)

reader.success.connect(writer.intake)

writer.success.connect(close.intake)


if __name__ == "__main__":
    job.run()

If work cannot be batched from the beginning, then aggregate work items in serial cubes. Consider molecule database preparation and searching: molecule preparation needs relatively small shards because the computation time per shard is large, while a FastROCS search requires larger shards because the computation time per shard is low. References to the small shards created during preparation can therefore be accumulated in a serial AccumulateShardsCube, which emits lists to parallel cubes so that they can operate on batches of small shards and create larger ones. A wiring sketch of this pattern follows.
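
A sketch of that pattern, assuming AccumulateShardsCube is importable from orionplatform.cubes with standard ports (MyShardCombinerCube is hypothetical):

from floe.api import WorkFloe, ParallelCubeGroup
from orionplatform.cubes import AccumulateShardsCube

from mypackage.cubes import MyShardCombinerCube  # hypothetical: merges a list of small shards into one large shard

job = WorkFloe("Rebatch small shards")

accumulator = AccumulateShardsCube("accumulator")  # serial: collects shard references into lists
combiner = MyShardCombinerCube("combiner")         # parallel: one large shard per incoming list

group = ParallelCubeGroup(cubes=[combiner])
job.add_group(group)
job.add_cubes(accumulator, combiner)

accumulator.success.connect(combiner.intake)

if __name__ == "__main__":
    job.run()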