Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/agent-data-plane/configuration/dogstatsd.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tracking.
| `dogstatsd_windows_pipe_security_descriptor` | Windows named pipe ACL descriptor | [#1466] |
| `forwarder_http_protocol` | HTTP version (auto/http1) | [#1361] |
| `forwarder_outdated_file_in_days` | Retry file retention (days) | [#1360] |
| `log_format_rfc3339` | Use RFC3339 timestamp format | [#1373] |
| `serializer_experimental_use_v3_api.*` | V3 metrics API migration flags | [#1468] |
| `sslkeylogfile` | TLS key log file path | [#1372] |
| `tls_handshake_timeout` | HTTP TLS handshake timeout | [#178] |
Expand Down
1 change: 1 addition & 0 deletions lib/saluki-components/src/common/datadog/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ async fn run_endpoint_io_loop<B>(
Arc::new(DiskUsageRetrieverImpl::new(PathBuf::from(
config.retry().storage_path(),
))),
config.retry().outdated_file_in_days(),
)
.await
.unwrap_or_else(|e| {
Expand Down
23 changes: 23 additions & 0 deletions lib/saluki-components/src/common/datadog/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ const fn default_storage_max_disk_ratio() -> f64 {
0.8
}

/// Serde default for `forwarder_outdated_file_in_days`: the retention period, in days, applied
/// to on-disk retry files when the setting is not provided.
const fn default_outdated_file_in_days() -> u32 {
    const DEFAULT_RETENTION_DAYS: u32 = 10;
    DEFAULT_RETENTION_DAYS
}

/// Datadog Agent-specific forwarder retry configuration.
#[derive(Clone, Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
Expand Down Expand Up @@ -130,6 +134,20 @@ pub struct RetryConfiguration {
rename = "forwarder_storage_max_disk_ratio"
)]
storage_max_disk_ratio: f64,

/// Maximum age in days for retry files on disk before they are deleted at startup.
///
/// When disk persistence is enabled, ADP removes any `retry-*.json` files in the
/// per-queue subdirectory of `forwarder_storage_path` that are older than this many days
/// each time it starts. This prevents unbounded disk growth from stale retry data left
/// behind after long outages.
///
/// Defaults to 10.
#[serde(
default = "default_outdated_file_in_days",
rename = "forwarder_outdated_file_in_days"
)]
outdated_file_in_days: u32,
}

impl RetryConfiguration {
Expand Down Expand Up @@ -180,6 +198,11 @@ impl RetryConfiguration {
self.storage_max_disk_ratio
}

/// Returns the maximum age, in days, that a retry file on disk may reach before it is deleted
/// at startup.
///
/// The value is read from the `forwarder_outdated_file_in_days` setting and falls back to 10
/// when that setting is absent.
pub const fn outdated_file_in_days(&self) -> u32 {
self.outdated_file_in_days
}

/// Creates a new [`DefaultHttpRetryPolicy`] based on the forwarder configuration.
///
/// If a [`GenericConfiguration`] is supplied, the policy captures it and checks whether
Expand Down
12 changes: 12 additions & 0 deletions lib/saluki-components/src/config_registry/datadog/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,4 +321,16 @@ crate::declare_annotations! {
test_json: None,
pipeline_affinity: PipelineAffinity::CrossCutting,
};

/// `forwarder_outdated_file_in_days`—maximum age in days for retry files before deletion at startup.
FORWARDER_OUTDATED_FILE_IN_DAYS = SalukiAnnotation {
schema: &schema::FORWARDER_OUTDATED_FILE_IN_DAYS,
support_level: SupportLevel::Full,
additional_yaml_paths: &[],
env_var_override: None,
used_by: &[structs::FORWARDER_CONFIGURATION],
value_type_override: Some(ValueType::Integer),
test_json: None,
pipeline_affinity: PipelineAffinity::CrossCutting,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,6 @@ crate::declare_annotations! {
pipeline_affinity: PipelineAffinity::CrossCutting,
};

/// `forwarder_outdated_file_in_days` - retry file retention period.
FORWARDER_OUTDATED_FILE_IN_DAYS = SalukiAnnotation {
schema: &schema::FORWARDER_OUTDATED_FILE_IN_DAYS,
// Retry file retention not implemented. #1360
support_level: SupportLevel::Incompatible(Severity::Medium),
additional_yaml_paths: &[],
env_var_override: None,
used_by: &[],
value_type_override: None,
test_json: None,
// The forwarder is potentially used by any pipeline.
pipeline_affinity: PipelineAffinity::CrossCutting,
};

/// `forwarder_retry_queue_capacity_time_interval_sec` - retry queue time-based capacity.
FORWARDER_RETRY_QUEUE_CAPACITY_TIME_INTERVAL_SEC = SalukiAnnotation {
schema: &schema::FORWARDER_RETRY_QUEUE_CAPACITY_TIME_INTERVAL_SEC,
Expand Down
4 changes: 3 additions & 1 deletion lib/saluki-io/src/net/util/retry/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ where
/// If there is an error initializing the disk persistence layer, an error is returned.
pub async fn with_disk_persistence(
mut self, root_path: PathBuf, max_disk_size_bytes: u64, storage_max_disk_ratio: f64,
disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>,
disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>, max_age_days: u32,
) -> Result<Self, GenericError> {
// Make sure the root storage path is non-empty, as otherwise we can't generate a valid path
// for the persisted entries in this retry queue.
Expand All @@ -138,6 +138,7 @@ where
max_disk_size_bytes,
storage_max_disk_ratio,
DiskUsageRetrieverWrapper::new(disk_usage_retriever),
max_age_days,
)
.await?;
self.persisted_pending = Some(persisted_pending);
Expand Down Expand Up @@ -434,6 +435,7 @@ mod tests {
u64::MAX,
1.0,
Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
10,
)
.await
.expect("should not fail to create retry queue with disk persistence");
Expand Down
139 changes: 138 additions & 1 deletion lib/saluki-io/src/net/util/retry/queue/persisted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,18 @@ where
/// shrink the directory to fit the given maximum size, an error is returned.
pub async fn from_root_path(
root_path: PathBuf, max_on_disk_bytes: u64, storage_max_disk_ratio: f64,
disk_usage_retriever: DiskUsageRetrieverWrapper,
disk_usage_retriever: DiskUsageRetrieverWrapper, max_age_days: u32,
) -> Result<Self, GenericError> {
// Make sure the directory exists first.
create_directory_recursive(root_path.clone())
.await
.with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;

// Remove stale retry files before loading state. This must run after directory creation
// but before refresh_entry_state so the queue doesn't load files that are about to be
// removed.
remove_outdated_retry_files(&root_path, max_age_days).await;

let mut persisted_requests = Self {
root_path: root_path.clone(),
entries: Vec::new(),
Expand Down Expand Up @@ -454,6 +459,58 @@ async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
.error_context("Failed to spawn directory creation blocking task.")?
}

/// Deletes files in `queue_path` whose filename-embedded creation timestamp is older than
/// `max_age_days`. Does nothing if the directory does not exist.
async fn remove_outdated_retry_files(queue_path: &Path, max_age_days: u32) {
let mut dir = match tokio::fs::read_dir(queue_path).await {
Ok(d) => d,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return,
Err(e) => {
warn!(path = %queue_path.display(), error = %e, "Failed to open retry queue directory for age-based cleanup.");
return;
}
};
let now_ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default() // clock before epoch: treat cutoff as 0, skipping all deletions
.as_nanos();
let cutoff_ns = now_ns.saturating_sub(max_age_days as u128 * 24 * 3600 * 1_000_000_000);
let mut removed = 0u32;
loop {
let entry = match dir.next_entry().await {
Ok(Some(e)) => e,
Ok(None) => break,
Err(e) => {
warn!(error = %e, "Error reading retry queue directory during age-based cleanup.");
break;
}
};
let file_ts = match decode_timestamped_filename(&entry.path()) {
Some(ts) => ts,
None => continue,
};
if file_ts < cutoff_ns {
let name_str = entry.file_name();
let name = name_str.to_string_lossy();
match tokio::fs::remove_file(entry.path()).await {
Ok(()) => {
debug!(file = %name, "Removed outdated retry file.");
removed += 1;
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
debug!(file = %name, "Retry file already removed by concurrent cleanup.");
}
Err(e) => {
warn!(file = %name, error = %e, "Failed to remove outdated retry file.");
}
}
}
}
if removed > 0 {
info!(count = removed, max_age_days, "Removed outdated retry files from disk.");
}
}

#[cfg(test)]
mod tests {
use rand::RngExt as _;
Expand Down Expand Up @@ -518,6 +575,7 @@ mod tests {
1024,
0.8,
DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
10,
)
.await
.expect("should not fail to create persisted queue");
Expand Down Expand Up @@ -557,6 +615,7 @@ mod tests {
1,
0.8,
DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
10,
)
.await
.expect("should not fail to create persisted queue");
Expand Down Expand Up @@ -587,6 +646,7 @@ mod tests {
32,
0.8,
DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
10,
)
.await
.expect("should not fail to create persisted queue");
Expand Down Expand Up @@ -636,6 +696,7 @@ mod tests {
80,
0.35,
DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
10,
)
.await
.expect("should not fail to create persisted queue");
Expand Down Expand Up @@ -694,6 +755,7 @@ mod tests {
1024,
0.8,
DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
10,
)
.await
.expect("should not fail to create persisted queue");
Expand Down Expand Up @@ -742,6 +804,7 @@ mod tests {
1024,
0.8,
DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
10,
)
.await
.expect("should not fail to create persisted queue");
Expand Down Expand Up @@ -780,6 +843,7 @@ mod tests {
1024,
0.8,
DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
10,
)
.await
.expect("should not fail to create persisted queue");
Expand Down Expand Up @@ -809,6 +873,7 @@ mod tests {
32,
0.8,
DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
10,
)
.await
.expect("should not fail to create persisted queue");
Expand Down Expand Up @@ -840,4 +905,76 @@ mod tests {
assert_eq!(data, actual);
assert_eq!(0, files_in_dir(&root_path).await);
}

#[tokio::test]
async fn persisted_queue_removes_outdated_files_on_initialization() {
    let payload = FakeData::random();

    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    // Plant a retry file whose filename carries a year-2000 timestamp. Its contents are a
    // valid serialized entry, so the queue would pick it up if cleanup did not run first.
    let stale_path = root_path.join("retry-20000101000000000000000-100000000.json");
    let stale_bytes = serde_json::to_vec(&payload).unwrap();
    tokio::fs::write(&stale_path, &stale_bytes).await.unwrap();

    assert_eq!(1, files_in_dir(&root_path).await);

    // Open the queue with a 10-day age limit: the planted file is far older than that and
    // has to be gone before any entries are loaded.
    let queue = PersistedQueue::<FakeData>::from_root_path(
        root_path.clone(),
        1024 * 1024,
        0.8,
        DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        10,
    )
    .await
    .expect("should not fail to create persisted queue");

    // Nothing left on disk, and nothing loaded into the queue.
    assert_eq!(0, files_in_dir(&root_path).await);
    assert!(queue.is_empty());
}

#[tokio::test]
async fn persisted_queue_zero_age_removes_all_retry_files_on_initialization() {
    // With max_age_days=0 the cutoff equals "now", so every existing retry file counts as
    // outdated and is removed on startup. This mirrors the core Agent's FileRemovalPolicy
    // behavior with outdatedFileDayCount=0.
    let payload = FakeData::random();

    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    // First pass: a regular queue (10-day limit) writes one fresh entry to disk.
    let mut seeding_queue = PersistedQueue::<FakeData>::from_root_path(
        root_path.clone(),
        1024 * 1024,
        0.8,
        DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        10,
    )
    .await
    .expect("should not fail to create persisted queue");
    let _ = seeding_queue
        .push(payload)
        .await
        .expect("should not fail to push data");
    assert_eq!(1, files_in_dir(&root_path).await);

    // Second pass: reopening with a zero-day limit must purge even the file written above.
    let reopened = PersistedQueue::<FakeData>::from_root_path(
        root_path.clone(),
        1024 * 1024,
        0.8,
        DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        0,
    )
    .await
    .expect("should not fail to create persisted queue");

    assert_eq!(0, files_in_dir(&root_path).await);
    assert!(reopened.is_empty());
}
}
Loading