From c27566aaec63b6d0fea274b709ba4559d7972036 Mon Sep 17 00:00:00 2001 From: dmkozh Date: Thu, 21 May 2026 12:58:26 -0400 Subject: [PATCH] Parallelize InMemoryIndex construction with bucket put loop. Build the LiveBucketIndex on an async worker thread while the put loop in mergeInMemory runs on the main thread. Both only read mergedEntries as const, so they need no synchronization. --- src/bucket/BucketOutputIterator.cpp | 9 +++++++-- src/bucket/BucketOutputIterator.h | 2 ++ src/bucket/LiveBucket.cpp | 14 +++++++++++++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/bucket/BucketOutputIterator.cpp b/src/bucket/BucketOutputIterator.cpp index 6645f51143..43fd611cd9 100644 --- a/src/bucket/BucketOutputIterator.cpp +++ b/src/bucket/BucketOutputIterator.cpp @@ -168,7 +168,8 @@ template std::shared_ptr BucketOutputIterator::getBucket( BucketManager& bucketManager, MergeKey* mergeKey, - std::unique_ptr> inMemoryState) + std::unique_ptr> inMemoryState, + std::shared_ptr preBuiltIndex) { ZoneScoped; if (mBuf) @@ -219,7 +220,11 @@ BucketOutputIterator::getBucket( if (!index) { - if constexpr (std::is_same_v) + if (preBuiltIndex) + { + index = std::move(preBuiltIndex); + } + else if constexpr (std::is_same_v) { if (inMemoryState) { diff --git a/src/bucket/BucketOutputIterator.h b/src/bucket/BucketOutputIterator.h index a76e1c6bb7..99b42ec2d0 100644 --- a/src/bucket/BucketOutputIterator.h +++ b/src/bucket/BucketOutputIterator.h @@ -55,6 +55,8 @@ template class BucketOutputIterator std::shared_ptr getBucket( BucketManager& bucketManager, MergeKey* mergeKey = nullptr, std::unique_ptr> inMemoryState = + nullptr, + std::shared_ptr preBuiltIndex = nullptr); }; } diff --git a/src/bucket/LiveBucket.cpp b/src/bucket/LiveBucket.cpp index 8101c9d183..e38287636b 100644 --- a/src/bucket/LiveBucket.cpp +++ b/src/bucket/LiveBucket.cpp @@ -10,6 +10,7 @@ #include "bucket/BucketOutputIterator.h" #include "bucket/BucketUtils.h" #include "bucket/LedgerCmp.h" +#include #include namespace stellar @@ -595,6 +596,14 @@ LiveBucket::mergeInMemory(BucketManager& bucketManager, bucketManager.incrMergeCounters(mc); } + // Start index construction on a worker thread, the inputs are all + // read-only from that point on. + auto indexFuture = std::async( + std::launch::async, [&bucketManager, &mergedEntries, &meta]() { + return std::make_shared(bucketManager, + mergedEntries, meta); + }); + // Write merge output to a bucket and save to disk LiveBucketOutputIterator out(bucketManager.getTmpDir(), /*keepTombstoneEntries=*/true, meta, mc, ctx, @@ -605,11 +614,14 @@ LiveBucket::mergeInMemory(BucketManager& bucketManager, out.put(e); } + auto preBuiltIndex = indexFuture.get(); + // Store the merged entries in memory in the new bucket in case this // bucket sees another incoming merge as level 0 curr. return out.getBucket( bucketManager, nullptr, - std::make_unique>(std::move(mergedEntries))); + std::make_unique>(std::move(mergedEntries)), + std::move(preBuiltIndex)); } BucketEntryCounters const&