Shard Collections, also known as collections, are made up of shards, which are thin wrappers around files (not related to Orion Files). Each shard can store up to 5 GB of arbitrary binary data as well as 256 KB of user-defined metadata in the form of JSON.
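As a rough illustration of these limits, the sketch below checks a payload size and its metadata before anything is uploaded. It does not call the Orion API: `check_shard_limits` is a hypothetical helper, and reading "5 GB" and "256 KB" as binary units measured on the UTF-8 encoded JSON is an assumption.

```python
import json

# Assumption: the documented limits are treated as binary units (GiB / KiB).
MAX_SHARD_BYTES = 5 * 1024**3
MAX_METADATA_BYTES = 256 * 1024


def check_shard_limits(data_size, metadata):
    """Return a list of problems; an empty list means the shard fits the limits."""
    problems = []
    if data_size > MAX_SHARD_BYTES:
        problems.append(f"data is {data_size} bytes, limit is {MAX_SHARD_BYTES}")
    # Metadata is JSON, so measure the size of its serialized form.
    encoded = json.dumps(metadata).encode("utf-8")
    if len(encoded) > MAX_METADATA_BYTES:
        problems.append(
            f"metadata is {len(encoded)} bytes, limit is {MAX_METADATA_BYTES}"
        )
    return problems
```

Running a check like this before creating a shard may help catch oversized payloads early, rather than after a long upload.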
Example Use Cases
Performing a virtual screen on large numbers of molecules (\(>10^6\))
Accumulate data that constitutes 10 minutes to an hour’s worth of work within each shard.
The number of shards in the collection will determine the maximum number of parallel workers that can run at one time. It is recommended that collections contain no more than 500000 shards.
Storing Molecular Dynamics trajectory data
Collect numerous trajectories from a job into a collection.
Start a new simulation from a trajectory stored in a shard.
Output records can reference the collection/shard id containing trajectory data from which the record was generated.
Storing components of a directory structure
Use a tool like tar to store subdirectories.
Allows recreation of the complete file structure by untarring all shards into the same location.
Start operations from state contained within directory structure.
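The directory-structure use case can be sketched with the standard library alone. The example below writes one archive per immediate subdirectory (each archive would be the payload of one shard) and restores them all into a single destination. The helper names are hypothetical, and uploading or downloading the archives through Orion is deliberately left out.

```python
import tarfile
from pathlib import Path


def pack_subdirectories(root, out_dir):
    """Write one .tar.gz per immediate subdirectory of root and return the paths.

    out_dir is assumed to live outside root so it is not archived itself.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    archives = []
    for sub in sorted(p for p in root.iterdir() if p.is_dir()):
        archive = out_dir / f"{sub.name}.tar.gz"
        with tarfile.open(archive, "w:gz") as tar:
            # Store paths relative to root so every archive unpacks into one tree.
            tar.add(sub, arcname=sub.name)
        archives.append(archive)
    return archives


def restore_all(archives, destination):
    """Untar every archive into the same destination directory."""
    destination.mkdir(parents=True, exist_ok=True)
    for archive in archives:
        with tarfile.open(archive, "r:gz") as tar:
            tar.extractall(destination)
```

Because each archive stores paths relative to the original root, the order in which shards are unpacked should not matter.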
Collections and shards each have a set of states that determine what a developer can expect of a collection or shard at any given time. The following diagrams summarize these states.
                                     ┌────────┐
                                     │Creation│
                                     └────┬───┘
                                          │
                                          │
                                          ▼
                ┌────────┐   ┌─────────────────────────┐    ┌──────┐
                │  Upon  │   │       State: Open       │    │ Upon │
                │  open  │   │ Shards can be added and │    │close │
                └────────┘   │     deleted from a      │    └──────┘
              ┌─────────────▶│ collection. Can contain │────────────────┐
              │              │shards that are not ready│                │
              │              │     for retrieval.      │                │
              │              └─────────────────────────┘                │
              │                                                         │
              │                                                         ▼
┌─────────────────────────────┐                         ┌────────────────────────────────┐
│        State: Ready         │                         │    State: Queued/Processing    │
│Shards are immutable in this │                         │Collections are processed in the│
│state. All shards will be in │                         │   background and can not be    │
│  the `ready` state and be   │                         │   modified while processing.   │
│   available for download.   │◀──────┬──────────┬──────│ Processing removes all shards  │
│                             │       │Processing│      │   not in the ready state and   │
│                             │       │completed │      │  calculates the total size.    │
└─────────────────────────────┘       └──────────┘      └────────────────────────────────┘
            ┌────────┐
            │Creation│
            └────────┘
                 │
                 ▼
┌────────────────────────────────┐
│          State: Open           │
│Shard has been created and ready│
│   for a file to be uploaded.   │
│                                │
│                                │
│                                │
└────────────────────────────────┘
          ┌─────┐│
          │Upon ││
          │close││
          └─────┘▼
┌────────────────────────────────┐
│          State: Ready          │
│    Indicates that the shard    │
│contains data and the associated│
│   file data is available for   │
│ retrieval. Cannot be modified  │
│          once ready.           │
│                                │
└────────────────────────────────┘
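The state rules can be made concrete with a small in-memory model. This is a toy sketch, not the Orion client: `ToyShard` and `ToyCollection` are hypothetical classes, and the queued/processing step is collapsed into a single `close()` call for brevity.

```python
from dataclasses import dataclass, field
from enum import Enum


class State(Enum):
    OPEN = "open"
    READY = "ready"


@dataclass
class ToyShard:
    name: str
    state: State = State.OPEN

    def close(self):
        # A closed shard is ready for retrieval and is no longer modified.
        self.state = State.READY


@dataclass
class ToyCollection:
    state: State = State.OPEN
    shards: list = field(default_factory=list)

    def add_shard(self, name):
        if self.state is not State.OPEN:
            raise RuntimeError("shards can only be added while the collection is open")
        shard = ToyShard(name)
        self.shards.append(shard)
        return shard

    def close(self):
        # Stand-in for processing: shards that never became ready are removed.
        self.shards = [s for s in self.shards if s.state is State.READY]
        self.state = State.READY

    def open(self):
        # Reopening allows shards to be added again.
        self.state = State.OPEN
```

In this model, a shard that is created but never closed disappears when the collection is closed, which mirrors the processing behavior described in the diagram.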
Batch work items
Choose shard sizes balancing parallelism and overhead
Measure memory usage and computation time
Each shard is limited to 5 GB
See Batching Work
Close shards downstream from creation
By default, newly-created shards are open
Parallel cubes may retry work, which will result in multiple shards with the same output data
A shard must be closed in order to be marked as good and be kept as part of the collection
Only close shards in a downstream cube outside of any cube group, or you may get duplicate shards marked as good
Shards in the open and error states will be DELETED when a collection is closed
Avoid unnecessary work while streaming
Shards can be downloaded as a stream, but the longer the network connection is open, the more likely an error will occur
Download to local files to avoid connection resets or running out of memory
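The advice on streaming can be illustrated generically: copy the stream to a local file in fixed-size chunks as quickly as possible, then do the slow processing against the file. In the sketch below, `stream` stands in for whatever readable binary object a download provides. The helper name is hypothetical and no Orion client call is shown.

```python
import shutil
import tempfile


def spool_to_local_file(stream, chunk_size=1024 * 1024):
    """Copy a readable binary stream to a temporary file and return its path.

    Copying in chunks keeps memory use bounded, and the network connection
    only needs to stay open for the duration of the copy.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".shard") as local:
        shutil.copyfileobj(stream, local, chunk_size)
        return local.name
```

The caller is responsible for deleting the returned file once processing is finished.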
When a dataset has grown to the point that operating on individual records
does not scale well (i.e. \(>10^6\) records), it is time to transition to
working on batches of records. In other words, you should be using
orionclient.types.ShardCollection. The first step in this
transition is to transform the data. For this sort of floe, you will want a
DatasetBatcherCube to emit sets of records into a parallel cube
group. In the example floe in this section, that group consists of a
ParallelDatasetBatchReaderCube followed by a ParallelRecordsToCollectionCube.
Parallel cubes cannot accumulate data predictably due to dynamic scaling, so they must be given all of the records up front. If they are fed individual records, then they cannot be expected to emit shards consisting of a set of records. The item_count parameter will not affect this.
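The batching idea itself is independent of any cube. The sketch below, in plain Python, picks a batch size from a measured per-record cost, estimates how many shards that implies, and groups an iterable into batches. All three helpers are hypothetical, and the 30-minute default is only one point within the 10 minutes to an hour suggested earlier.

```python
import math
from itertools import islice


def batch_size_for(seconds_per_record, target_seconds=1800):
    """Records per batch so that one batch is roughly target_seconds of work."""
    return max(1, math.floor(target_seconds / seconds_per_record))


def shard_count(n_records, size):
    """Number of batches (and so shards) needed for n_records."""
    return math.ceil(n_records / size)


def batched(records, size):
    """Yield lists of up to `size` records from any iterable."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch
```

Comparing the result of `shard_count` against the recommended maximum number of shards is a quick sanity check before running a large job.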
Here is a simple example that reads a dataset and writes a collection.
Additional record processing can be put in the parallel cube group between the
reader and writer cubes.
# CONFIDENTIAL. (C) 2020 OpenEye Scientific Software Inc.
# All rights reserved.
# ALL SOFTWARE BELOW IS PROPRIETARY AND CONFIDENTIAL TO OPENEYE
# SCIENTIFIC SOFTWARE INC., AND IS SUBJECT TO THE FULL PROTECTION OF
# COPYRIGHT AND TRADESECRET LAW.
# Copying or modifying this software is strictly prohibited and illegal.
# Using this software without a valid license from OpenEye is strictly
# prohibited and illegal. De-obfuscating, de-minimizing, or any other
# attempt to reverse engineer or discover the workings of this software,
# is strictly prohibited. Even if software is exposed or visible, it is
# still proprietary and confidential and the above prohibitions apply.
# This software is NOT "Sample Code". For purposes of customization or
# interoperation with other software you must use Sample Code
# specifically labeled as such.
# Please contact OpenEye at eyesopen.com/legal if you have any questions
# about this warning.

from floe.api import WorkFloe, ParallelCubeGroup
from orionplatform.cubes import (
    DatasetBatcherCube,
    CloseCollectionCube,
    CreateCollectionCube,
    ParallelDatasetBatchReaderCube,
    ParallelRecordsToCollectionCube,
)

job = WorkFloe(
    "Test Dataset to Record Collection",
    title="Test Dataset to Collection Export",
)

setup = CreateCollectionCube("setup")
batcher = DatasetBatcherCube("batcher")
reader = ParallelDatasetBatchReaderCube("reader")
writer = ParallelRecordsToCollectionCube("writer")
close = CloseCollectionCube("close")

setup.modify_parameter(setup.collection_name, promoted=True)
batcher.modify_parameter(batcher.data_in, promoted=True)
batcher.modify_parameter(batcher.batch_size, promoted=True)
writer.modify_parameter(writer.collection_type, promoted=True)

group = ParallelCubeGroup(cubes=[reader, writer])
job.add_group(group)

job.add_cubes(setup, batcher, reader, writer, close)

setup.success.connect(writer.init)
batcher.success.connect(reader.intake)
reader.success.connect(writer.intake)
writer.success.connect(close.intake)

if __name__ == "__main__":
    job.run()
If work cannot be batched from the beginning, then aggregate work items in
serial cubes. Consider, for example, molecule database preparation and searching.
Molecule preparation needs relatively small shards because the amount of
computation time per shard is large. However, a FastROCS search requires
larger shards because the computation time per shard is low. So, references
to small shards created during preparation can be accumulated in a serial
AccumulateShardsCube, which emits lists to parallel cubes so
that they can operate on batches of small shards to create larger shards.
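The accumulation step can be pictured as grouping references to small shards until each group is large enough to justify one larger shard. The sketch below is plain Python and does not reproduce how AccumulateShardsCube is implemented or configured. `group_shards` is a hypothetical helper, and grouping by byte size is an assumption made for illustration.

```python
def group_shards(shard_sizes, target_bytes):
    """Group (shard_id, size) pairs, in order, into lists of shard ids.

    A group is emitted as soon as its cumulative size reaches target_bytes.
    """
    groups, current, total = [], [], 0
    for shard_id, size in shard_sizes:
        current.append(shard_id)
        total += size
        if total >= target_bytes:
            groups.append(current)
            current, total = [], 0
    if current:
        # Whatever is left forms a final, possibly undersized, group.
        groups.append(current)
    return groups
```

Each emitted list could then be handed to a parallel worker, which reads the small shards it names and writes one larger shard.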