perf(exec): cache datasets opened from serialized protos in resolve_dataset#7145
Draft
jmhsieh wants to merge 2 commits into
Draft
Conversation
Contributor
|
ACTION NEEDED The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error please inspect the "PR Title Check" action. |
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
…ataset A distributed FilteredReadExec / ANN plan is serialized to proto and run on remote workers. The serialized proto cannot carry a live Arc<Dataset>, so resolve_dataset() falls through to a full cold open (ObjectStore init + manifest GET + Dataset construction) on every plan. With no caching at this layer, every distributed plan re-opens the dataset from scratch — on a many-fragment table that is thousands of redundant opens (one production distributed backfill cold-opened a ~4,800-fragment table ~4,800 times, producing a multi-hour planning wall in front of minutes of real work). Add a process-global, bounded moka cache in resolve_dataset keyed by the immutable snapshot identity (uri, version, manifest_etag). Concurrent first-misses coalesce into a single open via moka's single-flight try_get_with, so a wave of N plans collapses to one open per (uri, version, etag) per worker process. Both the filtered-read and ANN paths go through resolve_dataset, so one change covers both. Reusing a cached Arc<Dataset> for a pinned version is always safe: distinct versions/manifests get distinct keys, and a new write produces a new version (and etag) rather than mutating a cached entry, so the cache never serves stale data, breaks version pinning, or affects time-travel reads. Storage options are intentionally excluded from the key (credentials may be re-vended per plan, but the bytes identified by the etag are the same); a cached entry reuses the ObjectStore from first open, and time_to_idle bounds how long those credentials are retained. Tunable via LANCE_PROTO_DATASET_CACHE_SIZE (default 64; 0 disables and restores cold-open-per-plan). An info! log at the open site makes heavy distributed plans greppable without lance::events tracing. Tests: cache hit returns the same Arc; concurrent first-misses single-flight to one entry; distinct versions never alias (version pinning preserved); the Some-arm passthrough is unchanged; and a deterministic non-cloud repro (test_resolve_dataset_opens_once_across_plan_runs) instruments the cache-miss opener to prove 50 plan_runs open the dataset once with the cache (and once under concurrency) vs 50 times without it — one open == one manifest load. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
8f5415c to
8a1d3df
Compare
Drop the internal ticket reference and deployment-specific numbers from the repro test's doc comment; keep the mechanism description generic. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
A distributed
FilteredReadExec/ ANN plan is serialized to proto and executed on remote workers. The serialized proto cannot carry a liveArc<Dataset>, so when a worker deserializes it,resolve_dataset()is called withdataset = Noneand falls through toopen_dataset_from_table_identifier()→DatasetBuilder::from_uri(..).load()— a full cold open (ObjectStoreinit + manifest GET +Datasetconstruction), emitting onedataset_events event="loading"per plan.There is no caching at this layer, so every distributed plan re-opens the dataset from scratch. On a many-fragment table that is thousands of redundant opens. In one production distributed backfill resume, a ~4,800-fragment table was cold-opened ~4,800 times (4,813
loadingevents), producing a ~3-hour planning wall in front of ~5 minutes of actual work — planning was ~36× longer than the work it gated, entirely from re-opens that carry no new information (the version is pinned and immutable).Both the filtered-read path (
io/exec/filtered_read_proto.rs) and the ANN paths (io/exec/ann_proto.rs) route through the sameresolve_dataset, so both are affected.Fix
Add a process-global, bounded
mokacache inresolve_dataset, keyed by the immutable snapshot identity(uri, version, manifest_etag). Concurrent first-misses are coalesced into a single open via moka's single-flighttry_get_with, so a wave of N plans collapses to one open per(uri, version, etag)per worker process.A single change covers both the filtered-read and ANN paths since they share
resolve_dataset.Why this is safe
A pinned dataset version is an immutable snapshot, so reusing a cached
Arc<Dataset>is always correct:Vonly ever sees the dataset opened atV.ObjectStoreconfigured at first open.time_to_idle(5 min) bounds how long those first-open credentials are retained before an idle entry is dropped and re-opened.Configuration
LANCE_PROTO_DATASET_CACHE_SIZE— max distinct(uri, version, etag)datasets cached per process (default64). Set to0to disable caching entirely and restore the previous cold-open-per-plan behavior.info!log is emitted at the open (cache-miss) site so a heavy distributed plan is greppable without enablinglance::eventstracing.Tests
test_resolve_dataset_cached_returns_same_arc— a cache hit returns the sameArc, not a re-open.test_resolve_dataset_cached_single_flight— 16 concurrent first-misses collapse to exactly one cached entry.test_resolve_dataset_cached_distinct_versions— different versions never alias (version pinning preserved).test_resolve_dataset_some_returns_passed_arc— theSomearm still passes the supplied handle through unchanged.test_resolve_dataset_opens_once_across_plan_runs— deterministic, non-cloud repro: instruments the cache-miss opener (one opener call == oneDatasetBuilder::load()== one manifest read) and asserts 50 sequential plans open the dataset once with the cache (and once under concurrency) vs 50 times without it.Run with:
cargo test -p lance --lib --features substrait io::exec::table_identifier