Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 28 additions & 83 deletions src/bucket/BucketListSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,8 @@ BucketListSnapshotData<BucketT>::BucketListSnapshotData(
template <class BucketT>
SearchableBucketListSnapshot<BucketT>::SearchableBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<BucketT> const> data,
std::map<uint32_t, std::shared_ptr<BucketListSnapshotData<BucketT> const>>
historicalSnapshots,
uint32_t ledgerSeq)
std::shared_ptr<BucketListSnapshotData<BucketT> const> data)
: mData(std::move(data))
, mHistoricalSnapshots(std::move(historicalSnapshots))
, mLedgerSeq(ledgerSeq)
, mMetrics(metrics)
, mBulkLoadMeter(
metrics.NewMeter({BucketT::METRIC_STRING, "query", "loads"}, "query"))
Expand All @@ -85,8 +80,6 @@ template <class BucketT>
SearchableBucketListSnapshot<BucketT>::SearchableBucketListSnapshot(
SearchableBucketListSnapshot const& other)
: mData(other.mData)
, mHistoricalSnapshots(other.mHistoricalSnapshots)
, mLedgerSeq(other.mLedgerSeq)
// mStreams intentionally left empty — each copy gets its own stream cache
, mMetrics(other.mMetrics)
, mPointTimers(other.mPointTimers)
Expand All @@ -103,8 +96,6 @@ SearchableBucketListSnapshot<BucketT>::operator=(
if (this != &other)
{
mData = other.mData;
mHistoricalSnapshots = other.mHistoricalSnapshots;
mLedgerSeq = other.mLedgerSeq;
mStreams.clear();
mMetrics = other.mMetrics;
mPointTimers = other.mPointTimers;
Expand Down Expand Up @@ -345,51 +336,6 @@ SearchableBucketListSnapshot<BucketT>::load(LedgerKey const& k) const
return result;
}

template <class BucketT>
std::optional<std::vector<typename BucketT::LoadT>>
SearchableBucketListSnapshot<BucketT>::loadKeysInternal(
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
std::optional<uint32_t> ledgerSeq) const
{
ZoneScoped;
releaseAssert(mData);

// Make a copy of the key set, this loop is destructive
auto keys = inKeys;
std::vector<typename BucketT::LoadT> entries;

auto loadKeysLoop = [&](std::shared_ptr<BucketT const> const& bucket) {
loadKeysFromBucket(bucket, keys, entries);
return keys.empty() ? Loop::COMPLETE : Loop::INCOMPLETE;
};

if (!ledgerSeq || *ledgerSeq == mLedgerSeq)
{
loopAllBuckets(loadKeysLoop, *mData);
}
else
{
auto iter = mHistoricalSnapshots.find(*ledgerSeq);
if (iter == mHistoricalSnapshots.end())
{
return std::nullopt;
}
releaseAssert(iter->second);
loopAllBuckets(loadKeysLoop, *iter->second);
}

return entries;
}

template <class BucketT>
std::optional<std::vector<typename BucketT::LoadT>>
SearchableBucketListSnapshot<BucketT>::loadKeysFromLedger(
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
uint32_t ledgerSeq) const
{
return loadKeysInternal(inKeys, ledgerSeq);
}

template <class BucketT>
medida::Timer&
SearchableBucketListSnapshot<BucketT>::getBulkLoadTimer(
Expand Down Expand Up @@ -418,27 +364,14 @@ SearchableBucketListSnapshot<BucketT>::getSnapshotData() const
return mData;
}

template <class BucketT>
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<BucketT> const>> const&
SearchableBucketListSnapshot<BucketT>::getHistoricalSnapshots() const
{
return mHistoricalSnapshots;
}

//
// SearchableLiveBucketListSnapshot
//

SearchableLiveBucketListSnapshot::SearchableLiveBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<LiveBucket> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<LiveBucket> const>>
historicalSnapshots,
uint32_t ledgerSeq)
: SearchableBucketListSnapshot<LiveBucket>(
metrics, std::move(data), std::move(historicalSnapshots), ledgerSeq)
std::shared_ptr<BucketListSnapshotData<LiveBucket> const> data)
: SearchableBucketListSnapshot<LiveBucket>(metrics, std::move(data))
{
}

Expand All @@ -447,10 +380,18 @@ SearchableLiveBucketListSnapshot::loadKeys(
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
std::string const& label) const
{
ZoneScoped;
releaseAssert(mData);
auto timer = getBulkLoadTimer(label, inKeys.size()).TimeScope();
auto op = loadKeysInternal(inKeys, std::nullopt);
releaseAssertOrThrow(op);
return std::move(*op);

auto keys = inKeys;
std::vector<LedgerEntry> entries;
auto loadKeysLoop = [&](std::shared_ptr<LiveBucket const> const& bucket) {
Comment thread
SirTyson marked this conversation as resolved.
Outdated
loadKeysFromBucket(bucket, keys, entries);
return keys.empty() ? Loop::COMPLETE : Loop::INCOMPLETE;
};
loopAllBuckets(loadKeysLoop, *mData);
return entries;
}

// This query has two steps:
Expand Down Expand Up @@ -864,23 +805,27 @@ SearchableLiveBucketListSnapshot::scanForEvictionInBucket(

SearchableHotArchiveBucketListSnapshot::SearchableHotArchiveBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const>>
historicalSnapshots,
uint32_t ledgerSeq)
: SearchableBucketListSnapshot<HotArchiveBucket>(
metrics, std::move(data), std::move(historicalSnapshots), ledgerSeq)
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const> data)
: SearchableBucketListSnapshot<HotArchiveBucket>(metrics, std::move(data))
{
}

std::vector<HotArchiveBucketEntry>
SearchableHotArchiveBucketListSnapshot::loadKeys(
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys) const
{
auto op = loadKeysInternal(inKeys, std::nullopt);
releaseAssertOrThrow(op);
return std::move(*op);
ZoneScoped;
releaseAssert(mData);

auto keys = inKeys;
std::vector<HotArchiveBucketEntry> entries;
auto loadKeysLoop =
[&](std::shared_ptr<HotArchiveBucket const> const& bucket) {
loadKeysFromBucket(bucket, keys, entries);
return keys.empty() ? Loop::COMPLETE : Loop::INCOMPLETE;
};
loopAllBuckets(loadKeysLoop, *mData);
return entries;
}

void
Expand Down
47 changes: 3 additions & 44 deletions src/bucket/BucketListSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

#include <functional>
#include <list>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <vector>
Expand Down Expand Up @@ -81,13 +79,6 @@ template <class BucketT> class SearchableBucketListSnapshot
protected:
// Shared immutable snapshot data
std::shared_ptr<BucketListSnapshotData<BucketT> const> mData;
std::map<uint32_t, std::shared_ptr<BucketListSnapshotData<BucketT> const>>
mHistoricalSnapshots;

// Ledger sequence number for this snapshot, used internally to route
// queries between current and historical data. Not exposed publicly;
// callers should get ledger metadata from ImmutableLedgerData.
uint32_t mLedgerSeq;

// Per-snapshot mutable stream cache
mutable UnorderedMap<BucketT const*, std::unique_ptr<XDRInputFileStream>>
Expand Down Expand Up @@ -133,29 +124,19 @@ template <class BucketT> class SearchableBucketListSnapshot
std::set<LedgerKey, LedgerEntryIdCmp>& keys,
std::vector<typename BucketT::LoadT>& result) const;

std::optional<std::vector<typename BucketT::LoadT>>
loadKeysInternal(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
std::optional<uint32_t> ledgerSeq) const;

medida::Timer& getBulkLoadTimer(std::string const& label,
size_t numEntries) const;

// Iterate over all buckets in a snapshot in order, calling f on each
// non-empty bucket. Exits early if function returns Loop::COMPLETE.
// The first overload operates on an explicit snapshot (used for historical
// queries).
template <typename Func>
void loopAllBuckets(Func&& f,
BucketListSnapshotData<BucketT> const& snapshot) const;
template <typename Func> void loopAllBuckets(Func&& f) const;

SearchableBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<BucketT> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<BucketT> const>>
historicalSnapshots,
uint32_t ledgerSeq);
std::shared_ptr<BucketListSnapshotData<BucketT> const> data);

public:
// Copy: copies all state except mStreams, which is reset to empty.
Expand All @@ -171,23 +152,9 @@ template <class BucketT> class SearchableBucketListSnapshot
std::shared_ptr<typename BucketT::LoadT const>
load(LedgerKey const& k) const;

// Loads inKeys from the specified historical snapshot. Returns
// load_result_vec if the snapshot for the given ledger is
// available, std::nullopt otherwise. Note that ledgerSeq is defined
// as the state of the BucketList at the beginning of the ledger. This means
// that for ledger N, the maximum lastModifiedLedgerSeq of any LedgerEntry
// in the BucketList is N - 1.
std::optional<std::vector<typename BucketT::LoadT>>
loadKeysFromLedger(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
uint32_t ledgerSeq) const;

// Access to underlying data (for copying/refreshing)
std::shared_ptr<BucketListSnapshotData<BucketT> const> const&
getSnapshotData() const;

std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<BucketT> const>> const&
getHistoricalSnapshots() const;
};

// Live bucket list snapshot with additional query methods
Expand All @@ -196,11 +163,7 @@ class SearchableLiveBucketListSnapshot
{
SearchableLiveBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<LiveBucket> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<LiveBucket> const>>
historicalSnapshots,
uint32_t ledgerSeq);
std::shared_ptr<BucketListSnapshotData<LiveBucket> const> data);

Loop scanForEvictionInBucket(
std::shared_ptr<LiveBucket const> const& bucket, EvictionIterator& iter,
Expand Down Expand Up @@ -244,11 +207,7 @@ class SearchableHotArchiveBucketListSnapshot
{
SearchableHotArchiveBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const> data,
std::map<uint32_t, std::shared_ptr<
BucketListSnapshotData<HotArchiveBucket> const>>
historicalSnapshots,
uint32_t ledgerSeq);
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const> data);

public:
SearchableHotArchiveBucketListSnapshot(
Expand Down
63 changes: 0 additions & 63 deletions src/bucket/test/BucketIndexTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,58 +193,6 @@ class BucketIndexTest
buildBucketList(f, isCacheTest);
}

void
runHistoricalSnapshotTest()
{
uint32_t ledger = 0;

// Exclude soroban types so we don't have to insert TTLs
auto canonicalEntry =
LedgerTestUtils::generateValidLedgerEntryWithExclusions(
{CONFIG_SETTING, TTL, CONTRACT_CODE, CONTRACT_DATA});
canonicalEntry.lastModifiedLedgerSeq = 0;

do
{
++ledger;
auto entryCopy = canonicalEntry;
entryCopy.lastModifiedLedgerSeq = ledger;
mApp->getLedgerManager().setNextLedgerEntryBatchForBucketTesting(
{}, {entryCopy}, {});
closeLedger(*mApp);
} while (ledger < mApp->getConfig().QUERY_SNAPSHOT_LEDGERS + 2);
++ledger;

auto ledgerView = getApp().getLedgerManager().copyImmutableLedgerView();
auto lk = LedgerEntryKey(canonicalEntry);

auto currentLoadedEntry = ledgerView.loadLiveEntry(lk);
REQUIRE(currentLoadedEntry);

// Note: The definition of "historical snapshot" ledger is that the
// BucketList snapshot for ledger N is the BucketList as it exists at
// the beginning of ledger N. This means that the lastModifiedLedgerSeq
// is at most N - 1.
REQUIRE(currentLoadedEntry->lastModifiedLedgerSeq == ledger - 1);

for (uint32_t currLedger = ledger; currLedger > 0; --currLedger)
{
auto loadRes = ledgerView.loadLiveKeysFromLedger({lk}, currLedger);

// If we query an older snapshot, should return <null, notFound>
if (currLedger < ledger - mApp->getConfig().QUERY_SNAPSHOT_LEDGERS)
{
REQUIRE(!loadRes);
}
else
{
REQUIRE(loadRes);
REQUIRE(loadRes->size() == 1);
REQUIRE(loadRes->at(0).lastModifiedLedgerSeq == currLedger - 1);
}
}
}

virtual void
buildMultiVersionTest(bool sorobanOnly = false)
{
Expand Down Expand Up @@ -1166,17 +1114,6 @@ TEST_CASE("soroban cache population", "[soroban][bucketindex]")
testAllIndexTypes(f);
}

TEST_CASE("load from historical snapshots", "[bucket][bucketindex]")
{
auto f = [&](Config& cfg) {
cfg.QUERY_SNAPSHOT_LEDGERS = 5;
auto test = BucketIndexTest(cfg);
test.runHistoricalSnapshotTest();
};

testAllIndexTypes(f);
}

TEST_CASE("loadPoolShareTrustLinesByAccountAndAsset", "[bucket][bucketindex]")
{
auto f = [&](Config& cfg) {
Expand Down
2 changes: 1 addition & 1 deletion src/bucket/test/BucketTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ LedgerManagerForBucketTests::finalizeLedgerTxnChanges(
auto& bm = mApp.getBucketManager();
auto tempState = ImmutableLedgerData::createAndMaybeLoadConfig(
bm.getLiveBucketList(), bm.getHotArchiveBucketList(), tempLcl,
tempHas, mApp.getMetrics(), nullptr, 0);
tempHas, mApp.getMetrics());
finalSorobanConfig = tempState->getSorobanConfig();
}

Expand Down
Loading