Class PipelinedStrategy
java.lang.Object
  org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreSortStrategyBase
    org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy
All Implemented Interfaces:
SortStrategy, IndexStoreSortStrategy
public class PipelinedStrategy extends IndexStoreSortStrategyBase
Downloads the contents of the MongoDB repository, dividing the work into a pipeline with the following stages:
- Download - Downloads from Mongo all the documents in the node store.
- Transform - Converts Mongo documents to node state entries.
- Sort and save - Sorts the batch of node state entries and saves them to disk.
- Merge sorted files - Merges the intermediate sorted files into a single file (the final FlatFileStore).
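The four stages can be illustrated with a minimal, single-threaded sketch. Everything in it is hypothetical: MongoDoc and Entry stand in for the real MongoDB documents and node state entries, a sorted "file" is just a List<String>, and paths are compared in plain String order, which is an assumption and not necessarily the ordering used by the real FlatFileStore. The merge stage is shown as a standard k-way merge over a priority queue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical, single-threaded sketch of the four pipeline stages (not Oak's code). */
class PipelineSketch {

    // Stand-in for a downloaded MongoDB document: only a path and a payload.
    record MongoDoc(String path, String payload) {}

    // Stand-in for a node state entry; its serialized form is "path|payload".
    record Entry(String path, String line) {}

    // Transform: convert a document into an entry.
    static Entry transform(MongoDoc doc) {
        return new Entry(doc.path(), doc.path() + "|" + doc.payload());
    }

    // Sort and save: sort one batch by path and "write" it as a sorted run.
    static List<String> sortAndSave(List<Entry> batch) {
        List<Entry> copy = new ArrayList<>(batch);
        copy.sort(Comparator.comparing(Entry::path)); // assumption: plain String order
        List<String> run = new ArrayList<>();
        for (Entry e : copy) run.add(e.line());
        return run;
    }

    // Merge sorted files: k-way merge of the runs using a priority queue.
    static List<String> merge(List<List<String>> runs) {
        // Each heap element is {runIndex, positionInRun}, ordered by the path at that position.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparing((int[] c) -> pathOf(runs.get(c[0]).get(c[1]))));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) heap.add(new int[] {r, 0});
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] c = heap.poll();
            List<String> run = runs.get(c[0]);
            out.add(run.get(c[1]));
            if (c[1] + 1 < run.size()) heap.add(new int[] {c[0], c[1] + 1});
        }
        return out;
    }

    // Extracts the path from a serialized "path|payload" line.
    static String pathOf(String line) {
        return line.substring(0, line.indexOf('|'));
    }

    // Drives the stages: documents arrive in download order, batches of batchSize are sorted.
    static List<String> run(List<MongoDoc> downloaded, int batchSize) {
        List<List<String>> runs = new ArrayList<>();
        List<Entry> batch = new ArrayList<>();
        for (MongoDoc doc : downloaded) {
            batch.add(transform(doc));
            if (batch.size() == batchSize) {
                runs.add(sortAndSave(batch));
                batch.clear();
            }
        }
        if (!batch.isEmpty()) runs.add(sortAndSave(batch));
        return merge(runs);
    }
}
```

In the real strategy these stages run concurrently and exchange data through queues; the sketch only shows what each stage computes.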
Memory management
For efficiency, the intermediate sorted files should be as large as possible given the memory constraints. This strategy accumulates in memory the entries that will be stored in each of these files until reaching a maximum configurable size, at which point it sorts the data and writes it to a file. The data is accumulated in instances of NodeStateEntryBatch. This class contains two data structures:
- A ByteBuffer for the binary representation of the entries, that is, the byte arrays that will be written to the file. This buffer contains length-prefixed byte arrays: each entry is <size><data>, where size is a 4-byte int.
- An array of SortKey instances, which contain the path of each entry and are used to sort the entries. Each element in this array also contains the position in the ByteBuffer of the serialized representation of the entry.
This representation has two advantages:
- It is compact, as a String object in the heap requires more memory than a length-prefixed byte array in the ByteBuffer.
- Predictable memory usage - the memory used by the ByteBuffer is fixed and allocated at startup (more on this later). The memory used by the array of SortKey is not bounded, but these objects are small, as they contain little more than the path of the entry, and the maximum number of entries kept in a buffer is easy to limit.
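The batch layout described above can be sketched as follows. EntryBatchSketch and SortKeySketch are hypothetical stand-ins for NodeStateEntryBatch and SortKey, not their actual APIs: entries are appended to a pre-allocated ByteBuffer as <size><data>, only the small keys are sorted, and each key points back into the buffer. The sketch assumes Java 16+ (records, absolute bulk ByteBuffer.get).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of a batch: length-prefixed entries in a ByteBuffer plus sort keys. */
class EntryBatchSketch {

    // Hypothetical sort key: the entry's path plus the offset of its <size> prefix in the buffer.
    record SortKeySketch(String path, int bufferPos) {}

    private final ByteBuffer buffer;
    private final int maxEntries;
    private final List<SortKeySketch> keys = new ArrayList<>();

    EntryBatchSketch(int capacityBytes, int maxEntries) {
        this.buffer = ByteBuffer.allocate(capacityBytes); // allocated once, then reused
        this.maxEntries = maxEntries;
    }

    /** Appends <size><data>; returns false if the bytes or the entry count would exceed the limits. */
    boolean add(String path, String serialized) {
        byte[] data = serialized.getBytes(StandardCharsets.UTF_8);
        if (keys.size() >= maxEntries || buffer.remaining() < Integer.BYTES + data.length) {
            return false;
        }
        int pos = buffer.position();
        buffer.putInt(data.length); // 4-byte length prefix
        buffer.put(data);
        keys.add(new SortKeySketch(path, pos));
        return true;
    }

    /** Sorts only the keys, then reads each entry back from its position in the buffer. */
    List<String> sortedEntries() {
        List<SortKeySketch> sorted = new ArrayList<>(keys);
        sorted.sort(Comparator.comparing(SortKeySketch::path));
        List<String> out = new ArrayList<>();
        for (SortKeySketch k : sorted) {
            int size = buffer.getInt(k.bufferPos()); // absolute read, position unchanged
            byte[] data = new byte[size];
            buffer.get(k.bufferPos() + Integer.BYTES, data); // absolute bulk read
            out.add(new String(data, StandardCharsets.UTF_8));
        }
        return out;
    }

    /** Resets the batch so the same pre-allocated buffer can be filled again. */
    void clear() {
        buffer.clear();
        keys.clear();
    }

    int size() {
        return keys.size();
    }
}
```

A batch reports that it is full either when the next entry does not fit in the buffer or when the entry limit is reached, which mirrors the two stopping conditions used by the transform threads.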
The instances of NodeStateEntryBatch are created at launch time. We create #transformThreads + 1 buffers. This way, except in some rare situations, each transform thread has its own buffer to write entries into, and there is an extra buffer for the sort-and-save thread, so that all the transform and sort threads can operate concurrently.
These buffers are reused. Once the sort-and-save thread finishes processing a buffer, it clears it and sends it back to the transform threads. For this, we use two queues: one with empty buffers, from which the transform threads take a buffer when they need one, and another with full buffers, which are read by the sort-and-save thread.
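The buffer-recycling scheme can be sketched with two BlockingQueues. This is a simplified illustration under assumptions: Batch is a hypothetical stand-in for NodeStateEntryBatch that holds strings instead of a ByteBuffer, each transform thread receives a pre-split list of input entries, a shared POISON batch (a hypothetical end marker) signals that a transform thread has finished, and "saving" a batch just copies its sorted contents into a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch of buffer recycling between transform threads and a sort-and-save thread. */
class BufferRecyclingSketch {

    // Hypothetical stand-in for NodeStateEntryBatch: a bounded list of entries.
    static final class Batch {
        final List<String> entries = new ArrayList<>();
        final int capacity;
        Batch(int capacity) { this.capacity = capacity; }
        boolean isFull() { return entries.size() >= capacity; }
    }

    // End marker: tells the sort-and-save thread that one transform thread has finished.
    static final Batch POISON = new Batch(0);

    static List<List<String>> run(List<List<String>> inputsPerThread, int batchCapacity) {
        int transformThreads = inputsPerThread.size();
        int numberOfBuffers = transformThreads + 1;
        BlockingQueue<Batch> empty = new ArrayBlockingQueue<>(numberOfBuffers);
        BlockingQueue<Batch> full = new ArrayBlockingQueue<>(numberOfBuffers + transformThreads);
        // All buffers are allocated once, up front, and then recycled.
        for (int i = 0; i < numberOfBuffers; i++) empty.add(new Batch(batchCapacity));

        List<List<String>> savedRuns = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(transformThreads + 1);

        // Sort-and-save thread: sort each full batch, "save" it, clear it, return it to the empty queue.
        Future<?> sorter = pool.submit(() -> {
            int finished = 0;
            while (finished < transformThreads) {
                Batch b = full.take();
                if (b == POISON) { finished++; continue; }
                Collections.sort(b.entries);
                savedRuns.add(new ArrayList<>(b.entries));
                b.entries.clear();
                empty.put(b);
            }
            return null;
        });

        // Transform threads: fill a buffer, hand it over when full, take an empty one.
        List<Future<?>> transformers = new ArrayList<>();
        for (List<String> input : inputsPerThread) {
            transformers.add(pool.submit(() -> {
                Batch current = empty.take();
                for (String entry : input) {
                    current.entries.add(entry);
                    if (current.isFull()) {
                        full.put(current);
                        current = empty.take();
                    }
                }
                if (current.entries.isEmpty()) empty.put(current); else full.put(current);
                full.put(POISON);
                return null;
            }));
        }

        try {
            for (Future<?> f : transformers) f.get();
            sorter.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return savedRuns;
    }
}
```

Because each transform thread holds at most one buffer at a time and the sort-and-save thread returns every buffer to the empty queue, transformThreads + 1 pre-allocated buffers are enough for the pipeline to make progress without allocating new ones.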
Reusing the buffers significantly reduces the pressure on the garbage collector and ensures that we do not run out of memory, as the largest blocks of memory are pre-allocated and reused.
The total amount of memory used by the buffers is a configurable parameter (env variable OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB). This memory is divided into numberOfBuffers + 1 regions, each of size regionSize = PIPELINED_WORKING_MEMORY_MB / (numberOfBuffers + 1). Each ByteBuffer is regionSize in size. The extra region accounts for the memory taken by the SortKey entries. There is also a maximum limit on the number of entries, which is calculated based on regionSize (we assume each SortKey entry requires 256 bytes).
The transform threads stop filling a buffer and enqueue it for sorting and saving once either the byte buffer is full or the number of entries in the buffer reaches the limit.
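The sizing rules can be checked with a small worked example. The text does not give the exact formula for the entry limit, so maxEntriesPerBuffer below is an assumption: it supposes that the extra region is shared evenly by the SortKey arrays of all buffers, each SortKey taking 256 bytes. MemorySizingSketch is a hypothetical name, not part of Oak.

```java
/** Hypothetical sketch of the buffer sizing rules; not Oak's actual code. */
class MemorySizingSketch {
    // Average SortKey footprint assumed in the text.
    static final long SORT_KEY_BYTES = 256;

    /** Size of each region: the working memory split into numberOfBuffers + 1 regions. */
    static long regionSizeBytes(long workingMemoryMB, int transformThreads) {
        int numberOfBuffers = transformThreads + 1; // one per transform thread + one for sort-and-save
        long totalBytes = workingMemoryMB * 1024L * 1024L;
        return totalBytes / (numberOfBuffers + 1); // the extra region is reserved for SortKey arrays
    }

    /** ASSUMPTION: the reserved region is shared evenly by the SortKey arrays of all buffers. */
    static long maxEntriesPerBuffer(long regionSizeBytes, int numberOfBuffers) {
        return regionSizeBytes / (numberOfBuffers * SORT_KEY_BYTES);
    }

    public static void main(String[] args) {
        int transformThreads = 2;
        long region = regionSizeBytes(1024, transformThreads);
        long maxEntries = maxEntriesPerBuffer(region, transformThreads + 1);
        // prints regionSize=268435456 bytes, maxEntriesPerBuffer=349525
        System.out.println("regionSize=" + region + " bytes, maxEntriesPerBuffer=" + maxEntries);
    }
}
```

With 1024 MB of working memory and 2 transform threads there are 3 buffers and 4 regions, so each ByteBuffer gets 256 MB, and under the assumed formula each buffer accepts at most 349525 entries.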
Retries on broken MongoDB connections
Field Summary
Constructor Summary
PipelinedStrategy(com.mongodb.MongoClientURI mongoClientURI, MongoDocumentStore documentStore, DocumentNodeStore documentNodeStore, RevisionVector rootRevision, Set<String> preferredPathElements, BlobStore blobStore, File storeDir, Compression algorithm, Predicate<String> pathPredicate, List<PathFilter> pathFilters, String checkpoint, StatisticsProvider statisticsProvider, IndexingReporter indexingReporter)
Method Summary
File createSortedStoreFile()
long getEntryCount()
Methods inherited from class org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreSortStrategyBase
createMetadataFile, getAlgorithm, getCheckpoint, getPathPredicate, getPreferredPaths, getStoreDir, getStoreType, getStrategyName
Field Detail
OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB
public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB
See Also: Constant Field Values

DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB
public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB
See Also: Constant Field Values

OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS
public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS
See Also: Constant Field Values

DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS
public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS
See Also: Constant Field Values

OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB
public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB
See Also: Constant Field Values

DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB
public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB
See Also: Constant Field Values

OAK_INDEXER_PIPELINED_TRANSFORM_THREADS
public static final String OAK_INDEXER_PIPELINED_TRANSFORM_THREADS
See Also: Constant Field Values

DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS
public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS
See Also: Constant Field Values

OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB
public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB
See Also: Constant Field Values

DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB
public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB
See Also: Constant Field Values

OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE
public static final String OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE
See Also: Constant Field Values

DEFAULT_OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE
public static final int DEFAULT_OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE
See Also: Constant Field Values
Constructor Detail
PipelinedStrategy
public PipelinedStrategy(com.mongodb.MongoClientURI mongoClientURI, MongoDocumentStore documentStore, DocumentNodeStore documentNodeStore, RevisionVector rootRevision, Set<String> preferredPathElements, BlobStore blobStore, File storeDir, Compression algorithm, Predicate<String> pathPredicate, List<PathFilter> pathFilters, String checkpoint, StatisticsProvider statisticsProvider, IndexingReporter indexingReporter)
Parameters:
mongoClientURI - URI of the Mongo cluster.
pathPredicate - Used by the transform stage to test if a node should be kept or discarded.
pathFilters - If non-empty, the download stage will use these filters to create a query that downloads only the matching MongoDB documents.
statisticsProvider - Used to collect statistics about the indexing process.
indexingReporter - Used to collect diagnostics, metrics and statistics and report them at the end of the indexing process.
Method Detail
createSortedStoreFile
public File createSortedStoreFile() throws IOException
Throws:
IOException

getEntryCount
public long getEntryCount()