diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 2e448dfa828..db797b76368 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -97,7 +97,7 @@ pub mod write; pub(crate) use take::row_offsets_to_row_addresses; use self::builder::DatasetBuilder; -use self::cleanup::RemovalStats; +use self::cleanup::{CleanupPlan, RemovalStats}; use self::fragment::FileFragment; use self::refs::Refs; use self::scanner::{DatasetRecordBatchStream, Scanner}; @@ -1205,6 +1205,35 @@ impl Dataset { cleanup::cleanup_old_versions(self, policy).boxed() } + /// Plan cleanup without removing any files. + /// + /// The returned plan contains the concrete files that would be removed by + /// [`Self::cleanup_with_policy`] for the same dataset snapshot and policy. + /// Policies with `clean_referenced_branches=true` are not supported by + /// cleanup plans; call [`Self::cleanup_with_policy`] directly for those. + /// Planning resolves the latest dataset version from storage instead of + /// relying on the version cached in this handle. + #[instrument(level = "debug", skip(self))] + pub fn plan_cleanup_with_policy( + &self, + policy: CleanupPolicy, + ) -> BoxFuture<'_, Result> { + cleanup::plan_cleanup(self, policy).boxed() + } + + /// Remove the files listed in a cleanup plan. + /// + /// This validates that the plan targets this dataset and that storage still + /// reports the plan's read version as the latest version before deleting. + /// There is still a narrow window between that check and object deletion; + /// callers must prevent concurrent commits if they require a closed-world + /// cleanup execution. + #[instrument(level = "debug", skip(self))] + pub fn cleanup_with_plan(&self, plan: CleanupPlan) -> BoxFuture<'_, Result> { + info!(target: TRACE_DATASET_EVENTS, event=DATASET_CLEANING_EVENT, uri=&self.uri); + cleanup::cleanup_with_plan(self, plan).boxed() + } + #[allow(clippy::too_many_arguments)] async fn do_commit( base_uri: WriteDestination<'_>, diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index b3ca60cfa0f..88d63dce8ce 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -59,6 +59,7 @@ use lance_table::{ }; use object_store::path::Path; use object_store::{Error as ObjectStoreError, ObjectMeta}; +use serde::{Deserialize, Serialize}; use std::fmt::Debug; use std::{ collections::{HashMap, HashSet}, @@ -88,6 +89,111 @@ pub struct RemovalStats { pub deletion_files_removed: u64, } +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum CleanupFileKind { + Manifest, + Data, + Blob, + Transaction, + Index, + Deletion, + TemporaryManifest, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum CleanupFileReason { + OldManifest { version: u64 }, + VerifiedUnreferenced, + UnverifiedExpired { threshold_days: i64 }, + DeleteUnverifiedOverride, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct CleanupFile { + pub path: String, + pub relative_path: String, + pub kind: CleanupFileKind, + pub reason: CleanupFileReason, + pub size_bytes: u64, + pub last_modified: Option>, +} + +impl CleanupFile { + fn path(&self) -> Result { + Path::parse(self.path.as_str()).map_err(|err| Error::Cleanup { + message: format!( + "cleanup plan contains invalid path {:?}: {}", + self.path, err + ), + }) + } +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)] +pub struct CleanupPlanStats { + pub bytes_to_remove: u64, + pub old_versions: u64, + pub data_files: u64, + pub transaction_files: u64, + pub index_files: u64, + pub deletion_files: u64, + pub manifest_files: u64, +} + +impl CleanupPlanStats { + fn record_file(&mut self, file: &CleanupFile) { + self.bytes_to_remove += file.size_bytes; + match file.kind { + CleanupFileKind::Manifest => { + self.manifest_files += 1; + self.old_versions += 1; + } + CleanupFileKind::Data | CleanupFileKind::Blob => self.data_files += 1, + CleanupFileKind::Transaction => self.transaction_files += 1, + CleanupFileKind::Index => self.index_files += 1, + CleanupFileKind::Deletion => self.deletion_files += 1, + CleanupFileKind::TemporaryManifest => self.manifest_files += 1, + } + } +} + +impl From<&CleanupPlanStats> for RemovalStats { + fn from(stats: &CleanupPlanStats) -> Self { + Self { + bytes_removed: stats.bytes_to_remove, + old_versions: stats.old_versions, + data_files_removed: stats.data_files, + transaction_files_removed: stats.transaction_files, + index_files_removed: stats.index_files, + deletion_files_removed: stats.deletion_files, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct CleanupPlan { + pub dataset_uri: String, + pub base_path: String, + pub read_version: u64, + pub policy: CleanupPolicy, + pub created_at: DateTime, + pub files: Vec, + pub stats: CleanupPlanStats, + pub referenced_branches: Vec, + pub tagged_old_versions: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct CleanupReferencedBranch { + pub name: String, + pub root_version: u64, +} + +struct ValidatedCleanupPlan { + files: Vec<(CleanupFile, Path)>, + stats: CleanupPlanStats, +} + #[derive(Clone, Copy, Debug)] enum RemovedFileType { Data, @@ -124,6 +230,8 @@ struct CleanupInspection { tagged_old_versions: HashSet, /// The earliest timestamp of all retained manifests. earliest_retained_manifest_time: Option>, + /// Highest manifest version observed while listing manifests. + max_listed_version: Option, } /// If a file cannot be verified then it will only be deleted if it is at least @@ -137,17 +245,76 @@ impl<'a> CleanupTask<'a> { Self { dataset, policy } } + // CleanupTask has three execution paths: + // - cleanup_old_versions / cleanup_with_policy: resolve storage latest once, + // plan from that snapshot, and delete without a second version check. + // - cleanup_with_plan: validate dataset/path and storage latest version before + // deleting an externally supplied plan. + // - commit hooks: plan from the supplied dataset snapshot, which may be the + // pre-commit version, and do not resolve storage latest. async fn run(self) -> Result { + let latest_version = self.dataset.latest_version_id().await?; + self.run_at_version(latest_version).await + } + + async fn run_at_version(self, latest_version: u64) -> Result { let mut final_stats = RemovalStats::default(); - // First check if we need to clean referenced branches - // For cases that referenced branches never clean and the current cleanup cannot clean anything - // This must happen before cleaning the current branch if the setting is enabled. - let referenced_branches: Vec<(String, u64)> = self.find_referenced_branches().await?; + let referenced_branches = self.find_referenced_branches().await?; if self.policy.clean_referenced_branches { - self.clean_referenced_branches(&referenced_branches).await?; + let branch_stats = self.clean_referenced_branches(&referenced_branches).await?; + final_stats.bytes_removed += branch_stats.bytes_removed; + final_stats.old_versions += branch_stats.old_versions; + final_stats.data_files_removed += branch_stats.data_files_removed; + final_stats.transaction_files_removed += branch_stats.transaction_files_removed; + final_stats.index_files_removed += branch_stats.index_files_removed; + final_stats.deletion_files_removed += branch_stats.deletion_files_removed; } + let plan = self + .plan_with_referenced_branches_at_version(referenced_branches, latest_version) + .await?; + let stats = self + .execute_plan_unchecked(Self::parse_plan_paths(plan)?) + .await?; + final_stats.bytes_removed += stats.bytes_removed; + final_stats.old_versions += stats.old_versions; + final_stats.data_files_removed += stats.data_files_removed; + final_stats.transaction_files_removed += stats.transaction_files_removed; + final_stats.index_files_removed += stats.index_files_removed; + final_stats.deletion_files_removed += stats.deletion_files_removed; + + Ok(final_stats) + } + + async fn plan(&self) -> Result { + if self.policy.clean_referenced_branches { + return Err(unsupported_referenced_branch_plan_error()); + } + + let referenced_branches: Vec<(String, u64)> = self.find_referenced_branches().await?; + self.plan_with_referenced_branches(referenced_branches) + .await + } + + async fn plan_with_referenced_branches( + &self, + referenced_branches: Vec<(String, u64)>, + ) -> Result { + let latest_version = self.dataset.latest_version_id().await?; + self.plan_with_referenced_branches_at_version(referenced_branches, latest_version) + .await + } + + async fn plan_with_referenced_branches_at_version( + &self, + referenced_branches: Vec<(String, u64)>, + latest_version: u64, + ) -> Result { + // First check if we need to clean referenced branches + // For cases that referenced branches never clean and the current cleanup cannot clean anything + // This must happen before cleaning the current branch if the setting is enabled. + // we process all manifest files in parallel to figure // out which files are referenced by valid manifests @@ -169,7 +336,17 @@ impl<'a> CleanupTask<'a> { .map(|tag_content| tag_content.version) .collect(); - let mut inspection = self.process_manifests(&tagged_versions).await?; + let mut inspection = self + .process_manifests(&tagged_versions, latest_version) + .await?; + let Some(read_version) = inspection.max_listed_version else { + return Err(Error::Cleanup { + message: format!( + "manifest listing did not include latest version {}; no manifests were listed", + latest_version + ), + }); + }; if self.policy.error_if_tagged_old_versions && !inspection.tagged_old_versions.is_empty() { return Err(tagged_old_versions_cleanup_error( @@ -184,30 +361,54 @@ impl<'a> CleanupTask<'a> { .await? }; - let stats = self.delete_unreferenced_files(inspection).await?; - final_stats.bytes_removed += stats.bytes_removed; - final_stats.old_versions += stats.old_versions; - final_stats.data_files_removed += stats.data_files_removed; - final_stats.transaction_files_removed += stats.transaction_files_removed; - final_stats.index_files_removed += stats.index_files_removed; - final_stats.deletion_files_removed += stats.deletion_files_removed; - Ok(final_stats) + let plan = self + .build_cleanup_plan(inspection, referenced_branches, read_version) + .await?; + debug!( + bytes_to_remove = plan.stats.bytes_to_remove, + old_versions = plan.stats.old_versions, + "planned cleanup" + ); + Ok(plan) + } + + async fn execute_plan(&self, plan: CleanupPlan) -> Result { + let read_version = plan.read_version; + let plan = self.validate_plan(plan)?; + self.validate_plan_read_version(read_version).await?; + self.execute_plan_unchecked(plan).await } #[instrument(level = "debug", skip_all)] async fn process_manifests( &'a self, tagged_versions: &HashSet, + latest_version: u64, ) -> Result { let inspection = Mutex::new(CleanupInspection::default()); self.dataset .commit_handler .list_manifest_locations(&self.dataset.base, &self.dataset.object_store, false) .try_for_each_concurrent(self.dataset.object_store.io_parallelism(), |location| { - self.process_manifest_file(location, &inspection, tagged_versions) + self.process_manifest_file(location, &inspection, tagged_versions, latest_version) }) .await?; - Ok(inspection.into_inner().unwrap()) + let inspection = inspection.into_inner().unwrap(); + match inspection.max_listed_version { + Some(max_listed_version) if max_listed_version >= latest_version => Ok(inspection), + Some(max_listed_version) => Err(Error::Cleanup { + message: format!( + "manifest listing did not include latest version {}; highest listed version was {}", + latest_version, max_listed_version + ), + }), + None => Err(Error::Cleanup { + message: format!( + "manifest listing did not include latest version {}; no manifests were listed", + latest_version + ), + }), + } } async fn process_manifest_file( @@ -215,6 +416,7 @@ impl<'a> CleanupTask<'a> { location: ManifestLocation, inspection: &Mutex, tagged_versions: &HashSet, + latest_version: u64, ) -> Result<()> { // TODO: We can't cleanup invalid manifests. There is no way to distinguish // between an invalid manifest and a temporary I/O error. It's also not safe @@ -224,18 +426,23 @@ impl<'a> CleanupTask<'a> { let manifest = read_manifest(&self.dataset.object_store, &location.path, location.size).await?; - let dataset_version = self.dataset.version().version; - // Don't delete the latest version, even if it is old. Don't delete tagged versions, - // regardless of age. Don't delete manifests if their version is newer than the dataset - // version. These are either in-progress or newly added since we started. - let is_latest = dataset_version <= manifest.version; + // regardless of age. Don't delete manifests whose version is newer than the + // `latest_version` snapshot resolved at plan start; those were committed + // concurrently with planning and may still reference data files we are about + // to inspect. + let is_latest = latest_version <= manifest.version; let is_tagged = tagged_versions.contains(&manifest.version); let in_working_set = is_latest || !self.policy.should_clean(&manifest) || is_tagged; let indexes = read_manifest_indexes(&self.dataset.object_store, &location, &manifest).await?; let mut inspection = inspection.lock().unwrap(); + inspection.max_listed_version = Some( + inspection + .max_listed_version + .map_or(manifest.version, |version| version.max(manifest.version)), + ); // Track tagged old versions in case we want to return a `CleanupError` later. // Only track tagged when it is old. @@ -316,11 +523,12 @@ impl<'a> CleanupTask<'a> { deletion_files_removed = tracing::field::Empty ) )] - async fn delete_unreferenced_files( + async fn build_cleanup_plan( &self, inspection: CleanupInspection, - ) -> Result { - let removal_stats = Mutex::new(RemovalStats::default()); + referenced_branches: Vec<(String, u64)>, + latest_version: u64, + ) -> Result { let verification_threshold = utc_now() - TimeDelta::try_days(UNVERIFIED_THRESHOLD_DAYS).expect("TimeDelta::try_days"); @@ -337,7 +545,6 @@ impl<'a> CleanupTask<'a> { // Build stream for a managed subtree let build_listing_stream = |dir: Path, file_type: Option| { let inspection_ref = &inspection; - let removal_stats_ref = &removal_stats; self.dataset .object_store .read_dir_all(&dir, inspection.earliest_retained_manifest_time) @@ -356,26 +563,13 @@ impl<'a> CleanupTask<'a> { // delete it if we can verify it is part of an old version. let maybe_in_progress = !self.policy.delete_unverified && obj_meta.last_modified >= verification_threshold; - let path_to_remove = self.path_if_not_referenced( - obj_meta.location, + let file_to_remove = self.cleanup_file_if_not_referenced( + obj_meta, + file_type, maybe_in_progress, inspection_ref, ); - if matches!(path_to_remove, Ok(Some(..))) { - let mut stats = removal_stats_ref.lock().unwrap(); - stats.bytes_removed += obj_meta.size; - if let Some(file_type) = file_type { - match file_type { - RemovedFileType::Data => stats.data_files_removed += 1, - RemovedFileType::Transaction => { - stats.transaction_files_removed += 1 - } - RemovedFileType::Index => stats.index_files_removed += 1, - RemovedFileType::Deletion => stats.deletion_files_removed += 1, - } - } - } - future::ready(path_to_remove) + future::ready(file_to_remove) }) .boxed() }; @@ -394,30 +588,86 @@ impl<'a> CleanupTask<'a> { Some(RemovedFileType::Deletion), ), ]; - let unreferenced_paths = stream::iter(streams).flatten().boxed(); + let unreferenced_files = stream::iter(streams).flatten().boxed(); let old_manifests = inspection.old_manifests.clone(); - let num_old_manifests = old_manifests.len(); // Ideally this collect shouldn't be needed here but it seems necessary // to avoid https://github.com/rust-lang/rust/issues/102211 - let manifest_bytes_removed = stream::iter(old_manifests.keys()) - .map(|path| self.dataset.object_store.size(path)) + let manifest_files = stream::iter(old_manifests.into_iter()) + .map(|(path, version)| async move { + let size_bytes = self.dataset.object_store.size(&path).await?; + Ok::(CleanupFile { + relative_path: remove_prefix(&path, &self.dataset.base).to_string(), + path: path.to_string(), + kind: CleanupFileKind::Manifest, + reason: CleanupFileReason::OldManifest { version }, + size_bytes, + last_modified: None, + }) + }) .collect::>() .await; - let manifest_bytes_removed = stream::iter(manifest_bytes_removed) + let manifest_files = stream::iter(manifest_files) .buffer_unordered(self.dataset.object_store.io_parallelism()) - .try_fold(0, |acc, size| async move { Ok(acc + (size)) }) - .await; + .try_collect::>() + .await?; - let old_manifests_stream = stream::iter(old_manifests.into_keys()) - .map(|path| { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = path.as_ref()); - Ok(path) - }) - .boxed(); - let all_paths_to_remove = - stream::iter(vec![unreferenced_paths, old_manifests_stream]).flatten(); + let mut files = unreferenced_files.try_collect::>().await?; + files.extend(manifest_files); + + let mut stats = CleanupPlanStats::default(); + for file in &files { + stats.record_file(file); + } + + let mut tagged_old_versions = inspection + .tagged_old_versions + .into_iter() + .collect::>(); + tagged_old_versions.sort_unstable(); + + Ok(CleanupPlan { + dataset_uri: self.dataset.uri.clone(), + base_path: self.dataset.base.to_string(), + read_version: latest_version, + policy: self.policy.clone(), + created_at: utc_now(), + files, + stats, + referenced_branches: referenced_branches + .into_iter() + .map(|(name, root_version)| CleanupReferencedBranch { name, root_version }) + .collect(), + tagged_old_versions, + }) + } + + async fn execute_plan_unchecked(&self, plan: ValidatedCleanupPlan) -> Result { + let all_paths_to_remove = stream::iter(plan.files.into_iter().map(|(file, path)| { + match file.kind { + CleanupFileKind::Manifest => { + info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = file.path.as_str()); + } + CleanupFileKind::Data | CleanupFileKind::Blob => { + let mode = audit_mode(&file.reason); + let path = file.path.as_str(); + info!(target: TRACE_FILE_AUDIT, mode=mode, r#type=AUDIT_TYPE_DATA, path); + } + CleanupFileKind::Deletion => { + let mode = audit_mode(&file.reason); + let path = file.path.as_str(); + info!(target: TRACE_FILE_AUDIT, mode=mode, r#type=AUDIT_TYPE_DELETION, path); + } + CleanupFileKind::Index => { + let mode = audit_mode(&file.reason); + let path = file.path.as_str(); + info!(target: TRACE_FILE_AUDIT, mode=mode, r#type=AUDIT_TYPE_INDEX, path); + } + CleanupFileKind::Transaction | CleanupFileKind::TemporaryManifest => {} + } + Ok(path) + })); let paths_to_delete: BoxStream> = if let Some(rate) = self.policy.delete_rate_limit @@ -441,9 +691,7 @@ impl<'a> CleanupTask<'a> { delete_fut.await?; - let mut removal_stats = removal_stats.into_inner().unwrap(); - removal_stats.old_versions = num_old_manifests as u64; - removal_stats.bytes_removed += manifest_bytes_removed?; + let removal_stats = RemovalStats::from(&plan.stats); let span = Span::current(); span.record("bytes_removed", removal_stats.bytes_removed); @@ -461,13 +709,102 @@ impl<'a> CleanupTask<'a> { Ok(removal_stats) } - fn path_if_not_referenced( + fn parse_plan_paths(plan: CleanupPlan) -> Result { + let mut files = Vec::with_capacity(plan.files.len()); + for file in plan.files { + let path = file.path()?; + files.push((file, path)); + } + + Ok(ValidatedCleanupPlan { + files, + stats: plan.stats, + }) + } + + fn validate_plan(&self, plan: CleanupPlan) -> Result { + if plan.policy.clean_referenced_branches { + return Err(unsupported_referenced_branch_plan_error()); + } + + if plan.dataset_uri != self.dataset.uri { + return Err(Error::Cleanup { + message: format!( + "cleanup plan was created for dataset {:?}, but execution was requested for dataset {:?}", + plan.dataset_uri, self.dataset.uri + ), + }); + } + + let plan_base_path = + Path::parse(plan.base_path.as_str()).map_err(|err| Error::Cleanup { + message: format!( + "cleanup plan contains invalid base path {:?}: {}", + plan.base_path, err + ), + })?; + if plan_base_path != self.dataset.base { + return Err(Error::Cleanup { + message: format!( + "cleanup plan base path {:?} does not match dataset base path {:?}", + plan.base_path, self.dataset.base + ), + }); + } + + let mut files = Vec::with_capacity(plan.files.len()); + for file in plan.files { + let path = file.path()?; + if !path.prefix_matches(&self.dataset.base) { + return Err(Error::Cleanup { + message: format!( + "cleanup plan path {:?} is outside dataset base path {:?}", + file.path, self.dataset.base + ), + }); + } + files.push((file, path)); + } + + Ok(ValidatedCleanupPlan { + files, + stats: plan.stats, + }) + } + + async fn validate_plan_read_version(&self, read_version: u64) -> Result<()> { + let latest_version = self.dataset.latest_version_id().await?; + if read_version != latest_version { + return Err(Error::Cleanup { + message: format!( + "cleanup plan was created from version {}, but latest dataset version is {}", + read_version, latest_version + ), + }); + } + + Ok(()) + } + + fn cleanup_file_if_not_referenced( &self, - path: Path, + obj_meta: ObjectMeta, + file_type: Option, maybe_in_progress: bool, inspection: &CleanupInspection, - ) -> Result> { + ) -> Result> { + let path = obj_meta.location; let relative_path = remove_prefix(&path, &self.dataset.base); + let file = |kind, reason| { + Ok(Some(CleanupFile { + path: path.to_string(), + relative_path: relative_path.to_string(), + kind, + reason, + size_bytes: obj_meta.size, + last_modified: Some(obj_meta.last_modified), + })) + }; if relative_path.as_ref().starts_with("_versions/.tmp") { // This is a temporary manifest file. // @@ -476,7 +813,10 @@ impl<'a> CleanupTask<'a> { if maybe_in_progress { return Ok(None); } else { - return Ok(Some(path)); + return file( + CleanupFileKind::TemporaryManifest, + self.unverified_cleanup_reason(), + ); } } if relative_path.as_ref().starts_with("_indices") { @@ -490,15 +830,16 @@ impl<'a> CleanupTask<'a> { { return Ok(None); } else if !maybe_in_progress { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_INDEX, path = path.to_string()); - return Ok(Some(path)); + return file(CleanupFileKind::Index, self.unverified_cleanup_reason()); } else if inspection .verified_files .index_uuids .contains(uuid.as_ref()) { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_INDEX, path = path.to_string()); - return Ok(Some(path)); + return file( + CleanupFileKind::Index, + CleanupFileReason::VerifiedUnreferenced, + ); } } else { return Ok(None); @@ -514,15 +855,19 @@ impl<'a> CleanupTask<'a> { { Ok(None) } else if !maybe_in_progress { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_DATA, path = path.to_string()); - Ok(Some(path)) + file( + file_kind(file_type, CleanupFileKind::Data), + self.unverified_cleanup_reason(), + ) } else if inspection .verified_files .data_paths .contains(&relative_path) { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_DATA, path = path.to_string()); - Ok(Some(path)) + file( + file_kind(file_type, CleanupFileKind::Data), + CleanupFileReason::VerifiedUnreferenced, + ) } else { Ok(None) } @@ -587,15 +932,16 @@ impl<'a> CleanupTask<'a> { { Ok(None) } else if !maybe_in_progress { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_DATA, path = path.to_string()); - Ok(Some(path)) + file(CleanupFileKind::Blob, self.unverified_cleanup_reason()) } else if inspection .verified_files .data_paths .contains(&parent_data_path) { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_DATA, path = path.to_string()); - Ok(Some(path)) + file( + CleanupFileKind::Blob, + CleanupFileReason::VerifiedUnreferenced, + ) } else { Ok(None) } @@ -613,15 +959,16 @@ impl<'a> CleanupTask<'a> { { Ok(None) } else if !maybe_in_progress { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_DELETION, path = path.to_string()); - Ok(Some(path)) + file(CleanupFileKind::Deletion, self.unverified_cleanup_reason()) } else if inspection .verified_files .delete_paths .contains(&relative_path) { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_DELETION, path = path.to_string()); - Ok(Some(path)) + file( + CleanupFileKind::Deletion, + CleanupFileReason::VerifiedUnreferenced, + ) } else { Ok(None) } @@ -640,7 +987,13 @@ impl<'a> CleanupTask<'a> { } else if !maybe_in_progress || inspection.verified_files.tx_paths.contains(&relative_path) { - Ok(Some(path)) + let reason = if inspection.verified_files.tx_paths.contains(&relative_path) + { + CleanupFileReason::VerifiedUnreferenced + } else { + self.unverified_cleanup_reason() + }; + file(CleanupFileKind::Transaction, reason) } else { Ok(None) } @@ -652,6 +1005,16 @@ impl<'a> CleanupTask<'a> { } } + fn unverified_cleanup_reason(&self) -> CleanupFileReason { + if self.policy.delete_unverified { + CleanupFileReason::DeleteUnverifiedOverride + } else { + CleanupFileReason::UnverifiedExpired { + threshold_days: UNVERIFIED_THRESHOLD_DAYS, + } + } + } + async fn find_referenced_branches(&self) -> Result> { let current_branch_id = self.dataset.branch_identifier().await?; let all_branches = self.dataset.branches().list().await?; @@ -887,7 +1250,27 @@ fn calculate_duration(scheme: String, rate: u64) -> Duration { Duration::from_nanos(duration_ns) } -#[derive(Clone, Debug)] +fn audit_mode(reason: &CleanupFileReason) -> &'static str { + match reason { + CleanupFileReason::VerifiedUnreferenced | CleanupFileReason::OldManifest { .. } => { + AUDIT_MODE_DELETE + } + CleanupFileReason::UnverifiedExpired { .. } + | CleanupFileReason::DeleteUnverifiedOverride => AUDIT_MODE_DELETE_UNVERIFIED, + } +} + +fn file_kind(file_type: Option, default_kind: CleanupFileKind) -> CleanupFileKind { + match file_type { + Some(RemovedFileType::Data) => CleanupFileKind::Data, + Some(RemovedFileType::Transaction) => CleanupFileKind::Transaction, + Some(RemovedFileType::Index) => CleanupFileKind::Index, + Some(RemovedFileType::Deletion) => CleanupFileKind::Deletion, + None => default_kind, + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct CleanupPolicy { /// If not none, cleanup all versions before the specified timestamp. pub before_timestamp: Option>, @@ -1024,26 +1407,61 @@ pub async fn cleanup_old_versions( cleanup.run().await } +/// Plan a cleanup without deleting any files. +/// +/// The returned plan contains the concrete files that would be deleted by +/// [`cleanup_old_versions`] for the same dataset snapshot and policy. +/// Policies with `clean_referenced_branches=true` are not supported by cleanup +/// plans; call [`cleanup_old_versions`] directly for those. +/// +/// Planning resolves the latest dataset version from storage instead of using +/// the version cached in `dataset`. This issues a store request before scanning +/// manifests and allows plans to be created from stale dataset handles. +pub async fn plan_cleanup(dataset: &Dataset, policy: CleanupPolicy) -> Result { + let cleanup = CleanupTask::new(dataset, policy); + cleanup.plan().await +} + +/// Delete the files listed in a cleanup plan. +/// +/// Before deleting, this verifies the plan was created for the same dataset and +/// base path, that every planned path stays under the dataset base, and that the +/// latest dataset version in storage still matches `plan.read_version`. This +/// latest-version check issues a store request. +/// +/// There remains a narrow TOCTOU window between the latest-version check and +/// the object deletions. Callers that cannot tolerate concurrent commits during +/// cleanup must coordinate externally, for example by running cleanup in a +/// maintenance window or under a dataset-level write exclusion mechanism. +pub async fn cleanup_with_plan(dataset: &Dataset, plan: CleanupPlan) -> Result { + let cleanup = CleanupTask::new(dataset, plan.policy.clone()); + cleanup.execute_plan(plan).await +} + /// If the dataset config has `lance.auto_cleanup` parameters set, -/// this function automatically calls `dataset.cleanup_old_versions` -/// every `lance.auto_cleanup.interval` versions. This function calls -/// `dataset.cleanup_old_versions` with `lance.auto_cleanup.older_than` -/// for `older_than` and `Some(false)` for both `delete_unverified` and -/// `error_if_tagged_old_versions`. +/// this function automatically cleans old versions every +/// `lance.auto_cleanup.interval` versions. +/// +/// This uses `dataset.version().version` as the planning snapshot. Commit hooks +/// pass the pre-commit dataset handle here, so this retains the pre-commit +/// latest version instead of resolving the newly committed storage latest. pub async fn auto_cleanup_hook( dataset: &Dataset, manifest: &Manifest, ) -> Result> { let policy = build_cleanup_policy(dataset, manifest).await?; if let Some(policy) = policy { - Ok(Some(dataset.cleanup_with_policy(policy).await?)) + let cleanup = CleanupTask::new(dataset, policy); + Ok(Some( + cleanup.run_at_version(dataset.version().version).await?, + )) } else { Ok(None) } } -/// This is trigger when a parent branch is cleaning and `clean_referenced_branches` is set as true -/// For cascade branches, some cleanup parameters need be overridden. +/// Clean a referenced branch using that branch dataset's current version as the +/// planning snapshot. pub async fn cleanup_cascade_branch( dataset: &Dataset, manifest: &Manifest, @@ -1052,7 +1470,10 @@ pub async fn cleanup_cascade_branch( if let Some(mut policy) = policy { policy.clean_referenced_branches = false; policy.error_if_tagged_old_versions = false; - Ok(Some(dataset.cleanup_with_policy(policy).await?)) + let cleanup = CleanupTask::new(dataset, policy); + Ok(Some( + cleanup.run_at_version(dataset.version().version).await?, + )) } else { Ok(None) } @@ -1172,6 +1593,13 @@ fn tagged_old_versions_cleanup_error( } } +fn unsupported_referenced_branch_plan_error() -> Error { + Error::Cleanup { + message: "cleanup plans do not support clean_referenced_branches=true; call cleanup_old_versions or cleanup_with_policy directly" + .to_string(), + } +} + #[cfg(test)] mod tests { use std::{ @@ -1443,6 +1871,16 @@ mod tests { cleanup_old_versions(&db, policy).await } + async fn plan_cleanup_with_policy(&self, policy: CleanupPolicy) -> Result { + let db = self.open().await?; + plan_cleanup(&db, policy).await + } + + async fn cleanup_with_plan(&self, plan: CleanupPlan) -> Result { + let db = self.open().await?; + super::cleanup_with_plan(&db, plan).await + } + async fn run_cleanup_with_override( &self, before: DateTime, @@ -1629,6 +2067,24 @@ mod tests { )) } + fn cleanup_plan_for_files(dataset: &Dataset, files: Vec) -> CleanupPlan { + let mut stats = CleanupPlanStats::default(); + for file in &files { + stats.record_file(file); + } + CleanupPlan { + dataset_uri: dataset.uri.clone(), + base_path: dataset.base.to_string(), + read_version: dataset.version().version, + policy: CleanupPolicy::default(), + created_at: utc_now(), + files, + stats, + referenced_branches: Vec::new(), + tagged_old_versions: Vec::new(), + } + } + #[tokio::test] async fn cleanup_unreferenced_data_files() { // We should clean up data files that are only referenced @@ -1670,6 +2126,258 @@ mod tests { assert_gt!(after_count.num_tx_files, 0); } + #[tokio::test] + async fn plan_cleanup_does_not_delete_files() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + MockClock::set_system_time(TimeDelta::try_seconds(1).unwrap().to_std().unwrap()); + fixture.overwrite_some_data().await.unwrap(); + + let before_count = fixture.count_files().await.unwrap(); + let policy = CleanupPolicyBuilder::default() + .before_timestamp(utc_now()) + .build(); + + let plan = fixture.plan_cleanup_with_policy(policy).await.unwrap(); + let after_plan_count = fixture.count_files().await.unwrap(); + + assert_eq!(before_count, after_plan_count); + assert_eq!(plan.read_version, 2); + assert_eq!(plan.stats.old_versions, 1); + assert_eq!(plan.stats.data_files, 1); + assert_eq!(plan.stats.transaction_files, 1); + assert_eq!(plan.stats.manifest_files, 1); + assert!( + plan.files + .iter() + .any(|file| matches!(file.reason, CleanupFileReason::OldManifest { version: 1 })) + ); + assert!( + plan.files + .iter() + .any(|file| file.kind == CleanupFileKind::Data + && file.reason == CleanupFileReason::VerifiedUnreferenced) + ); + + let removed = fixture.cleanup_with_plan(plan).await.unwrap(); + let after_cleanup_count = fixture.count_files().await.unwrap(); + + assert_eq!( + removed.bytes_removed, + before_count.num_bytes - after_cleanup_count.num_bytes + ); + assert_eq!(removed.old_versions, 1); + assert_eq!(removed.data_files_removed, 1); + } + + #[tokio::test] + async fn plan_cleanup_rejects_referenced_branch_cleanup() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + let dataset = fixture.open().await.unwrap(); + let policy = CleanupPolicyBuilder::default() + .clean_referenced_branches(true) + .build(); + + let err = plan_cleanup(&dataset, policy).await.unwrap_err(); + assert_contains!( + err.to_string(), + "cleanup plans do not support clean_referenced_branches=true" + ); + } + + #[tokio::test] + async fn cleanup_with_plan_parses_encoded_paths() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + let dataset = fixture.open().await.unwrap(); + + let path = dataset.versions_dir().join(".tmp cleanup % 中文.manifest"); + dataset + .object_store + .as_ref() + .put(&path, b"tmp") + .await + .unwrap(); + assert!(dataset.object_store.as_ref().exists(&path).await.unwrap()); + + let file = CleanupFile { + path: path.to_string(), + relative_path: remove_prefix(&path, &dataset.base).to_string(), + kind: CleanupFileKind::TemporaryManifest, + reason: CleanupFileReason::DeleteUnverifiedOverride, + size_bytes: 3, + last_modified: Some(utc_now()), + }; + let plan = cleanup_plan_for_files(&dataset, vec![file]); + + let stats = super::cleanup_with_plan(&dataset, plan).await.unwrap(); + + assert_eq!(stats.bytes_removed, 3); + assert!(!dataset.object_store.as_ref().exists(&path).await.unwrap()); + } + + #[tokio::test] + async fn cleanup_with_plan_rejects_stale_version() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + let dataset = fixture.open().await.unwrap(); + let mut plan = cleanup_plan_for_files(&dataset, Vec::new()); + plan.read_version -= 1; + + let err = super::cleanup_with_plan(&dataset, plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "cleanup plan was created from version 0, but latest dataset version is 1" + ); + } + + #[tokio::test] + async fn cleanup_with_plan_rejects_referenced_branch_cleanup() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + let dataset = fixture.open().await.unwrap(); + let mut plan = cleanup_plan_for_files(&dataset, Vec::new()); + plan.policy.clean_referenced_branches = true; + + let err = super::cleanup_with_plan(&dataset, plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "cleanup plans do not support clean_referenced_branches=true" + ); + } + + #[tokio::test] + async fn cleanup_with_plan_rejects_toctou_commit_with_stale_handle() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + MockClock::set_system_time(TimeDelta::try_seconds(1).unwrap().to_std().unwrap()); + fixture.overwrite_some_data().await.unwrap(); + + let dataset = fixture.open().await.unwrap(); + let policy = CleanupPolicyBuilder::default() + .before_timestamp(utc_now()) + .build(); + let plan = plan_cleanup(&dataset, policy).await.unwrap(); + assert_eq!(plan.read_version, 2); + + fixture.append_some_data().await.unwrap(); + assert_eq!(dataset.version().version, 2); + + let err = super::cleanup_with_plan(&dataset, plan).await.unwrap_err(); + assert_contains!( + err.to_string(), + "cleanup plan was created from version 2, but latest dataset version is 3" + ); + } + + #[tokio::test] + async fn internal_cleanup_plan_allows_toctou_commit_before_delete() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + MockClock::set_system_time(TimeDelta::try_seconds(1).unwrap().to_std().unwrap()); + fixture.overwrite_some_data().await.unwrap(); + + let dataset = fixture.open().await.unwrap(); + let policy = CleanupPolicyBuilder::default() + .before_timestamp(utc_now()) + .build(); + let plan = plan_cleanup(&dataset, policy).await.unwrap(); + assert_eq!(plan.read_version, 2); + + fixture.append_some_data().await.unwrap(); + + let cleanup = CleanupTask::new(&dataset, plan.policy.clone()); + let plan = CleanupTask::parse_plan_paths(plan).unwrap(); + let removed = cleanup.execute_plan_unchecked(plan).await.unwrap(); + + assert_eq!(removed.old_versions, 1); + assert_eq!(removed.data_files_removed, 1); + } + + #[tokio::test] + async fn plan_cleanup_uses_latest_version_with_stale_handle() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + MockClock::set_system_time(TimeDelta::try_seconds(1).unwrap().to_std().unwrap()); + fixture.overwrite_some_data().await.unwrap(); + + let dataset = fixture.open().await.unwrap(); + assert_eq!(dataset.version().version, 2); + + fixture.append_some_data().await.unwrap(); + assert_eq!(dataset.version().version, 2); + + let policy = CleanupPolicyBuilder::default() + .before_timestamp(utc_now()) + .build(); + let plan = plan_cleanup(&dataset, policy).await.unwrap(); + + assert_eq!(plan.read_version, 3); + fixture.cleanup_with_plan(plan).await.unwrap(); + } + + #[tokio::test] + async fn plan_cleanup_records_manifest_version_seen_during_planning() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + MockClock::set_system_time(TimeDelta::try_seconds(1).unwrap().to_std().unwrap()); + fixture.overwrite_some_data().await.unwrap(); + + let dataset = fixture.open().await.unwrap(); + assert_eq!(dataset.version().version, 2); + + fixture.append_some_data().await.unwrap(); + + let policy = CleanupPolicyBuilder::default() + .before_timestamp(utc_now()) + .build(); + let cleanup = CleanupTask::new(&dataset, policy); + let plan = cleanup + .plan_with_referenced_branches_at_version(Vec::new(), 2) + .await + .unwrap(); + + assert_eq!(plan.read_version, 3); + fixture.cleanup_with_plan(plan).await.unwrap(); + } + + #[tokio::test] + async fn process_manifests_rejects_listing_missing_latest_version() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + fixture.overwrite_some_data().await.unwrap(); + let dataset = fixture.open().await.unwrap(); + + let cleanup = CleanupTask::new(&dataset, CleanupPolicy::default()); + let err = cleanup + .process_manifests(&HashSet::new(), dataset.version().version + 1) + .await + .unwrap_err(); + + assert_contains!( + err.to_string(), + "manifest listing did not include latest version 3; highest listed version was 2" + ); + } + + #[tokio::test] + async fn cleanup_with_plan_rejects_wrong_dataset() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + let dataset = fixture.open().await.unwrap(); + let plan = cleanup_plan_for_files(&dataset, Vec::new()); + + let other_fixture = MockDatasetFixture::try_new().unwrap(); + other_fixture.create_some_data().await.unwrap(); + let other_dataset = other_fixture.open().await.unwrap(); + + let err = super::cleanup_with_plan(&other_dataset, plan) + .await + .unwrap_err(); + assert_contains!(err.to_string(), "cleanup plan was created for dataset"); + } + #[tokio::test] async fn cleanup_blob_v2_sidecar_files() { let fixture = MockDatasetFixture::try_new().unwrap();