Class PipelinedStrategy

  • All Implemented Interfaces:
    SortStrategy, IndexStoreSortStrategy

    public class PipelinedStrategy
    extends IndexStoreSortStrategyBase
    Downloads the contents of the MongoDB repository, dividing the work into a pipeline with the following stages (a schematic sketch follows the list):
    • Download - Downloads from Mongo all the documents in the node store.
    • Transform - Converts Mongo documents to node state entries.
    • Sort and save - Sorts each batch of node state entries and saves it to disk.
    • Merge sorted files - Merges the intermediate sorted files into a single file (the final FlatFileStore).
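
    A minimal, hypothetical sketch of this stage layout, using plain Java threads connected by bounded queues; all class and variable names here are illustrative and are not part of the Oak implementation:

        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;

        public class PipelineSketch {
            public static void main(String[] args) {
                // Bounded queues connect the stages and provide back-pressure.
                BlockingQueue<byte[]> downloadedDocs = new ArrayBlockingQueue<>(1024);   // Download -> Transform
                BlockingQueue<String> intermediateFiles = new ArrayBlockingQueue<>(16);  // Sort-and-save -> Merge

                Thread download = new Thread(() -> {
                    // Download: read all documents of the node store from Mongo and enqueue them.
                });
                Thread transform = new Thread(() -> {
                    // Transform: convert Mongo documents into node state entries and batch them.
                });
                Thread sortAndSave = new Thread(() -> {
                    // Sort and save: sort each batch and write it to an intermediate file on disk.
                });
                Thread merge = new Thread(() -> {
                    // Merge sorted files: merge the intermediate files into the final FlatFileStore.
                });

                download.start();
                transform.start();
                sortAndSave.start();
                merge.start();
            }
        }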

    Memory management

    For efficiency, the intermediate sorted files should be as large as possible given the memory constraints. This strategy accumulates in memory the entries that will be stored in each of these files until a configurable maximum size is reached, at which point it sorts the data and writes it to a file. The data is accumulated in instances of NodeStateEntryBatch. This class contains two data structures:

    • A ByteBuffer holding the binary representation of the entries, that is, the byte arrays that will be written to the file. The buffer stores length-prefixed byte arrays: each entry is <size><data>, where size is a 4-byte int.
    • An array of SortKey instances, which contain the path of each entry and are used to sort the entries. Each element in this array also stores the position in the ByteBuffer of the serialized representation of its entry.
    This representation has several advantages:
    • It is compact, as a String object in the heap requires more memory than a length-prefixed byte array in the ByteBuffer.
    • Predictable memory usage - the memory used by the ByteBuffer is fixed and allocated at startup (more on this later). The memory used by the array of SortKey instances is not bounded up front, but these objects are small, as they contain little more than the path of the entry, and we can easily limit the maximum number of entries kept in a buffer. A sketch of this layout is shown below.
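
    A minimal sketch of this batch layout, under the assumptions stated above; the real NodeStateEntryBatch and SortKey classes in Oak may differ in detail:

        import java.nio.ByteBuffer;
        import java.util.ArrayList;
        import java.util.List;

        class EntryBatchSketch {

            // Path of an entry plus the position of its serialized form in the buffer.
            static class SortKeySketch {
                final String path;
                final int bufferPosition;
                SortKeySketch(String path, int bufferPosition) {
                    this.path = path;
                    this.bufferPosition = bufferPosition;
                }
            }

            private final ByteBuffer buffer;                       // fixed size, allocated once
            private final List<SortKeySketch> sortKeys = new ArrayList<>();
            private final int maxEntries;

            EntryBatchSketch(int bufferSizeBytes, int maxEntries) {
                this.buffer = ByteBuffer.allocate(bufferSizeBytes);
                this.maxEntries = maxEntries;
            }

            // Returns false when the batch is full, either because the buffer has no room
            // for another length-prefixed entry or because the entry limit was reached.
            boolean addEntry(String path, byte[] serializedEntry) {
                if (sortKeys.size() >= maxEntries || buffer.remaining() < 4 + serializedEntry.length) {
                    return false;
                }
                int position = buffer.position();
                buffer.putInt(serializedEntry.length);             // 4-byte <size> prefix
                buffer.put(serializedEntry);                       // <data>
                sortKeys.add(new SortKeySketch(path, position));
                return true;
            }

            // Reuse the same memory for the next batch.
            void reset() {
                buffer.clear();
                sortKeys.clear();
            }
        }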

    The instances of NodeStateEntryBatch are created at launch time. We create #transformThreads + 1 buffers. This way, except in some rare situations, each transform thread has its own buffer in which to write entries, and there is an extra buffer for the Save-and-Sort thread, so that all the transform threads and the sort thread can operate concurrently.

    These buffers are reused. Once the Save-and-Sort thread finishes processing a buffer, it clears it and sends it back to the transform threads. For this, we use two queues: one with empty buffers, from which the transform threads take a buffer when they need one, and another with full buffers, which is read by the Save-and-Sort thread.
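
    The recycling scheme can be sketched as follows; Batch is a placeholder for NodeStateEntryBatch, the thread bodies are elided, and in the real pipeline the threads loop until the download finishes:

        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;

        public class BufferRecyclingSketch {
            static class Batch {
                void clear() { /* reset buffer positions and drop sort keys */ }
            }

            public static void main(String[] args) throws InterruptedException {
                int transformThreads = 2;
                BlockingQueue<Batch> emptyBatches = new ArrayBlockingQueue<>(transformThreads + 1);
                BlockingQueue<Batch> fullBatches = new ArrayBlockingQueue<>(transformThreads + 1);

                // Pre-allocate all batches once; they are reused for the whole run.
                for (int i = 0; i < transformThreads + 1; i++) {
                    emptyBatches.put(new Batch());
                }

                // Transform thread: take an empty batch, fill it, hand it to the sorter.
                Runnable transform = () -> {
                    try {
                        Batch batch = emptyBatches.take();
                        // ... fill the batch with node state entries ...
                        fullBatches.put(batch);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                };

                // Save-and-Sort thread: sort, write an intermediate file, then recycle the batch.
                Runnable sortAndSave = () -> {
                    try {
                        Batch batch = fullBatches.take();
                        // ... sort the entries and write them to an intermediate file ...
                        batch.clear();
                        emptyBatches.put(batch);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                };

                new Thread(transform).start();
                new Thread(sortAndSave).start();
            }
        }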

    Reusing the buffers significantly reduces the pressure on the garbage collector and ensures that we do not run out of memory, as the largest blocks of memory are pre-allocated and reused.

    The total amount of memory used by the buffers is a configurable parameter (the OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB environment variable). This memory is divided into numberOfBuffers + 1 regions, each of size regionSize = PIPELINED_WORKING_MEMORY_MB / (numberOfBuffers + 1). Each ByteBuffer is regionSize big. The extra region accounts for the memory taken by the SortKey entries. There is also a maximum limit on the number of entries, which is calculated from regionSize (we assume each SortKey entry requires 256 bytes).

    The transform threads will stop filling a buffer and enqueue it for sorting and saving once either the byte buffer is full or the number of entries in the buffer reaches the limit.
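
    A worked example of this sizing rule, under the assumptions stated above (one buffer per transform thread plus one spare, and roughly 256 bytes of heap per SortKey entry); the names below are illustrative:

        public class MemorySizingSketch {
            public static void main(String[] args) {
                int workingMemoryMB = 1024;                  // OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB
                int transformThreads = 2;
                int numberOfBuffers = transformThreads + 1;  // one per transform thread, plus one spare

                // The extra region accounts for the heap used by the SortKey arrays.
                long regionSizeBytes = (long) workingMemoryMB * 1024 * 1024 / (numberOfBuffers + 1);

                // Cap the number of entries assuming ~256 bytes of heap per SortKey.
                long maxEntriesPerBuffer = regionSizeBytes / 256;

                // With 1024 MB and 3 buffers: 4 regions of 256 MB each, so every ByteBuffer
                // is 256 MB and a buffer holds at most 1,048,576 entries.
                System.out.printf("regionSize = %d MB, max entries per buffer = %d%n",
                        regionSizeBytes / (1024 * 1024), maxEntriesPerBuffer);
            }
        }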

    Retries on broken MongoDB connections

    • Field Detail

      • OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB

        public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB
        See Also:
        Constant Field Values
      • DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB

        public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB
        See Also:
        Constant Field Values
      • OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS

        public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS
        See Also:
        Constant Field Values
      • DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS

        public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS
        See Also:
        Constant Field Values
      • OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB

        public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB
        See Also:
        Constant Field Values
      • DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB

        public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB
        See Also:
        Constant Field Values
      • OAK_INDEXER_PIPELINED_TRANSFORM_THREADS

        public static final String OAK_INDEXER_PIPELINED_TRANSFORM_THREADS
        See Also:
        Constant Field Values
      • DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS

        public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS
        See Also:
        Constant Field Values
      • OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB

        public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB
        See Also:
        Constant Field Values
      • DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB

        public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB
        See Also:
        Constant Field Values
      • OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE

        public static final String OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE
        See Also:
        Constant Field Values
      • DEFAULT_OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE

        public static final int DEFAULT_OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE
        See Also:
        Constant Field Values
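
      The String constants above hold the names of the configuration options mentioned in the class description. A minimal, hypothetical sketch of setting them, assuming they are read as JVM system properties (depending on the deployment they may instead be supplied as environment variables); the values shown are illustrative only:

        System.setProperty(PipelinedStrategy.OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB, "2048");
        System.setProperty(PipelinedStrategy.OAK_INDEXER_PIPELINED_TRANSFORM_THREADS, "4");
        System.setProperty(PipelinedStrategy.OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB, "8");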
    • Constructor Detail

      • PipelinedStrategy

        public PipelinedStrategy​(com.mongodb.MongoClientURI mongoClientURI,
                                 MongoDocumentStore documentStore,
                                 DocumentNodeStore documentNodeStore,
                                 RevisionVector rootRevision,
                                 Set<String> preferredPathElements,
                                 BlobStore blobStore,
                                 File storeDir,
                                 Compression algorithm,
                                 Predicate<String> pathPredicate,
                                 List<PathFilter> pathFilters,
                                 String checkpoint,
                                 StatisticsProvider statisticsProvider,
                                 IndexingReporter indexingReporter)
        Parameters:
        mongoClientURI - URI of the Mongo cluster.
        pathPredicate - Used by the transform stage to test if a node should be kept or discarded.
        pathFilters - If non-empty, the download stage will use these filters to create a query that downloads only the matching MongoDB documents.
        statisticsProvider - Used to collect statistics about the indexing process.
        indexingReporter - Used to collect diagnostics, metrics and statistics and report them at the end of the indexing process.
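
        A hypothetical sketch of how the filtering arguments might be built and passed to this constructor; the remaining arguments are assumed to come from the surrounding indexing setup, and Compression.GZIP and the PathFilter constructor shown are assumptions about the Oak commons/SPI APIs rather than something documented on this page:

            // Keep only nodes under /content (illustrative values).
            Predicate<String> pathPredicate = path -> path.startsWith("/content");
            List<PathFilter> pathFilters = List.of(
                    new PathFilter(List.of("/content"), List.of()));   // includes, excludes

            PipelinedStrategy strategy = new PipelinedStrategy(mongoClientURI, documentStore,
                    documentNodeStore, rootRevision, preferredPathElements, blobStore, storeDir,
                    Compression.GZIP, pathPredicate, pathFilters, checkpoint,
                    statisticsProvider, indexingReporter);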
    • Method Detail

      • getEntryCount

        public long getEntryCount()