diff --git a/bin/agent-data-plane/src/cli/run.rs b/bin/agent-data-plane/src/cli/run.rs index fe06ab0d94b..930117c2562 100644 --- a/bin/agent-data-plane/src/cli/run.rs +++ b/bin/agent-data-plane/src/cli/run.rs @@ -167,12 +167,14 @@ pub async fn handle_run_command( let dsd_stats_config = DogStatsDStatisticsConfiguration::new(); // Create the blueprint for our primary topology. + let (blueprint, control_surfaces) = create_topology( &config, &dp_config, &env_provider, &component_registry, dsd_stats_config.clone(), + &ra_bootstrap, ) .await?; let (dsd_capture_api_handler, dsd_replay_api_handler) = match control_surfaces.dogstatsd { @@ -441,6 +443,7 @@ struct DogStatsDControlSurface { async fn create_topology( config: &GenericConfiguration, dp_config: &DataPlaneConfiguration, env_provider: &ADPEnvironmentProvider, component_registry: &ComponentRegistry, dsd_stats_config: DogStatsDStatisticsConfiguration, + ra_bootstrap: &Option, ) -> Result<(TopologyBlueprint, TopologyControlSurfaces), GenericError> { let mut blueprint = TopologyBlueprint::new("primary", component_registry); let mut control_surfaces = TopologyControlSurfaces::default(); @@ -465,8 +468,13 @@ async fn create_topology( || dp_config.service_checks_pipeline_required() || dp_config.traces_pipeline_required() { - let dd_forwarder_config = + let mut dd_forwarder_config = DatadogConfiguration::from_configuration(config).error_context("Failed to configure Datadog forwarder.")?; + + if let Some(ra_bootstrap) = ra_bootstrap { + dd_forwarder_config = dd_forwarder_config + .with_config_update_retry_predicate(ra_bootstrap.create_config_update_retry_predicate()); + } blueprint.add_forwarder("dd_out", dd_forwarder_config)?; } diff --git a/bin/agent-data-plane/src/internal/remote_agent.rs b/bin/agent-data-plane/src/internal/remote_agent.rs index 4e4da96638b..3d25c9b5dfb 100644 --- a/bin/agent-data-plane/src/internal/remote_agent.rs +++ b/bin/agent-data-plane/src/internal/remote_agent.rs @@ -1,5 +1,5 @@ use 
std::collections::HashMap; -use std::{collections::hash_map::Entry, time::Duration}; +use std::{collections::hash_map::Entry, sync::Arc, time::Duration}; use async_trait::async_trait; use chrono::{DateTime, Utc}; @@ -20,6 +20,7 @@ use saluki_core::observability::metrics::{ get_shared_metrics_state, AggregatedMetricsProcessor, Reflector, TelemetryProcessor, }; use saluki_error::{generic_error, GenericError}; +use saluki_io::net::util::retry::HttpRetryPredicate; use saluki_io::net::GrpcTargetAddress; use serde_json::{Map, Value}; use tokio::{ @@ -146,6 +147,31 @@ impl RemoteAgentBootstrap { receiver } + + /// Creates a predicate that requests Agent configuration updates when a 403 response is retried. + pub fn create_config_update_retry_predicate(&self) -> HttpRetryPredicate { + let client = self.client.clone(); + let session_id = self.session_id.clone(); + + Arc::new(move |_| { + let Some(current_session_id) = session_id.get() else { + debug!("Cannot request Datadog Agent config updates because no remote agent session ID is available."); + return true; + }; + + let client = client.clone(); + spawn_traced_named("adp-request-config-updates", async move { + match client.request_config_updates(&current_session_id).await { + Ok(_) => debug!(session_id = %current_session_id, "Requested Datadog Agent config updates."), + Err(e) => { + warn!(session_id = %current_session_id, error = %e, "Failed to request Datadog Agent config updates.") + } + } + }); + + true + }) + } } struct RemoteAgentState { diff --git a/lib/datadog-agent-commons/src/ipc/client/mod.rs b/lib/datadog-agent-commons/src/ipc/client/mod.rs index a97bc75fa43..c141cfcc145 100644 --- a/lib/datadog-agent-commons/src/ipc/client/mod.rs +++ b/lib/datadog-agent-commons/src/ipc/client/mod.rs @@ -16,9 +16,9 @@ use saluki_io::net::client::http::HttpsCapableConnectorBuilder; use tonic::{ service::interceptor::InterceptedService, transport::{Channel, Endpoint}, - Code, Request, Response, + Code, Request, Response, Status, }; 
-use tracing::warn; +use tracing::{debug, warn}; use crate::ipc::{config::RemoteAgentClientConfiguration, session::SessionId, tls::build_ipc_client_ipc_tls_config}; @@ -222,6 +222,38 @@ impl RemoteAgentClient { StreamingResponse::from_response_future(async move { client.stream_config_events(request).await }) } + + /// Requests that the Agent refresh its configuration and publish updates. + /// + /// A successful RPC response only means the request was delivered to the Agent. Configuration recovery still depends + /// on the config stream publishing updated values. + /// + /// # Errors + /// + /// If there is an error sending the request to the Agent API, an error will be returned. Older Agents that do not + /// implement the RPC are treated as successful no-ops. + pub async fn request_config_updates(&self, session_id: &SessionId) -> Result<Response<()>, GenericError> { + let mut client = self.secure_client.clone(); + let mut request = Request::new(()); + + request + .metadata_mut() + .insert("session_id", session_id.to_grpc_header_value()); + + match client.request_config_updates(request).await { + Ok(response) => Ok(response.map(|_| ())), + Err(status) => handle_request_config_updates_error(status).map_err(Into::into), + } + } +} + +fn handle_request_config_updates_error(status: Status) -> Result<Response<()>, Status> { + if status.code() == Code::Unimplemented { + debug!("Datadog Agent does not implement RequestConfigUpdates. 
Continuing without config refresh requests."); + Ok(Response::new(())) + } else { + Err(status) + } } async fn try_query_agent_api( @@ -244,3 +276,26 @@ async fn try_query_agent_api( }, } } + +#[cfg(test)] +mod tests { + use tonic::Code; + + use super::*; + + #[test] + fn request_config_updates_treats_unimplemented_as_noop() { + let response = handle_request_config_updates_error(Status::new(Code::Unimplemented, "unknown method")) + .expect("unimplemented should be treated as a no-op"); + + assert_eq!(response.into_inner(), ()); + } + + #[test] + fn request_config_updates_preserves_other_errors() { + let status = handle_request_config_updates_error(Status::new(Code::Unavailable, "agent unavailable")) + .expect_err("non-unimplemented errors should be preserved"); + + assert_eq!(status.code(), Code::Unavailable); + } +} diff --git a/lib/protos/datadog/proto/datadog-agent/datadog/api/v1/api.proto b/lib/protos/datadog/proto/datadog-agent/datadog/api/v1/api.proto index bf94e833563..f9ee96748eb 100644 --- a/lib/protos/datadog/proto/datadog-agent/datadog/api/v1/api.proto +++ b/lib/protos/datadog/proto/datadog-agent/datadog/api/v1/api.proto @@ -65,4 +65,7 @@ service AgentSecure { // Streams config events to the remote agent. rpc StreamConfigEvents(datadog.model.v1.ConfigStreamRequest) returns (stream datadog.model.v1.ConfigEvent); + + // Requests that the Agent refresh its configuration and publish updates. 
+ rpc RequestConfigUpdates(google.protobuf.Empty) returns (google.protobuf.Empty); } diff --git a/lib/saluki-components/src/common/datadog/config.rs b/lib/saluki-components/src/common/datadog/config.rs index ac9a94a2802..9becbcd5b64 100644 --- a/lib/saluki-components/src/common/datadog/config.rs +++ b/lib/saluki-components/src/common/datadog/config.rs @@ -10,7 +10,7 @@ use tracing::warn; use super::{ endpoints::{EndpointConfiguration, EndpointRoute, RoutableEndpoint}, proxy::ProxyConfiguration, - retry::RetryConfiguration, + retry::{retryable_forbidden_predicate_for_config, RetryConfiguration}, }; const fn default_endpoint_concurrency() -> usize { @@ -331,6 +331,14 @@ impl ForwarderConfiguration { &self.retry } + /// Configures the retry predicate used to request Core Agent configuration updates for retryable 403 responses. + pub(crate) fn with_config_update_retry_predicate( + &mut self, config: GenericConfiguration, predicate: saluki_io::net::util::retry::HttpRetryPredicate, + ) { + self.retry + .with_retry_predicate(retryable_forbidden_predicate_for_config(config, predicate)); + } + /// Returns a reference to the proxy configuration. 
pub const fn proxy(&self) -> &Option { &self.proxy diff --git a/lib/saluki-components/src/common/datadog/io.rs b/lib/saluki-components/src/common/datadog/io.rs index 61eaf11b195..0cc5426aaf3 100644 --- a/lib/saluki-components/src/common/datadog/io.rs +++ b/lib/saluki-components/src/common/datadog/io.rs @@ -94,7 +94,6 @@ where pub struct TransactionForwarder { context: ComponentContext, config: ForwarderConfiguration, - live_config: Option, telemetry: ComponentTelemetry, metrics_builder: MetricsBuilder, client: HttpClient, @@ -180,7 +179,6 @@ where Ok(Self { context, config, - live_config, telemetry, metrics_builder, client, @@ -201,7 +199,6 @@ where let Self { context, config, - live_config, telemetry, metrics_builder, client, @@ -216,7 +213,6 @@ where io_shutdown_tx, context, config, - live_config, client, telemetry, metrics_builder, @@ -234,9 +230,8 @@ where #[allow(clippy::too_many_arguments)] async fn run_io_loop( mut transactions_rx: mpsc::Receiver>, io_shutdown_tx: oneshot::Sender<()>, - context: ComponentContext, config: ForwarderConfiguration, live_config: Option, - service: HttpClient, telemetry: ComponentTelemetry, metrics_builder: MetricsBuilder, - resolved_endpoints: Vec, + context: ComponentContext, config: ForwarderConfiguration, service: HttpClient, telemetry: ComponentTelemetry, + metrics_builder: MetricsBuilder, resolved_endpoints: Vec, ) where B: Body + Buf + Clone + Send + Sync + 'static, B::Data: Send, @@ -269,7 +264,6 @@ async fn run_io_loop( task_barrier, context.clone(), config.clone(), - live_config.clone(), service.clone(), telemetry.clone(), txnq_telemetry, @@ -350,8 +344,8 @@ fn should_route_to_endpoint(is_metrics_request: bool, has_metrics_primary: bool, #[allow(clippy::too_many_arguments)] async fn run_endpoint_io_loop( mut txns_rx: mpsc::Receiver>, task_barrier: Arc, context: ComponentContext, - config: ForwarderConfiguration, live_config: Option, service: HttpClient, - telemetry: ComponentTelemetry, txnq_telemetry: 
TransactionQueueTelemetry, endpoint: ResolvedEndpoint, + config: ForwarderConfiguration, service: HttpClient, telemetry: ComponentTelemetry, + txnq_telemetry: TransactionQueueTelemetry, endpoint: ResolvedEndpoint, ) where B: Body + Buf + Clone + Send + Sync + 'static, B::Data: Send, @@ -383,7 +377,7 @@ async fn run_endpoint_io_loop( .map_request(with_version_info()) .concurrency_limit(config.endpoint_concurrency()) .layer(RetryCircuitBreakerLayer::new( - config.retry().to_default_http_retry_policy(live_config), + config.retry().to_default_http_retry_policy(), )) .map_request(|req: Request>| req.map(into_client_body)) .service(service); @@ -830,9 +824,10 @@ mod tests { RootCertStore, ServerConfig, }; use saluki_common::buf::FrozenChunkedBytesBuffer; - use saluki_config::ConfigurationLoader; + use saluki_config::{dynamic::ConfigUpdate, ConfigurationLoader}; use saluki_core::{observability::ComponentMetricsExt as _, topology::ComponentId}; use saluki_io::net::client::http::TlsMinimumVersion; + use saluki_io::net::util::retry::HttpRetryPredicate; use serde_json::json; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -1273,8 +1268,81 @@ app.datadoghq.com: [key-a, key-b] None } + async fn start_api_key_recording_http_server(success_api_key: &'static str) -> (String, mpsc::Receiver) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let (api_key_tx, api_key_rx) = mpsc::channel(16); + + tokio::spawn(async move { + loop { + let (mut stream, _) = match listener.accept().await { + Ok(pair) => pair, + Err(_) => return, + }; + let api_key_tx = api_key_tx.clone(); + + tokio::spawn(async move { + let mut request = Vec::new(); + let mut buf = [0u8; 1024]; + loop { + match stream.read(&mut buf).await { + Ok(0) => return, + Ok(n) => { + request.extend_from_slice(&buf[..n]); + if request.windows(4).any(|window| window == b"\r\n\r\n") { + break; + } + } + Err(_) => return, + } + } + + let request_str = 
String::from_utf8_lossy(&request).into_owned(); + let content_length = parse_content_length(&request_str).unwrap_or(0); + let header_end = request + .windows(4) + .position(|w| w == b"\r\n\r\n") + .map_or(request.len(), |idx| idx + 4); + let mut already_read_body = request.len().saturating_sub(header_end); + while already_read_body < content_length { + match stream.read(&mut buf).await { + Ok(0) => break, + Ok(n) => already_read_body += n, + Err(_) => return, + } + } + + let api_key = parse_header_value(&request_str, "dd-api-key").unwrap_or_default(); + let status = if api_key == success_api_key { + StatusCode::OK + } else { + StatusCode::FORBIDDEN + }; + let _ = api_key_tx.send(api_key).await; + + let response = format!( + "HTTP/1.1 {} {}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n", + status.as_u16(), + status.canonical_reason().unwrap_or(""), + ); + let _ = stream.write_all(response.as_bytes()).await; + let _ = stream.shutdown().await; + }); + } + }); + + (format!("http://127.0.0.1:{port}/"), api_key_rx) + } + + fn parse_header_value(request: &str, header_name: &str) -> Option { + request.lines().find_map(|line| { + let (name, value) = line.split_once(':')?; + name.eq_ignore_ascii_case(header_name).then(|| value.trim().to_string()) + }) + } + fn build_test_forwarder( - forwarder_url: &str, live_config: Option, + forwarder_url: &str, live_config: GenericConfiguration, config_update_retry_predicate: HttpRetryPredicate, ) -> TransactionForwarder { // The HTTP client builder requires the process-wide TLS crypto provider to be initialized, even when the // forwarder is pointed at a plain HTTP endpoint. @@ -1296,7 +1364,8 @@ app.datadoghq.com: [key-a, key-b] // populated. We are talking to a plain HTTP endpoint anyway, so disable validation to skip that path. 
"skip_ssl_validation": true, }); - let forwarder_config = forwarder_config_from_value(value); + let mut forwarder_config = forwarder_config_from_value(value); + forwarder_config.with_config_update_retry_predicate(live_config.clone(), config_update_retry_predicate); let context = ComponentContext::forwarder(ComponentId::try_from("test_forwarder").expect("component ID should be valid")); let metrics_builder = MetricsBuilder::from_component_context(&context); @@ -1305,7 +1374,7 @@ app.datadoghq.com: [key-a, key-b] TransactionForwarder::::from_config( context, forwarder_config, - live_config, + Some(live_config), |_uri: &Uri| None, telemetry, metrics_builder, @@ -1351,7 +1420,7 @@ app.datadoghq.com: [key-a, key-b] // at least one retry to observe the second request. let (server_url, counter) = start_recording_http_server(vec![StatusCode::FORBIDDEN, StatusCode::OK]).await; let live_config = config_with(json!({ "secret_backend_command": "/bin/true" })).await; - let forwarder = build_test_forwarder(&server_url, Some(live_config)); + let forwarder = build_test_forwarder(&server_url, live_config, Arc::new(|_| true)); let handle = forwarder.spawn().await; handle @@ -1368,4 +1437,77 @@ app.datadoghq.com: [key-a, key-b] observed ); } + + #[tokio::test] + async fn forwarder_requests_config_update_and_retries_with_rotated_api_key() { + let (server_url, mut api_key_rx) = start_api_key_recording_http_server("new-api-key").await; + let (live_config, sender) = ConfigurationLoader::for_tests(None, None, true).await; + let sender = sender.expect("dynamic configuration sender should be present"); + + sender + .send(ConfigUpdate::Snapshot(json!({ + "api_key": "old-api-key", + "secret_backend_command": "/bin/true", + }))) + .await + .expect("should send initial snapshot"); + live_config.ready().await; + + let (refresh_tx, mut refresh_rx) = mpsc::channel(4); + let forwarder = build_test_forwarder( + &server_url, + live_config.clone(), + Arc::new(move |_| { + let _ = 
refresh_tx.try_send(()); + true + }), + ); + + let handle = forwarder.spawn().await; + handle + .send_transaction(build_test_transaction()) + .await + .expect("send should succeed"); + + let first_api_key = timeout(Duration::from_secs(2), api_key_rx.recv()) + .await + .expect("timed out waiting for first request") + .expect("server should still be running"); + assert_eq!(first_api_key, "old-api-key"); + + timeout(Duration::from_secs(2), refresh_rx.recv()) + .await + .expect("timed out waiting for config update request signal") + .expect("refresh signal channel should still be open"); + + let mut watcher = live_config.watch_for_updates("api_key"); + sender + .send(ConfigUpdate::Partial { + key: "api_key".to_string(), + value: json!("new-api-key"), + }) + .await + .expect("should send rotated API key"); + timeout(Duration::from_secs(2), watcher.changed::()) + .await + .expect("timed out waiting for rotated API key"); + + let observed_new_key = async { + while let Some(api_key) = api_key_rx.recv().await { + if api_key == "new-api-key" { + return true; + } + } + + false + }; + assert!( + timeout(Duration::from_secs(3), observed_new_key) + .await + .expect("timed out waiting for retry with rotated API key"), + "forwarder should retry queued transaction with rotated API key" + ); + + handle.shutdown().await; + } } diff --git a/lib/saluki-components/src/common/datadog/retry.rs b/lib/saluki-components/src/common/datadog/retry.rs index fbbd9adcc59..50e84f402ee 100644 --- a/lib/saluki-components/src/common/datadog/retry.rs +++ b/lib/saluki-components/src/common/datadog/retry.rs @@ -5,7 +5,7 @@ use std::{ }; use facet::Facet; -use http::StatusCode; +use http::{Response, StatusCode}; use saluki_config::GenericConfiguration; use saluki_io::net::util::retry::{ DefaultHttpRetryPolicy, ExponentialBackoff, HttpRetryPredicate, StandardHttpClassifier, @@ -130,6 +130,11 @@ pub struct RetryConfiguration { rename = "forwarder_storage_max_disk_ratio" )] storage_max_disk_ratio: f64, + + 
/// Retry predicate to decide if a particular transaction will be retried. + #[serde(skip)] + #[facet(opaque)] + retry_predicate: Option, } impl RetryConfiguration { @@ -180,28 +185,24 @@ impl RetryConfiguration { self.storage_max_disk_ratio } + pub fn with_retry_predicate(&mut self, predicate: HttpRetryPredicate) { + self.retry_predicate = Some(RetryPredicate::new(predicate)); + } + /// Creates a new [`DefaultHttpRetryPolicy`] based on the forwarder configuration. /// - /// If a [`GenericConfiguration`] is supplied, the policy captures it and checks whether - /// secrets management is active on every 403 Forbidden response. This allows the retry gate to - /// pick up runtime changes pushed via the config stream without rebuilding the service. When no - /// configuration is supplied, 403 responses retain their default non-retriable behavior. - pub fn to_default_http_retry_policy( - &self, live_config: Option, - ) -> DefaultHttpRetryPolicy { + /// When a retry predicate is configured, it is added to the standard HTTP classifier. 
+ pub fn to_default_http_retry_policy(&self) -> DefaultHttpRetryPolicy { let retry_backoff = ExponentialBackoff::with_jitter( Duration::from_secs_f64(self.backoff_base), Duration::from_secs_f64(self.backoff_max), self.backoff_factor, ); - let classifier = if let Some(config) = live_config { - let gate: HttpRetryPredicate = - Arc::new(move |response| response.status() == StatusCode::FORBIDDEN && secrets_in_use(&config)); - StandardHttpClassifier::new().with_predicate(gate) - } else { - StandardHttpClassifier::new() - }; + let mut classifier = StandardHttpClassifier::new(); + if let Some(predicate) = self.retry_predicate.clone() { + classifier = classifier.with_predicate(predicate.adapt()); + } let recovery_error_decrease_factor = (!self.recovery_reset).then_some(self.recovery_error_decrease_factor); DefaultHttpRetryPolicy::with_backoff_and_classifier(retry_backoff, classifier) @@ -209,14 +210,64 @@ impl RetryConfiguration { } } -fn secrets_in_use(config: &GenericConfiguration) -> bool { +#[derive(Clone)] +struct RetryPredicate { + predicate: HttpRetryPredicate, +} + +impl RetryPredicate { + fn new(predicate: HttpRetryPredicate) -> Self { + Self { predicate } + } + + fn adapt(self) -> HttpRetryPredicate { + Arc::new(move |response| { + let mut unit_response = Response::new(()); + *unit_response.status_mut() = response.status(); + *unit_response.version_mut() = response.version(); + *unit_response.headers_mut() = response.headers().clone(); + + (self.predicate)(&unit_response) + }) + } +} + +#[cfg(test)] +impl std::fmt::Debug for RetryPredicate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("RetryPredicate").finish_non_exhaustive() + } +} + +#[cfg(test)] +impl PartialEq for RetryPredicate { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.predicate, &other.predicate) + } +} + +pub(super) fn secrets_in_use(config: &GenericConfiguration) -> bool { 
matches!(config.try_get_typed::("secret_refresh_on_api_key_failure_interval"), Ok(Some(value)) if value > 0) || matches!(config.try_get_typed::("secret_backend_command"), Ok(Some(value)) if !value.trim().is_empty()) } +pub(super) fn retryable_forbidden_predicate_for_config( + live_config: GenericConfiguration, retryable_forbidden_predicate: HttpRetryPredicate, +) -> HttpRetryPredicate { + Arc::new(move |response| { + if response.status() != StatusCode::FORBIDDEN || !secrets_in_use(&live_config) { + return false; + } + + retryable_forbidden_predicate(response) + }) +} + #[cfg(test)] mod tests { - use http::{Request, Response}; + use std::sync::Arc; + + use http::{Request, Response, StatusCode}; use saluki_config::ConfigurationLoader; use serde_json::json; use tower::retry::Policy; @@ -340,84 +391,140 @@ mod tests { } #[tokio::test] - async fn policy_without_config_does_not_retry_403() { - let retry_config = test_retry_config(); - let mut policy = retry_config.to_default_http_retry_policy(None); - - assert!(!would_retry(&mut policy, ok_response(StatusCode::FORBIDDEN))); - } - - #[tokio::test] - async fn policy_with_config_but_no_secrets_does_not_retry_403() { - let (config, _) = ConfigurationLoader::for_tests(None, None, false).await; + async fn policy_without_predicate_does_not_retry_403() { let retry_config = test_retry_config(); - let mut policy = retry_config.to_default_http_retry_policy(Some(config)); + let mut policy = retry_config.to_default_http_retry_policy(); assert!(!would_retry(&mut policy, ok_response(StatusCode::FORBIDDEN))); } #[tokio::test] - async fn policy_with_secrets_retries_403() { - let values = json!({ "secret_backend_command": "/bin/true" }); - let (config, _) = ConfigurationLoader::for_tests(Some(values), None, false).await; - let retry_config = test_retry_config(); - let mut policy = retry_config.to_default_http_retry_policy(Some(config)); + async fn policy_predicate_adds_retry_for_403() { + let mut retry_config = test_retry_config(); + let 
predicate: HttpRetryPredicate = Arc::new(|response| response.status() == StatusCode::FORBIDDEN); + retry_config.with_retry_predicate(predicate); + let mut policy = retry_config.to_default_http_retry_policy(); assert!(would_retry(&mut policy, ok_response(StatusCode::FORBIDDEN))); } #[tokio::test] - async fn policy_secrets_does_not_affect_other_status_codes() { - let values = json!({ "secret_backend_command": "/bin/true" }); - let (config, _) = ConfigurationLoader::for_tests(Some(values), None, false).await; - let retry_config = test_retry_config(); - let mut policy = retry_config.to_default_http_retry_policy(Some(config)); + async fn policy_predicate_keeps_default_behavior_for_other_status_codes() { + let mut retry_config = test_retry_config(); + let predicate: HttpRetryPredicate = Arc::new(|response| response.status() == StatusCode::FORBIDDEN); + retry_config.with_retry_predicate(predicate); + let mut policy = retry_config.to_default_http_retry_policy(); assert!(!would_retry(&mut policy, ok_response(StatusCode::OK))); assert!(!would_retry(&mut policy, ok_response(StatusCode::BAD_REQUEST))); assert!(!would_retry(&mut policy, ok_response(StatusCode::UNAUTHORIZED))); assert!(!would_retry(&mut policy, ok_response(StatusCode::PAYLOAD_TOO_LARGE))); + assert!(would_retry(&mut policy, ok_response(StatusCode::FORBIDDEN))); assert!(would_retry(&mut policy, ok_response(StatusCode::INTERNAL_SERVER_ERROR))); assert!(would_retry(&mut policy, ok_response(StatusCode::TOO_MANY_REQUESTS))); } #[tokio::test] - async fn policy_403_gate_reflects_dynamic_secrets_config_change() { - use std::time::Duration as StdDuration; + async fn policy_predicate_is_evaluated_for_403() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let mut retry_config = test_retry_config(); + let calls = Arc::new(AtomicUsize::new(0)); + let calls_clone = Arc::clone(&calls); + let predicate: HttpRetryPredicate = Arc::new(move |_| { + calls_clone.fetch_add(1, Ordering::SeqCst); + true + }); + 
retry_config.with_retry_predicate(predicate); + let mut policy = retry_config.to_default_http_retry_policy(); - use saluki_config::dynamic::ConfigUpdate; + assert!(would_retry(&mut policy, ok_response(StatusCode::FORBIDDEN))); + assert_eq!(calls.load(Ordering::SeqCst), 1); + } - let (config, sender) = ConfigurationLoader::for_tests(Some(json!({})), None, true).await; - let sender = sender.expect("dynamic configuration sender should be present"); + #[tokio::test] + async fn retryable_forbidden_predicate_calls_inner_when_secrets_are_in_use() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let (config, _) = + ConfigurationLoader::for_tests(Some(json!({ "secret_backend_command": "/bin/true" })), None, false).await; + let mut retry_config = test_retry_config(); + let calls = Arc::new(AtomicUsize::new(0)); + let calls_clone = Arc::clone(&calls); + retry_config.with_retry_predicate(retryable_forbidden_predicate_for_config( + config, + Arc::new(move |_| { + calls_clone.fetch_add(1, Ordering::SeqCst); + true + }), + )); + let mut policy = retry_config.to_default_http_retry_policy(); - // Apply an empty initial snapshot and wait for readiness. 
- sender - .send(ConfigUpdate::Snapshot(json!({}))) - .await - .expect("should send initial snapshot"); - config.ready().await; + assert!(would_retry(&mut policy, ok_response(StatusCode::FORBIDDEN))); + assert_eq!(calls.load(Ordering::SeqCst), 1); + } - let retry_config = test_retry_config(); - let mut policy = retry_config.to_default_http_retry_policy(Some(config.clone())); + #[tokio::test] + async fn retryable_forbidden_predicate_does_not_call_inner_without_secrets() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let (config, _) = ConfigurationLoader::for_tests(None, None, false).await; + let mut retry_config = test_retry_config(); + let calls = Arc::new(AtomicUsize::new(0)); + let calls_clone = Arc::clone(&calls); + retry_config.with_retry_predicate(retryable_forbidden_predicate_for_config( + config, + Arc::new(move |_| { + calls_clone.fetch_add(1, Ordering::SeqCst); + true + }), + )); + let mut policy = retry_config.to_default_http_retry_policy(); - // Before secrets are configured, 403 must not be retried. assert!(!would_retry(&mut policy, ok_response(StatusCode::FORBIDDEN))); + assert_eq!(calls.load(Ordering::SeqCst), 0); + } - // Push a config update that enables secrets management. - let mut watcher = config.watch_for_updates("secret_backend_command"); - sender - .send(ConfigUpdate::Partial { - key: "secret_backend_command".to_string(), - value: json!("/bin/true"), - }) - .await - .expect("should send partial update"); - - tokio::time::timeout(StdDuration::from_secs(2), watcher.changed::()) - .await - .expect("timed out waiting for secret_backend_command update"); - - // The same policy instance must now retry 403 because the predicate reads the live cached secrets flag. 
- assert!(would_retry(&mut policy, ok_response(StatusCode::FORBIDDEN))); + #[tokio::test] + async fn retryable_forbidden_predicate_does_not_call_inner_for_other_retryable_statuses() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let (config, _) = + ConfigurationLoader::for_tests(Some(json!({ "secret_backend_command": "/bin/true" })), None, false).await; + let mut retry_config = test_retry_config(); + let calls = Arc::new(AtomicUsize::new(0)); + let calls_clone = Arc::clone(&calls); + retry_config.with_retry_predicate(retryable_forbidden_predicate_for_config( + config, + Arc::new(move |_| { + calls_clone.fetch_add(1, Ordering::SeqCst); + true + }), + )); + let mut policy = retry_config.to_default_http_retry_policy(); + + assert!(would_retry(&mut policy, ok_response(StatusCode::INTERNAL_SERVER_ERROR))); + assert_eq!(calls.load(Ordering::SeqCst), 0); + } + + #[tokio::test] + async fn secrets_in_use_detects_secret_backend_command() { + let (config, _) = ConfigurationLoader::for_tests(None, None, false).await; + assert!(!secrets_in_use(&config)); + + let (config, _) = + ConfigurationLoader::for_tests(Some(json!({ "secret_backend_command": "/bin/true" })), None, false).await; + assert!(secrets_in_use(&config)); + } + + #[tokio::test] + async fn secrets_in_use_detects_refresh_interval() { + let (config, _) = ConfigurationLoader::for_tests( + Some(json!({ "secret_refresh_on_api_key_failure_interval": 1u64 })), + None, + false, + ) + .await; + assert!(secrets_in_use(&config)); } } diff --git a/lib/saluki-components/src/forwarders/datadog/mod.rs b/lib/saluki-components/src/forwarders/datadog/mod.rs index b60943bcf23..dda658e528c 100644 --- a/lib/saluki-components/src/forwarders/datadog/mod.rs +++ b/lib/saluki-components/src/forwarders/datadog/mod.rs @@ -9,6 +9,7 @@ use saluki_core::{ observability::ComponentMetricsExt as _, }; use saluki_error::GenericError; +use saluki_io::net::util::retry::HttpRetryPredicate; use saluki_metrics::MetricsBuilder; use 
stringtheory::MetaString; use tokio::select; @@ -69,6 +70,15 @@ impl DatadogConfiguration { self } + + /// Configures the predicate used to request Core Agent configuration updates for retryable 403 responses. + pub fn with_config_update_retry_predicate(mut self, predicate: HttpRetryPredicate) -> Self { + if let Some(config) = self.configuration.clone() { + self.forwarder_config + .with_config_update_retry_predicate(config, predicate); + } + self + } } #[async_trait]