feat(io): Retry transactions if secrets are enabled#1595
Conversation
…ssifier Introduce `StatusCodeRetryPredicate` (`Arc<dyn Fn() -> bool + Send + Sync>`) as a per-status-code retriability override for `StandardHttpClassifier`. The predicate is re-evaluated on every classification call, so callers can back it with live (runtime-mutable) state without rebuilding the policy. - Convert `StandardHttpClassifier` from a unit struct to one holding a `HashMap<StatusCode, StatusCodeRetryPredicate>`. - Add `with_status_code_predicate`, `set_status_code_predicate`, and `remove_status_code_predicate` to manage overrides. - Add `DefaultHttpRetryPolicy::with_backoff_and_classifier` and delegate the existing `with_backoff` constructor to it. - Re-export `StatusCodeRetryPredicate` from the crate root. - Cover all new behaviour with unit tests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Mark Kirichenko <mark.kirichenko@datadoghq.com>
When the Core Agent refreshes an API key via secrets management, in-flight requests can transiently receive a 403 Forbidden before the key propagates. Previously the forwarder treated every 403 as a permanent client error and dropped the request. This commit wires a live-config-backed `StatusCodeRetryPredicate` for 403 into the forwarder's retry policy: - Add `is_secrets_in_use(config: &GenericConfiguration) -> bool`, which checks four runtime config keys (`secret_refresh_on_api_key_failure_interval`, `secret_refresh_interval`, `secret_backend_type`, `secret_backend_command`) to decide whether secrets management is active. - Update `RetryConfiguration::to_default_http_retry_policy` to accept an `Option<GenericConfiguration>`. When present, a predicate backed by `is_secrets_in_use` is registered for 403, so the decision is re-evaluated against live config on every response. - Thread `live_config: Option<GenericConfiguration>` from `TransactionForwarder` through `run_io_loop` and `run_endpoint_io_loop` down to the `RetryCircuitBreakerLayer` call site. - Add unit tests for `is_secrets_in_use`, the policy in isolation, and a dynamic-config round-trip test that proves the predicate reflects runtime updates to the same policy instance. - Add end-to-end integration tests in `io.rs` using a real TCP server to verify that 403 is not retried without secrets and is retried with secrets. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Mark Kirichenko <mark.kirichenko@datadoghq.com>
Regression Detector (Agent Data Plane)Regression Detector ResultsRun ID: 7b1cf41e-acd0-4d24-8a74-a4a32e128e6a Baseline: 7c22c4d Optimization Goals: ✅ No significant changes detected
|
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | otlp_ingest_logs_5mb_throughput | ingress throughput | +0.02 | [-0.10, +0.13] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_cpu | % cpu utilization | -0.17 | [-4.86, +4.52] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_memory | memory utilization | -4.51 | [-4.93, -4.08] | 1 | (metrics) (profiles) (logs) |
Fine details of change detection per experiment
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | otlp_ingest_metrics_5mb_memory | memory utilization | +2.90 | [+2.68, +3.11] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_cpu | % cpu utilization | +1.67 | [-3.85, +7.20] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_medium | memory utilization | +0.38 | [+0.21, +0.55] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_idle | memory utilization | +0.37 | [+0.34, +0.41] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_memory | memory utilization | +0.20 | [+0.04, +0.36] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_throughput | ingress throughput | +0.12 | [+0.04, +0.19] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_throughput | ingress throughput | +0.10 | [+0.03, +0.18] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_throughput | ingress throughput | +0.10 | [+0.02, +0.17] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_memory | memory utilization | +0.09 | [-0.07, +0.24] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_cpu | % cpu utilization | +0.06 | [-1.36, +1.49] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_throughput | ingress throughput | +0.02 | [-0.02, +0.06] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_throughput | ingress throughput | +0.02 | [-0.10, +0.13] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_throughput | ingress throughput | +0.01 | [-0.19, +0.20] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_memory | memory utilization | +0.00 | [-0.14, +0.14] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.05, +0.06] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.19, +0.19] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_throughput | ingress throughput | -0.00 | [-0.05, +0.05] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_low | memory utilization | -0.06 | [-0.22, +0.10] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_heavy | memory utilization | -0.11 | [-0.23, +0.01] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_memory | memory utilization | -0.15 | [-0.30, -0.00] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_transform_5mb_cpu | % cpu utilization | -0.15 | [-2.20, +1.89] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_cpu | % cpu utilization | -0.17 | [-4.86, +4.52] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_memory | memory utilization | -0.18 | [-0.42, +0.06] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_ultraheavy | memory utilization | -0.23 | [-0.36, -0.10] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_memory | memory utilization | -0.27 | [-0.42, -0.12] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_throughput | ingress throughput | -0.31 | [-0.46, -0.16] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_memory | memory utilization | -0.34 | [-0.47, -0.20] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_memory | memory utilization | -0.36 | [-0.52, -0.21] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_cpu | % cpu utilization | -0.53 | [-2.66, +1.60] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_ottl_filtering_5mb_cpu | % cpu utilization | -1.45 | [-3.73, +0.83] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_cpu | % cpu utilization | -1.55 | [-31.13, +28.03] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_cpu | % cpu utilization | -2.79 | [-8.91, +3.32] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_cpu | % cpu utilization | -3.03 | [-55.65, +49.60] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_memory | memory utilization | -4.51 | [-4.93, -4.08] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_cpu | % cpu utilization | -10.84 | [-64.39, +42.71] | 1 | (metrics) (profiles) (logs) |
Bounds Checks: ✅ Passed
| perf | experiment | bounds_check_name | replicates_passed | observed_value | links |
|---|---|---|---|---|---|
| ✅ | quality_gates_rss_dsd_heavy | memory_usage | 10/10 | 123.74MiB ≤ 140MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_low | memory_usage | 10/10 | 40.26MiB ≤ 50MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_medium | memory_usage | 10/10 | 60.44MiB ≤ 75MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_ultraheavy | memory_usage | 10/10 | 177.11MiB ≤ 200MiB | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_idle | memory_usage | 10/10 | 27.82MiB ≤ 40MiB | (metrics) (profiles) (logs) |
Explanation
Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%
Performance changes are noted in the perf column of each table:
- ✅ = significantly better comparison variant performance
- ❌ = significantly worse comparison variant performance
- ➖ = no significant change in performance
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:
-
Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
-
Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
-
Its configuration does not mark it "erratic".
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 001ef79959
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| .concurrency_limit(config.endpoint_concurrency()) | ||
| .layer(RetryCircuitBreakerLayer::new( | ||
| config.retry().to_default_http_retry_policy(), | ||
| config.retry().to_default_http_retry_policy(live_config.clone()), |
There was a problem hiding this comment.
Limit 403 retries to endpoints with refreshable keys
When additional_endpoints are configured and secrets are enabled, this installs the 403 retry override for every endpoint I/O loop. However, EndpointConfiguration::build_resolved_endpoints only attaches the live GenericConfiguration to the primary endpoint; additional endpoints are built as static ResolvedEndpoints, so their API keys cannot be refreshed by ResolvedEndpoint::api_key(). A permanent 403 from a secondary endpoint will therefore be re-enqueued and retried until queue limits are hit instead of being treated as a non-retriable auth error, consuming retry queue/disk capacity and potentially delaying or dropping other transactions.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Correct, I will follow-up on that as a part of #1540
webern
left a comment
There was a problem hiding this comment.
There might be a way to simplify if my comments are correct...
| match status { | ||
| // There's some status codes that likely indicate a fundamental misconfiguration or bug on the | ||
| // client side which won't be resolved by retrying the request. | ||
| StatusCode::BAD_REQUEST | ||
| | StatusCode::UNAUTHORIZED | ||
| | StatusCode::FORBIDDEN | ||
| | StatusCode::PAYLOAD_TOO_LARGE => false, | ||
|
|
||
| // For all other status codes, we'll only retry if they're in the client/server error range. | ||
| status => status.is_client_error() || status.is_server_error(), |
There was a problem hiding this comment.
Just a thought. The predicate idea could be more open to extensibility if it is just a functor that takes the response and returns a bool.
pub type HttpRetryPredicate<B> = Arc<dyn Fn(&http::Response<B>) -> bool + Send + Sync>;
fn default_should_retry<B>(resp: &http::Response<B>) -> bool {
match resp.status() {
StatusCode::BAD_REQUEST
| StatusCode::UNAUTHORIZED
| StatusCode::FORBIDDEN
| StatusCode::PAYLOAD_TOO_LARGE => false,
_ => resp.status().is_client_error() || resp.status().is_server_error(),
}
}
pub struct StandardHttpClassifier<B> {
predicates: Vec<HttpRetryPredicate<B>>,
}
impl<B: 'static> StandardHttpClassifier<B> {
/// Starts with the default status-code predicate installed.
pub fn new() -> Self {
Self { predicates: vec![Arc::new(default_should_retry)] }
}
/// Starts with no predicates; all responses are non-retriable until one is added.
pub fn empty() -> Self {
Self { predicates: Vec::new() }
}
/// Adds a predicate. OR semantics: retry if any predicate returns `true`.
pub fn with_predicate(mut self, predicate: HttpRetryPredicate<B>) -> Self {
self.predicates.push(predicate);
self
}
}
impl<B, Error> RetryClassifier<http::Response<B>, Error> for StandardHttpClassifier<B> {
fn should_retry(&self, response: &Result<http::Response<B>, Error>) -> bool {
match response {
Ok(resp) => self.predicates.iter().any(|p| p(resp)),
Err(_) => true,
}
}
}The secrets predicate in retry.rs becomes:
let secrets_gate: HttpRetryPredicate<_> = Arc::new(move |resp| {
resp.status() == StatusCode::FORBIDDEN && is_secrets_in_use(&config)
});
let classifier = StandardHttpClassifier::new().with_predicate(secrets_gate);This removes the HashMap<StatusCode, predicate> keying and the set_/remove_status_code_predicate
surface - predicates own their own logic and have the full response available.
| @@ -186,6 +188,7 @@ where | |||
| let Self { | |||
There was a problem hiding this comment.
Not a request to fix in this PR, but this destructure is where the too_many_arguments warnings (suppressed a few lines down) seem to originate. All these fields are owned by self, destructured here, and then passed individually through run_io_loop and run_endpoint_io_loop.
Perhaps self could be left in-tact here and then passed through as a single argument. Not sure.
| let Self { | |
| // TODO: do not destructure self as a way to fix the #[allow(clippy::too_many_arguments)] annotations | |
| let Self { |
| /// - `secret_refresh_interval` is greater than zero | ||
| /// - `secret_backend_type` is set to a non-empty string | ||
| /// - `secret_backend_command` is set to a non-empty string | ||
| pub fn is_secrets_in_use(config: &GenericConfiguration) -> bool { |
There was a problem hiding this comment.
The four keys this reads (secret_backend_command, secret_backend_type, secret_refresh_interval, secret_refresh_on_api_key_failure_interval) describe whether secrets management is configured, not whether a refresh is currently in progress. I believe these are set at startup and not expected to change at runtime (right?), so the dynamic evaluation on every 403 may not be necessary and they could be read just once.
If that's right, a bool computed at construction time in from_config would behave identically:
let retry_403_on_secrets = live_config.as_ref().map_or(false, is_secrets_in_use);That bool could then be captured directly in the predicate closure, and Option<GenericConfiguration> wouldn't need to be threaded through TransactionForwarder, run_io_loop, and run_endpoint_io_loop.
| /// Creates a new `TransactionForwarder` instance from the given configuration. | ||
| pub fn from_config<F>( | ||
| context: ComponentContext, config: ForwarderConfiguration, configuration: Option<GenericConfiguration>, | ||
| context: ComponentContext, config: ForwarderConfiguration, live_config: Option<GenericConfiguration>, |
There was a problem hiding this comment.
If the is_secrets_in_use question (retry.rs:219) resolves to a bool, this parameter goes away and the threading through run_io_loop / run_endpoint_io_loop goes with it.
Summary
Retry transient 403 Forbidden responses in the Datadog forwarder when secrets management is active, so requests in flight during an API key refresh aren't dropped.
Change Type
How did you test this PR?
StandardHttpClassifierpredicate management (with_/set_/remove_status_code_predicate) andDefaultHttpRetryPolicy::with_backoff_and_classifier.is_secrets_in_useacross the foursecret_*config keys.GenericConfigurationand confirms the same retry-policy instance flips its 403 decision.lib/saluki-components/src/common/datadog/io.rsusing a test TCP server, asserting 403 is not retried without secrets and is retried when secrets are in use.References