diff --git a/lib/saluki-components/src/common/datadog/io.rs b/lib/saluki-components/src/common/datadog/io.rs index 61eaf11b19..5ecc3ca209 100644 --- a/lib/saluki-components/src/common/datadog/io.rs +++ b/lib/saluki-components/src/common/datadog/io.rs @@ -20,7 +20,7 @@ use saluki_io::net::{ client::http::{into_client_body, HttpClient, HttpClientBuilder}, util::{ middleware::{RetryCircuitBreakerError, RetryCircuitBreakerLayer}, - retry::{DiskUsageRetrieverImpl, PushResult, RetryQueue, Retryable}, + retry::{DiskUsageRetrieverImpl, PersistedQueueArgs, PushResult, RetryQueue, Retryable}, }, }; use saluki_metrics::MetricsBuilder; @@ -393,14 +393,15 @@ async fn run_endpoint_io_loop( // If the storage size is set, enable disk persistence for the retry queue. if config.retry().storage_max_size_bytes() > 0 { retry_queue = retry_queue - .with_disk_persistence( - PathBuf::from(config.retry().storage_path()), - config.retry().storage_max_size_bytes(), - config.retry().storage_max_disk_ratio(), - Arc::new(DiskUsageRetrieverImpl::new(PathBuf::from( + .with_disk_persistence(PersistedQueueArgs { + root_path: PathBuf::from(config.retry().storage_path()), + max_on_disk_bytes: config.retry().storage_max_size_bytes(), + storage_max_disk_ratio: config.retry().storage_max_disk_ratio(), + disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(PathBuf::from( config.retry().storage_path(), ))), - ) + max_age_days: config.retry().outdated_file_in_days(), + }) .await .unwrap_or_else(|e| { error!(endpoint_url, error = %e, "Failed to initialize disk persistence for retry queue. Transactions will not be persisted."); diff --git a/lib/saluki-components/src/common/datadog/retry.rs b/lib/saluki-components/src/common/datadog/retry.rs index fbbd9adcc5..abfe073faf 100644 --- a/lib/saluki-components/src/common/datadog/retry.rs +++ b/lib/saluki-components/src/common/datadog/retry.rs @@ -44,6 +44,10 @@ const fn default_storage_max_disk_ratio() -> f64 { 0.8 } +const fn default_outdated_file_in_days() -> u32 { + 10 +} + /// Datadog Agent-specific forwarder retry configuration. #[derive(Clone, Deserialize, Facet)] #[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))] @@ -130,6 +134,20 @@ pub struct RetryConfiguration { rename = "forwarder_storage_max_disk_ratio" )] storage_max_disk_ratio: f64, + + /// Maximum age in days for retry files on disk before they are deleted at startup. + /// + /// When disk persistence is enabled, ADP removes any `retry-*.json` files in the + /// per-queue subdirectory of `forwarder_storage_path` that are older than this many days + /// each time it starts. This prevents unbounded disk growth from stale retry data left + /// behind after long outages. + /// + /// Defaults to 10. + #[serde( + default = "default_outdated_file_in_days", + rename = "forwarder_outdated_file_in_days" + )] + outdated_file_in_days: u32, } impl RetryConfiguration { @@ -180,6 +198,11 @@ impl RetryConfiguration { self.storage_max_disk_ratio } + /// Returns the maximum age in days for retry files on disk before they are deleted at startup. + pub const fn outdated_file_in_days(&self) -> u32 { + self.outdated_file_in_days + } + /// Creates a new [`DefaultHttpRetryPolicy`] based on the forwarder configuration. /// /// If a [`GenericConfiguration`] is supplied, the policy captures it and checks whether diff --git a/lib/saluki-components/src/config_registry/datadog/forwarder.rs b/lib/saluki-components/src/config_registry/datadog/forwarder.rs index 6d7fe99767..e34379a072 100644 --- a/lib/saluki-components/src/config_registry/datadog/forwarder.rs +++ b/lib/saluki-components/src/config_registry/datadog/forwarder.rs @@ -321,4 +321,16 @@ crate::declare_annotations! { test_json: None, pipeline_affinity: PipelineAffinity::CrossCutting, }; + + /// `forwarder_outdated_file_in_days`—maximum age in days for retry files before deletion at startup. + FORWARDER_OUTDATED_FILE_IN_DAYS = SalukiAnnotation { + schema: &schema::FORWARDER_OUTDATED_FILE_IN_DAYS, + support_level: SupportLevel::Full, + additional_yaml_paths: &[], + env_var_override: None, + used_by: &[structs::FORWARDER_CONFIGURATION], + value_type_override: Some(ValueType::Integer), + test_json: None, + pipeline_affinity: PipelineAffinity::CrossCutting, + }; } diff --git a/lib/saluki-components/src/config_registry/datadog/unsupported.rs b/lib/saluki-components/src/config_registry/datadog/unsupported.rs index 256f3c7108..54c9cc9730 100644 --- a/lib/saluki-components/src/config_registry/datadog/unsupported.rs +++ b/lib/saluki-components/src/config_registry/datadog/unsupported.rs @@ -149,20 +149,6 @@ crate::declare_annotations! { pipeline_affinity: PipelineAffinity::CrossCutting, }; - /// `forwarder_outdated_file_in_days` - retry file retention period. - FORWARDER_OUTDATED_FILE_IN_DAYS = SalukiAnnotation { - schema: &schema::FORWARDER_OUTDATED_FILE_IN_DAYS, - // Retry file retention not implemented. #1360 - support_level: SupportLevel::Incompatible(Severity::Medium), - additional_yaml_paths: &[], - env_var_override: None, - used_by: &[], - value_type_override: None, - test_json: None, - // The forwarder is potentially used by any pipeline. - pipeline_affinity: PipelineAffinity::CrossCutting, - }; - /// `forwarder_retry_queue_capacity_time_interval_sec` - retry queue time-based capacity. FORWARDER_RETRY_QUEUE_CAPACITY_TIME_INTERVAL_SEC = SalukiAnnotation { schema: &schema::FORWARDER_RETRY_QUEUE_CAPACITY_TIME_INTERVAL_SEC, diff --git a/lib/saluki-io/src/net/util/retry/mod.rs b/lib/saluki-io/src/net/util/retry/mod.rs index d8bf1b6a92..cfa226bdf0 100644 --- a/lib/saluki-io/src/net/util/retry/mod.rs +++ b/lib/saluki-io/src/net/util/retry/mod.rs @@ -11,7 +11,9 @@ mod policy; pub use self::policy::{NoopRetryPolicy, RollingExponentialBackoffRetryPolicy}; mod queue; -pub use self::queue::{DiskUsageRetrieverImpl, EventContainer, PushResult, RetryQueue, Retryable}; +pub use self::queue::{ + DiskUsageRetriever, DiskUsageRetrieverImpl, EventContainer, PersistedQueueArgs, PushResult, RetryQueue, Retryable, +}; /// A batteries-included retry policy suitable for HTTP-based clients. pub type DefaultHttpRetryPolicy = diff --git a/lib/saluki-io/src/net/util/retry/queue/mod.rs b/lib/saluki-io/src/net/util/retry/queue/mod.rs index 9226453f6b..a25a8fc758 100644 --- a/lib/saluki-io/src/net/util/retry/queue/mod.rs +++ b/lib/saluki-io/src/net/util/retry/queue/mod.rs @@ -1,12 +1,12 @@ -use std::{collections::VecDeque, path::PathBuf, sync::Arc}; +use std::collections::VecDeque; use saluki_error::{generic_error, GenericError}; use serde::{de::DeserializeOwned, Serialize}; -use tracing::debug; +use tracing::{debug, info, warn}; mod persisted; -pub use self::persisted::DiskUsageRetrieverImpl; -use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverWrapper, PersistedQueue}; +use self::persisted::PersistedQueue; +pub use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverImpl, PersistedQueueArgs}; /// A container that holds events. /// @@ -117,29 +117,27 @@ where /// provides priority to the most recent entries added to the queue, but allows for bursting over the configured /// in-memory size limit without having to immediately discard entries. /// - /// Files are stored in a subdirectory, with the same name as the given queue name, within the given `root_path`. + /// Files are stored in a subdirectory, with the same name as the given queue name, within `args.root_path`. /// /// # Errors /// /// If there is an error initializing the disk persistence layer, an error is returned. - pub async fn with_disk_persistence( - mut self, root_path: PathBuf, max_disk_size_bytes: u64, storage_max_disk_ratio: f64, - disk_usage_retriever: Arc, - ) -> Result { + pub async fn with_disk_persistence(mut self, mut args: PersistedQueueArgs) -> Result { // Make sure the root storage path is non-empty, as otherwise we can't generate a valid path // for the persisted entries in this retry queue. - if root_path.as_os_str().is_empty() { + if args.root_path.as_os_str().is_empty() { return Err(generic_error!("Storage path cannot be empty.")); } - let queue_root_path = root_path.join(&self.queue_name); - let persisted_pending = PersistedQueue::from_root_path( - queue_root_path, - max_disk_size_bytes, - storage_max_disk_ratio, - DiskUsageRetrieverWrapper::new(disk_usage_retriever), - ) - .await?; + args.root_path = args.root_path.join(&self.queue_name); + let mut persisted_pending = PersistedQueue::from_root_path(args).await?; + match persisted_pending.remove_stale_files().await { + Ok(removed) if removed > 0 => { + info!(count = removed, "Removed outdated retry files from disk."); + } + Ok(_) => {} + Err(e) => warn!(error = %e, "Failed to remove stale retry files."), + } self.persisted_pending = Some(persisted_pending); Ok(self) } @@ -281,7 +279,7 @@ where #[cfg(test)] mod tests { - use std::path::Path; + use std::{path::Path, sync::Arc}; use rand::RngExt as _; use rand_distr::Alphanumeric; @@ -429,12 +427,13 @@ mod tests { assert_eq!(0, file_count_recursive(&root_path)); let mut retry_queue = RetryQueue::::new("test".to_string(), u64::MAX) - .with_disk_persistence( - root_path.clone(), - u64::MAX, - 1.0, - Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())), - ) + .with_disk_persistence(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: u64::MAX, + storage_max_disk_ratio: 1.0, + disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())), + max_age_days: 10, + }) .await .expect("should not fail to create retry queue with disk persistence"); diff --git a/lib/saluki-io/src/net/util/retry/queue/persisted.rs b/lib/saluki-io/src/net/util/retry/queue/persisted.rs index bc048082d0..59a36ae41c 100644 --- a/lib/saluki-io/src/net/util/retry/queue/persisted.rs +++ b/lib/saluki-io/src/net/util/retry/queue/persisted.rs @@ -85,6 +85,23 @@ impl DiskUsageRetrieverWrapper { } } +/// Arguments for constructing a persisted retry queue. +pub struct PersistedQueueArgs { + /// Root path under which the queue directory is created. + pub root_path: PathBuf, + /// Maximum total bytes the queue may occupy on disk. + pub max_on_disk_bytes: u64, + /// Maximum fraction of the disk that may be used before writes stop. + pub storage_max_disk_ratio: f64, + /// Provider for total- and available-disk-space queries. + pub disk_usage_retriever: Arc, + /// Maximum age of retry files in days; files older than this are removed on startup. + /// + /// Setting this to `0` removes all retry files (cutoff = now), matching the behavior of the + /// core Agent's `FileRemovalPolicy` with `outdatedFileDayCount = 0`. + pub max_age_days: u32, +} + pub struct PersistedQueue { root_path: PathBuf, entries: Vec, @@ -92,6 +109,7 @@ pub struct PersistedQueue { max_on_disk_bytes: u64, storage_max_disk_ratio: f64, disk_usage_retriever: DiskUsageRetrieverWrapper, + max_age_days: u32, entries_dropped: u64, _entry: PhantomData, } @@ -100,19 +118,26 @@ impl PersistedQueue where T: EventContainer + DeserializeOwned + Serialize, { - /// Creates a new `PersistedQueue` instance from the given root path and maximum size. + /// Creates a new `PersistedQueue` instance from the given arguments. /// /// The root path is created if it doesn't already exist, and is scanned for existing persisted entries. Entries /// are removed (oldest first) until the total size of all scanned entries is within the given maximum size. /// + /// To remove stale retry files on startup, call [`remove_stale_files`][Self::remove_stale_files] after construction. + /// /// # Errors /// /// If there is an error creating the root directory, or scanning it for existing entries, or deleting entries to /// shrink the directory to fit the given maximum size, an error is returned. - pub async fn from_root_path( - root_path: PathBuf, max_on_disk_bytes: u64, storage_max_disk_ratio: f64, - disk_usage_retriever: DiskUsageRetrieverWrapper, - ) -> Result { + pub async fn from_root_path(args: PersistedQueueArgs) -> Result { + let PersistedQueueArgs { + root_path, + max_on_disk_bytes, + storage_max_disk_ratio, + disk_usage_retriever, + max_age_days, + } = args; + // Make sure the directory exists first. create_directory_recursive(root_path.clone()) .await @@ -124,7 +149,8 @@ where total_on_disk_bytes: 0, max_on_disk_bytes, storage_max_disk_ratio, - disk_usage_retriever, + disk_usage_retriever: DiskUsageRetrieverWrapper::new(disk_usage_retriever), + max_age_days, entries_dropped: 0, _entry: PhantomData, }; @@ -155,6 +181,19 @@ where std::mem::take(&mut self.entries_dropped) } + /// Removes retry files older than `max_age_days` (from [`PersistedQueueArgs`]) from the queue directory and + /// reloads entry state. + /// + /// # Errors + /// + /// Returns an error if the queue directory cannot be opened or scanned. Individual file removal failures are + /// logged as warnings and do not stop the cleanup. + pub async fn remove_stale_files(&mut self) -> Result { + let removed = remove_outdated_retry_files(&self.root_path, self.max_age_days).await?; + self.refresh_entry_state().await.map_err(GenericError::from)?; + Ok(removed) + } + /// Enqueues an entry and persists it to disk. /// /// # Errors @@ -454,6 +493,62 @@ async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> { .error_context("Failed to spawn directory creation blocking task.")? } +/// Deletes files in `queue_path` whose filename-embedded creation timestamp is older than +/// `max_age_days`. Does nothing if the directory does not exist. +/// +/// Setting `max_age_days` to `0` deletes all retry files (cutoff = now), matching the behavior +/// of the core Agent's `FileRemovalPolicy` with `outdatedFileDayCount = 0`. +async fn remove_outdated_retry_files(queue_path: &Path, max_age_days: u32) -> Result { + let mut dir = match tokio::fs::read_dir(queue_path).await { + Ok(d) => d, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0), + Err(e) => { + return Err(e).with_error_context(|| { + format!( + "Failed to open retry queue directory '{}' for age-based cleanup.", + queue_path.display() + ) + }); + } + }; + let now_ns = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() // clock before epoch: treat cutoff as 0, skipping all deletions + .as_nanos(); + let cutoff_ns = now_ns.saturating_sub(max_age_days as u128 * 24 * 3600 * 1_000_000_000); + let mut removed = 0u32; + loop { + let entry = match dir.next_entry().await { + Ok(Some(e)) => e, + Ok(None) => break, + Err(e) => { + return Err(e).with_error_context(|| "Error reading retry queue directory during age-based cleanup."); + } + }; + let file_ts = match decode_timestamped_filename(&entry.path()) { + Some(ts) => ts, + None => continue, + }; + if file_ts < cutoff_ns { + let name_str = entry.file_name(); + let name = name_str.to_string_lossy(); + match tokio::fs::remove_file(entry.path()).await { + Ok(()) => { + debug!(file = %name, "Removed outdated retry file."); + removed += 1; + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + debug!(file = %name, "Retry file already removed by concurrent cleanup."); + } + Err(e) => { + warn!(file = %name, error = %e, "Failed to remove outdated retry file."); + } + } + } + } + Ok(removed) +} + #[cfg(test)] mod tests { use rand::RngExt as _; @@ -513,12 +608,13 @@ mod tests { let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory"); let root_path = temp_dir.path().to_path_buf(); - let mut persisted_queue = PersistedQueue::::from_root_path( - root_path.clone(), - 1024, - 0.8, - DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))), - ) + let mut persisted_queue = PersistedQueue::::from_root_path(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: 1024, + storage_max_disk_ratio: 0.8, + disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())), + max_age_days: 10, + }) .await .expect("should not fail to create persisted queue"); @@ -552,12 +648,13 @@ mod tests { let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory"); let root_path = temp_dir.path().to_path_buf(); - let mut persisted_queue = PersistedQueue::::from_root_path( - root_path.clone(), - 1, - 0.8, - DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))), - ) + let mut persisted_queue = PersistedQueue::::from_root_path(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: 1, + storage_max_disk_ratio: 0.8, + disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())), + max_age_days: 10, + }) .await .expect("should not fail to create persisted queue"); @@ -582,12 +679,13 @@ mod tests { let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory"); let root_path = temp_dir.path().to_path_buf(); - let mut persisted_queue = PersistedQueue::::from_root_path( - root_path.clone(), - 32, - 0.8, - DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))), - ) + let mut persisted_queue = PersistedQueue::::from_root_path(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: 32, + storage_max_disk_ratio: 0.8, + disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())), + max_age_days: 10, + }) .await .expect("should not fail to create persisted queue"); @@ -631,12 +729,13 @@ mod tests { let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory"); let root_path = temp_dir.path().to_path_buf(); - let mut persisted_queue = PersistedQueue::::from_root_path( - root_path.clone(), - 80, - 0.35, - DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})), - ) + let mut persisted_queue = PersistedQueue::::from_root_path(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: 80, + storage_max_disk_ratio: 0.35, + disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}), + max_age_days: 10, + }) .await .expect("should not fail to create persisted queue"); @@ -689,12 +788,13 @@ mod tests { let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory"); let root_path = temp_dir.path().to_path_buf(); - let mut persisted_queue = PersistedQueue::::from_root_path( - root_path.clone(), - 1024, - 0.8, - DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})), - ) + let mut persisted_queue = PersistedQueue::::from_root_path(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: 1024, + storage_max_disk_ratio: 0.8, + disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}), + max_age_days: 10, + }) .await .expect("should not fail to create persisted queue"); @@ -737,12 +837,13 @@ mod tests { let root_path = temp_dir.path().to_path_buf(); // Use MockDiskUsageRetriever to avoid disk space ratio causing eviction during push. - let mut persisted_queue = PersistedQueue::::from_root_path( - root_path.clone(), - 1024, - 0.8, - DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})), - ) + let mut persisted_queue = PersistedQueue::::from_root_path(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: 1024, + storage_max_disk_ratio: 0.8, + disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}), + max_age_days: 10, + }) .await .expect("should not fail to create persisted queue"); @@ -775,12 +876,13 @@ mod tests { let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory"); let root_path = temp_dir.path().to_path_buf(); - let mut persisted_queue = PersistedQueue::::from_root_path( - root_path.clone(), - 1024, - 0.8, - DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})), - ) + let mut persisted_queue = PersistedQueue::::from_root_path(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: 1024, + storage_max_disk_ratio: 0.8, + disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}), + max_age_days: 10, + }) .await .expect("should not fail to create persisted queue"); @@ -804,12 +906,13 @@ mod tests { let root_path = temp_dir.path().to_path_buf(); // Queue sized to hold only one entry. - let mut persisted_queue = PersistedQueue::::from_root_path( - root_path.clone(), - 32, - 0.8, - DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})), - ) + let mut persisted_queue = PersistedQueue::::from_root_path(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: 32, + storage_max_disk_ratio: 0.8, + disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}), + max_age_days: 10, + }) .await .expect("should not fail to create persisted queue"); @@ -840,4 +943,83 @@ mod tests { assert_eq!(data, actual); assert_eq!(0, files_in_dir(&root_path).await); } + + #[tokio::test] + async fn persisted_queue_removes_outdated_files_on_initialization() { + let data = FakeData::random(); + + let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory"); + let root_path = temp_dir.path().to_path_buf(); + + // Pre-seed a year-2000 retry file containing valid data that would be loaded as an entry + // if it were not cleaned up first. + let stale_content = serde_json::to_vec(&data).unwrap(); + tokio::fs::write( + root_path.join("retry-20000101000000000000000-100000000.json"), + &stale_content, + ) + .await + .unwrap(); + + assert_eq!(1, files_in_dir(&root_path).await); + + // Initialize the queue and remove stale files with a 10-day age limit. + let mut queue = PersistedQueue::::from_root_path(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: 1024 * 1024, + storage_max_disk_ratio: 0.8, + disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())), + max_age_days: 10, + }) + .await + .expect("should not fail to create persisted queue"); + queue + .remove_stale_files() + .await + .expect("should not fail to remove stale files"); + + assert_eq!(0, files_in_dir(&root_path).await); + assert!(queue.is_empty()); + } + + #[tokio::test] + async fn persisted_queue_zero_age_removes_all_retry_files_on_initialization() { + // max_age_days=0 sets cutoff=now, matching the core Agent's FileRemovalPolicy behavior + // with outdatedFileDayCount=0 — all retry files are removed on startup. + let data = FakeData::random(); + + let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory"); + let root_path = temp_dir.path().to_path_buf(); + + // Seed with a freshly-written entry via a normal queue. + let mut seeding_queue = PersistedQueue::::from_root_path(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: 1024 * 1024, + storage_max_disk_ratio: 0.8, + disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())), + max_age_days: 10, + }) + .await + .expect("should not fail to create persisted queue"); + let _ = seeding_queue.push(data).await.expect("should not fail to push data"); + assert_eq!(1, files_in_dir(&root_path).await); + + // Re-open and remove stale files with max_age_days=0: the just-written file must also be deleted. + let mut queue = PersistedQueue::::from_root_path(PersistedQueueArgs { + root_path: root_path.clone(), + max_on_disk_bytes: 1024 * 1024, + storage_max_disk_ratio: 0.8, + disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())), + max_age_days: 0, + }) + .await + .expect("should not fail to create persisted queue"); + queue + .remove_stale_files() + .await + .expect("should not fail to remove stale files"); + + assert_eq!(0, files_in_dir(&root_path).await); + assert!(queue.is_empty()); + } }