Shard Collections

Shard Collections, also known simply as collections, are made up of shards, which are thin wrappers around files (not related to Orion Files). Each shard can store up to 5 GB of arbitrary binary data, as well as 256 KB of user-defined metadata in the form of JSON.
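The sketch below illustrates these two limits by checking a planned shard before it is written. It is plain Python with no Orion calls. `check_shard_limits` and the constant names are hypothetical. It assumes binary units (1 GB = 1024³ bytes, 1 KB = 1024 bytes) and assumes the metadata limit applies to the UTF-8 encoded JSON text. Adjust both assumptions to match how the platform actually counts.

```python
import json

# Limits quoted in the text; names are hypothetical, units ASSUMED to be binary.
MAX_SHARD_DATA_BYTES = 5 * 1024**3      # 5 GB of arbitrary binary data
MAX_SHARD_METADATA_BYTES = 256 * 1024   # 256 KB of JSON metadata


def check_shard_limits(data_size: int, metadata: dict) -> None:
    """Raise ValueError if a planned shard would exceed the documented limits.

    Hypothetical pre-flight check; it does not call any Orion API.
    """
    if data_size > MAX_SHARD_DATA_BYTES:
        raise ValueError(
            f"shard data is {data_size} bytes; limit is {MAX_SHARD_DATA_BYTES}"
        )
    # Metadata is stored as JSON, so measure its serialized UTF-8 size (an assumption).
    encoded = json.dumps(metadata).encode("utf-8")
    if len(encoded) > MAX_SHARD_METADATA_BYTES:
        raise ValueError(
            f"metadata is {len(encoded)} bytes; limit is {MAX_SHARD_METADATA_BYTES}"
        )
```

A check like this is cheap to run inside the cube that prepares each batch, before any upload starts.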

Example Use Cases

• Performing a virtual screen on large numbers of molecules ($$>10^6$$)
    • Accumulate data that constitutes 10 minutes to an hour's worth of work within each shard.
    • The number of shards in the collection determines the maximum number of parallel workers that can run at one time. It is recommended that collections contain no more than 500,000 shards.
• Storing Molecular Dynamics trajectory data
    • Collect numerous trajectories from a job into a collection.
    • Start a new simulation from a trajectory stored in a shard.
    • Output records can reference the ID of the collection and shard containing the trajectory data from which the record was generated.
• Storing components of a directory structure
    • Use a tool like tar to store subdirectories in shards.
    • Untarring all shards into the same location recreates the complete file structure.
    • Start operations from the state contained within the directory structure.
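The directory-structure use case can be sketched with Python's standard `tarfile` module: each subdirectory is packed into its own archive (one archive per shard), and extracting every archive into one destination rebuilds the tree. This is a minimal sketch. `pack_subdirectory` and `unpack_all` are hypothetical helpers, and the steps that upload each archive to a shard and download it back are deliberately omitted because they depend on the orionclient API.

```python
import io
import tarfile
from pathlib import Path
from typing import Iterable


def pack_subdirectory(root: Path, subdir: str) -> bytes:
    """Return a gzip-compressed tar archive of root/subdir as bytes.

    Member names are stored relative to root, so extracting every archive
    into the same destination recreates the original tree.
    """
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        tar.add(root / subdir, arcname=subdir)  # recursive by default
    return buffer.getvalue()


def unpack_all(archives: Iterable[bytes], destination: Path) -> None:
    """Extract every archive (e.g. one per shard) into one destination directory."""
    destination.mkdir(parents=True, exist_ok=True)
    # Use the safe "data" extraction filter when this Python version provides it.
    kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
    for payload in archives:
        with tarfile.open(fileobj=io.BytesIO(payload), mode="r:gz") as tar:
            tar.extractall(destination, **kwargs)
```

As long as each archive holds a distinct subdirectory, archives from different shards do not collide when extracted into the same location.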

Lifecycle

Collections and shards each have a set of states that determine what a developer can expect of a collection or shard at any given time. The following diagrams summarize these states.

Collections

┌────────┐
│Creation│
└───┬────┘
    │
    ▼
┌──────────────────────────────────┐
│ State: Open                      │◀──────┐
│ Shards can be added to and       │       │
│ deleted from the collection.     │       │
│ Can contain shards that are not  │       │
│ ready for retrieval.             │       │
└────────────────┬─────────────────┘       │
                 │ upon close              │
                 ▼                         │
┌──────────────────────────────────┐       │
│ State: Queued/Processing         │       │
│ Processed in the background;     │       │
│ cannot be modified while         │       │ upon open
│ processing. Deletes shards not   │       │
│ in the ready state and           │       │
│ calculates the total size.       │       │
└────────────────┬─────────────────┘       │
                 │ processing completed    │
                 ▼                         │
┌──────────────────────────────────┐       │
│ State: Ready                     │       │
│ Shards are immutable. All        │───────┘
│ shards are in the ready state.   │
└──────────────────────────────────┘
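The collection states and transitions can be modeled as a small state machine to reason about the lifecycle. The sketch below is a toy, in-memory model, not the orionclient API: `CollectionModel`, `CollectionState`, and `ShardState` are hypothetical names, and the background processing step is assumed to run synchronously inside `close()`. The shard-removal step follows the Warning under Guidelines, which states that shards in the open and error states are deleted when a collection is closed.

```python
from dataclasses import dataclass, field
from enum import Enum


class CollectionState(Enum):
    OPEN = "open"
    PROCESSING = "queued/processing"
    READY = "ready"


class ShardState(Enum):
    OPEN = "open"
    READY = "ready"
    ERROR = "error"


# Transitions from the diagram: upon close, processing completed, upon open.
ALLOWED = {
    (CollectionState.OPEN, CollectionState.PROCESSING),
    (CollectionState.PROCESSING, CollectionState.READY),
    (CollectionState.READY, CollectionState.OPEN),
}


@dataclass
class CollectionModel:
    """Hypothetical toy model of a collection's lifecycle (no Orion calls)."""

    state: CollectionState = CollectionState.OPEN
    shards: dict = field(default_factory=dict)  # shard id -> (ShardState, size in bytes)
    total_size: int = 0

    def _move(self, new_state: CollectionState) -> None:
        if (self.state, new_state) not in ALLOWED:
            raise RuntimeError(f"cannot go from {self.state.value} to {new_state.value}")
        self.state = new_state

    def add_shard(self, shard_id: int, state: ShardState, size: int) -> None:
        if self.state is not CollectionState.OPEN:
            raise RuntimeError("shards can only be added while the collection is open")
        self.shards[shard_id] = (state, size)

    def close(self) -> None:
        self._move(CollectionState.PROCESSING)
        # Processing: drop shards that are not ready, then total the remaining sizes.
        self.shards = {k: v for k, v in self.shards.items() if v[0] is ShardState.READY}
        self.total_size = sum(size for _, size in self.shards.values())
        self._move(CollectionState.READY)

    def reopen(self) -> None:
        self._move(CollectionState.OPEN)
```

In this model, calling `close()` on a collection that is already Ready raises, mirroring the diagram, where the only way out of Ready is to reopen the collection.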

Shards

┌────────┐
│Creation│
└───┬────┘
    │
    ▼
┌────────────────────────────────┐
│ State: Open                    │
│ Shard has been created and is  │
│ ready for a file to be         │
│ uploaded.                      │
└───────────────┬────────────────┘
                │ upon close
                ▼
┌────────────────────────────────┐
│ State: Ready                   │
│ The shard contains data and    │
│ the associated file data is    │
│ available for retrieval.       │
│ Cannot be modified.            │
└────────────────────────────────┘
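The shard lifecycle reduces to a one-way switch from writable to read-only. `ShardModel` below is a hypothetical class that illustrates only that rule. It does not wrap any Orion object, and it assumes data can be retrieved only after the shard is closed, which matches the diagram's description of the two states.

```python
from enum import Enum


class ShardState(Enum):
    OPEN = "open"
    READY = "ready"


class ShardModel:
    """Hypothetical toy shard: writable while open, read-only once closed."""

    def __init__(self) -> None:
        self.state = ShardState.OPEN
        self._data = b""

    def upload(self, data: bytes) -> None:
        # Only an open shard accepts a file.
        if self.state is not ShardState.OPEN:
            raise RuntimeError("a closed shard cannot be modified")
        self._data = data

    def close(self) -> None:
        # Closing marks the data as available for retrieval.
        self.state = ShardState.READY

    def download(self) -> bytes:
        if self.state is not ShardState.READY:
            raise RuntimeError("shard data is only available for retrieval once closed")
        return self._data
```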

Guidelines

• Batch work items
    • Choose shard sizes that balance parallelism against overhead
    • Measure memory usage and computation time
    • Each shard is limited to 5 GB
    • See Batching Work
• Close shards downstream from creation
    • By default, newly created shards are open
    • Parallel cubes may retry work, which can result in multiple shards containing the same output data
    • A shard must be closed in order to be marked as good and kept as part of the collection
    • Only close shards in a downstream cube outside of any cube group; otherwise you may get duplicate shards marked as good
    • Minimize the use of temporary shards, since they create more work for CloseShardsCube, ShardCollection.close(), and ShardCollection.list_shards()

Warning

Shards in the open and error states will be DELETED when a collection is closed.

• Avoid unnecessary work while streaming
    • Shards can be downloaded as a stream, but the longer the network connection stays open, the more likely an error becomes
    • Download to local files to avoid connection resets or running out of memory
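A minimal sketch of the "download to local files" advice, using only the standard library: the stream is copied to a temporary file in fixed-size chunks, so memory use stays bounded and the connection is held only for the duration of the copy. How a shard's stream is obtained is not shown; any readable binary file-like object works here, and `download_to_local_file` is a hypothetical helper.

```python
import shutil
import tempfile
from typing import BinaryIO


def download_to_local_file(stream: BinaryIO, chunk_size: int = 1024 * 1024) -> str:
    """Copy a readable binary stream to a temporary file and return its path.

    Copying in fixed-size chunks keeps memory bounded; all later processing
    reads the local file instead of the network stream.
    """
    # delete=False keeps the file after the with-block so the caller can use it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".shard") as local:
        shutil.copyfileobj(stream, local, chunk_size)
        return local.name
```

The caller is responsible for deleting the returned file once processing is finished.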

Batching Work

When a dataset has grown so large that operating on individual records no longer scales well (e.g. $$>10^6$$ records), it is time to transition to working on batches of records, in other words, to using orionclient.types.ShardCollection. The first step in this transition is to transform the data. For this sort of floe, use a DatasetBatcherCube to emit sets of records into a parallel cube group consisting of a ParallelDatasetBatchReaderCube and a ParallelRecordsToCollectionCube.

Parallel cubes cannot accumulate data predictably due to dynamic scaling, so they must be given all of the records up front. If they are fed individual records, then they cannot be expected to emit shards consisting of a set of records. The item_count parameter will not affect this.
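To make the batching requirement concrete, the sketch below groups records into fixed-size lists before any parallel work starts, so each work item already carries a complete batch and can be written as exactly one shard. It also picks a batch size large enough to keep the shard count within the 500,000-shard recommendation from the use cases. This is plain Python illustrating the idea, not how DatasetBatcherCube is implemented; `batch_size_for` and `batches` are hypothetical helpers, and the preferred size should come from measuring how many records amount to 10 minutes to an hour of work.

```python
import math
from itertools import islice
from typing import Iterable, Iterator, List

MAX_RECOMMENDED_SHARDS = 500_000  # recommendation from the use cases above


def batch_size_for(n_records: int, preferred: int) -> int:
    """Return a batch size >= preferred that keeps the shard count under the cap."""
    minimum = math.ceil(n_records / MAX_RECOMMENDED_SHARDS)
    return max(preferred, minimum, 1)


def batches(records: Iterable, size: int) -> Iterator[List]:
    """Yield consecutive lists of up to `size` records.

    Each list is one complete unit of work: whichever parallel worker receives
    it can write one shard without accumulating records across calls.
    """
    it = iter(records)
    while chunk := list(islice(it, size)):
        yield chunk
```

For example, with $$10^9$$ records and a preferred batch size of 1,000, `batch_size_for` returns 2,000, which yields exactly 500,000 shards.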

Here is a simple example that reads a dataset and writes a collection. Additional record processing can be placed in the parallel cube group between the ParallelDatasetBatchReaderCube and the ParallelRecordsToCollectionCube.

# CONFIDENTIAL. (C) 2020 OpenEye Scientific Software Inc.
# ALL SOFTWARE BELOW IS PROPRIETARY AND CONFIDENTIAL TO OPENEYE
# SCIENTIFIC SOFTWARE INC., AND IS SUBJECT TO THE FULL PROTECTION OF
# Copying or modifying this software is strictly prohibited and illegal.
# Using this software without a valid license from OpenEye is strictly
# prohibited and illegal.  De-obfuscating, de-minimizing, or any other
# attempt to reverse engineer or discover the workings of this software,
# is strictly prohibited. Even if software is exposed or visible, it is
# still proprietary and confidential and the above prohibitions apply.
# This software is NOT "Sample Code". For purposes of customization or
# interoperation with other software you must use Sample Code
# specifically labeled as such.
from floe.api import WorkFloe, ParallelCubeGroup

from orionplatform.cubes import (
    DatasetBatcherCube,
    ParallelRecordsToCollectionCube,
    CreateCollectionCube,
    CloseCollectionCube,
)

job = WorkFloe(
"Test Dataset to Record Collection",
title="Test Dataset to Collection Export",
)
setup = CreateCollectionCube("setup")
batcher = DatasetBatcherCube("batcher")
writer = ParallelRecordsToCollectionCube("writer")
close = CloseCollectionCube("close")

setup.modify_parameter(setup.collection_name, promoted=True)
batcher.modify_parameter(batcher.data_in, promoted=True)
batcher.modify_parameter(batcher.batch_size, promoted=True)
writer.modify_parameter(writer.collection_type, promoted=True)