Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 250 additions & 9 deletions lib/saluki-components/src/common/datadog/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ where
/// Transaction forwarder for Datadog endpoints.
pub struct TransactionForwarder<B> {
context: ComponentContext,
config: ForwarderConfiguration,
config: ForwarderConfiguration, // static snapshot of forwarder settings
live_config: Option<GenericConfiguration>, // runtime-mutable configuration
telemetry: ComponentTelemetry,
metrics_builder: MetricsBuilder,
client: HttpClient,
Expand Down Expand Up @@ -141,13 +142,13 @@ where
{
/// Creates a new `TransactionForwarder` instance from the given configuration.
pub fn from_config<F>(
context: ComponentContext, config: ForwarderConfiguration, configuration: Option<GenericConfiguration>,
context: ComponentContext, config: ForwarderConfiguration, live_config: Option<GenericConfiguration>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the is_secrets_in_use question (retry.rs:219) resolves to a bool, this parameter goes away and the threading through run_io_loop / run_endpoint_io_loop goes with it.

endpoint_name: F, telemetry: ComponentTelemetry, metrics_builder: MetricsBuilder,
) -> Result<Self, GenericError>
where
F: Fn(&Uri) -> Option<MetaString> + Send + Sync + 'static,
{
let endpoints = config.endpoint().build_resolved_endpoints(configuration)?;
let endpoints = config.endpoint().build_resolved_endpoints(live_config.clone())?;
let mut client_builder = HttpClient::builder()
.with_request_timeout(config.request_timeout())
.with_bytes_sent_counter(telemetry.bytes_sent().clone())
Expand All @@ -167,6 +168,7 @@ where
Ok(Self {
context,
config,
live_config,
telemetry,
metrics_builder,
client,
Expand All @@ -186,6 +188,7 @@ where
let Self {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a request to fix in this PR, but this destructure is where the too_many_arguments warnings (suppressed a few lines down) seem to originate. All these fields are owned by self, destructured here, and then passed individually through run_io_loop and run_endpoint_io_loop.

Perhaps self could be left intact here and then passed through as a single argument, though I'm not sure.

Suggested change
let Self {
// TODO: do not destructure self as a way to fix the #[allow(clippy::too_many_arguments)] annotations
let Self {

context,
config,
live_config,
telemetry,
metrics_builder,
client,
Expand All @@ -200,6 +203,7 @@ where
io_shutdown_tx,
context,
config,
live_config,
client,
telemetry,
metrics_builder,
Expand All @@ -214,10 +218,12 @@ where
}
}

#[allow(clippy::too_many_arguments)]
async fn run_io_loop<B>(
mut transactions_rx: mpsc::Receiver<Transaction<B>>, io_shutdown_tx: oneshot::Sender<()>,
context: ComponentContext, config: ForwarderConfiguration, service: HttpClient, telemetry: ComponentTelemetry,
metrics_builder: MetricsBuilder, resolved_endpoints: Vec<ResolvedEndpoint>,
context: ComponentContext, config: ForwarderConfiguration, live_config: Option<GenericConfiguration>,
service: HttpClient, telemetry: ComponentTelemetry, metrics_builder: MetricsBuilder,
resolved_endpoints: Vec<ResolvedEndpoint>,
) where
B: Body + Buf + Clone + Send + Sync + 'static,
B::Data: Send,
Expand All @@ -243,6 +249,7 @@ async fn run_io_loop<B>(
task_barrier,
context.clone(),
config.clone(),
live_config.clone(),
service.clone(),
telemetry.clone(),
txnq_telemetry,
Expand Down Expand Up @@ -279,10 +286,11 @@ async fn run_io_loop<B>(
let _ = io_shutdown_tx.send(());
}

#[allow(clippy::too_many_arguments)]
async fn run_endpoint_io_loop<B>(
mut txns_rx: mpsc::Receiver<Transaction<B>>, task_barrier: Arc<Barrier>, context: ComponentContext,
config: ForwarderConfiguration, service: HttpClient, telemetry: ComponentTelemetry,
txnq_telemetry: TransactionQueueTelemetry, endpoint: ResolvedEndpoint,
config: ForwarderConfiguration, live_config: Option<GenericConfiguration>, service: HttpClient,
telemetry: ComponentTelemetry, txnq_telemetry: TransactionQueueTelemetry, endpoint: ResolvedEndpoint,
) where
B: Body + Buf + Clone + Send + Sync + 'static,
B::Data: Send,
Expand Down Expand Up @@ -311,7 +319,7 @@ async fn run_endpoint_io_loop<B>(
.map_request(with_version_info())
.concurrency_limit(config.endpoint_concurrency())
.layer(RetryCircuitBreakerLayer::new(
config.retry().to_default_http_retry_policy(),
config.retry().to_default_http_retry_policy(live_config.clone()),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Limit 403 retries to endpoints with refreshable keys

When additional_endpoints are configured and secrets are enabled, this installs the 403 retry override for every endpoint I/O loop. However, EndpointConfiguration::build_resolved_endpoints only attaches the live GenericConfiguration to the primary endpoint; additional endpoints are built as static ResolvedEndpoints, so their API keys cannot be refreshed by ResolvedEndpoint::api_key(). A permanent 403 from a secondary endpoint will therefore be re-enqueued and retried until queue limits are hit instead of being treated as a non-retriable auth error, consuming retry queue/disk capacity and potentially delaying or dropping other transactions.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, I will follow up on that as part of #1540

))
.map_request(|req: Request<TransactionBody<B>>| req.map(into_client_body))
.service(service);
Expand Down Expand Up @@ -646,23 +654,33 @@ impl<T: Retryable> PendingTransactions<T> {

#[cfg(test)]
mod tests {
use std::sync::{Arc, OnceLock};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, OnceLock,
};

use bytes::Bytes;
use http::StatusCode;
use http_body_util::Empty;
use rcgen::{generate_simple_self_signed, CertifiedKey};
use rustls::{
pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer},
RootCertStore, ServerConfig,
};
use saluki_common::buf::FrozenChunkedBytesBuffer;
use saluki_config::ConfigurationLoader;
use saluki_core::{observability::ComponentMetricsExt as _, topology::ComponentId};
use serde_json::json;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
sync::mpsc,
time::{timeout, Duration},
};
use tokio_rustls::TlsAcceptor;

use super::*;
use crate::common::datadog::transaction::{Metadata as TxnMetadata, Transaction};

fn forwarder_config_from_value(value: serde_json::Value) -> ForwarderConfiguration {
serde_json::from_value(value).expect("ForwarderConfiguration should deserialize")
Expand Down Expand Up @@ -809,4 +827,227 @@ mod tests {
.expect("HTTPS request channel closed");
assert!(received_request.starts_with("GET / HTTP/1.1"));
}

/// Mode controlling what status codes the recording HTTP server returns to incoming requests.
///
/// Consumed by `start_recording_http_server`, which selects one status code per counted request.
enum ServerMode {
/// Always respond with the given status code.
AlwaysStatus(StatusCode),
/// Respond with each status code from the sequence in turn; once exhausted, respond with the final code forever.
///
/// The sequence must contain at least one status code: the server clamps the request index to
/// `seq.len() - 1`, which cannot be computed for an empty sequence.
StatusSequence(Vec<StatusCode>),
}

/// Starts a minimal HTTP server on `127.0.0.1:0` that records each request and replies based on `mode`.
///
/// Returns the server's `http://127.0.0.1:PORT/` URL and a counter that increments once per request that was read
/// through the end of its headers (one connection per request, since the server replies with
/// `Connection: close`). Connections that close before the header terminator arrives, or that hit a read error,
/// are dropped without being counted.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read, or if `mode` is a
/// `ServerMode::StatusSequence` with no status codes.
async fn start_recording_http_server(mode: ServerMode) -> (String, Arc<AtomicUsize>) {
    // Reject an empty sequence here, in the caller's task. Otherwise the `seq.len() - 1` below would underflow
    // inside a detached connection task, where the panic is never observed by the test and only shows up as a
    // dropped connection.
    if let ServerMode::StatusSequence(seq) = &mode {
        assert!(
            !seq.is_empty(),
            "ServerMode::StatusSequence requires at least one status code"
        );
    }

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = listener.local_addr().unwrap().port();
    let counter = Arc::new(AtomicUsize::new(0));

    let mode = Arc::new(mode);
    let counter_for_task = Arc::clone(&counter);
    tokio::spawn(async move {
        loop {
            // Stop serving as soon as the listener fails; the test will observe that through the counter.
            let (mut stream, _) = match listener.accept().await {
                Ok(pair) => pair,
                Err(_) => return,
            };
            let mode = Arc::clone(&mode);
            let counter = Arc::clone(&counter_for_task);

            // Each connection gets its own task so a slow client cannot block the accept loop.
            tokio::spawn(async move {
                let mut request = Vec::new();
                let mut buf = [0u8; 1024];

                // Read until the blank line that ends the header section, remembering where the body starts.
                let header_end = loop {
                    match stream.read(&mut buf).await {
                        Ok(0) | Err(_) => return,
                        Ok(n) => {
                            request.extend_from_slice(&buf[..n]);
                            if let Some(idx) = request.windows(4).position(|window| window == b"\r\n\r\n") {
                                break idx + 4;
                            }
                        }
                    }
                };

                // Drain any body bytes that arrived alongside the headers, plus whatever remains based on a
                // simple Content-Length parse. We don't actually need to buffer it; we just need to consume it
                // so the client doesn't get a connection reset before reading our response.
                let content_length = parse_content_length(&String::from_utf8_lossy(&request)).unwrap_or(0);
                let mut already_read_body = request.len().saturating_sub(header_end);
                while already_read_body < content_length {
                    match stream.read(&mut buf).await {
                        Ok(0) => break,
                        Ok(n) => already_read_body += n,
                        Err(_) => return,
                    }
                }

                // `fetch_add` returns the previous value, so `nth` is the zero-based index of this request.
                let nth = counter.fetch_add(1, Ordering::SeqCst);
                let status = match mode.as_ref() {
                    ServerMode::AlwaysStatus(s) => *s,
                    ServerMode::StatusSequence(seq) => {
                        // Non-empty by the check at the top of the function; clamp so the last code repeats.
                        let idx = nth.min(seq.len() - 1);
                        seq[idx]
                    }
                };

                let response = format!(
                    "HTTP/1.1 {} {}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
                    status.as_u16(),
                    status.canonical_reason().unwrap_or(""),
                );
                // Best effort: the client may already have gone away, which is not an error for the test server.
                let _ = stream.write_all(response.as_bytes()).await;
                let _ = stream.shutdown().await;
            });
        }
    });

    (format!("http://127.0.0.1:{port}/"), counter)
}

/// Extracts the `Content-Length` value from the header section of a raw HTTP request.
///
/// Only the text before the first blank line (`\r\n\r\n`) is inspected, so body bytes that happened to arrive
/// with the headers can never be mistaken for a header. If no blank line is present, the whole input is
/// treated as headers. The field name is matched case-insensitively, as HTTP field names are not
/// case-sensitive.
///
/// Returns `None` when no `Content-Length` header is present, or when the first one found does not parse as a
/// `usize`.
fn parse_content_length(request: &str) -> Option<usize> {
    let head = request.split_once("\r\n\r\n").map_or(request, |(head, _)| head);

    for line in head.lines() {
        // Lines without a colon (such as the request line) cannot be header fields.
        let Some((name, value)) = line.split_once(':') else {
            continue;
        };
        if name.eq_ignore_ascii_case("content-length") {
            return value.trim().parse().ok();
        }
    }
    None
}

/// Builds a `TransactionForwarder` aimed at `forwarder_url`, configured with short timeouts and backoffs.
///
/// `live_config` is passed through to `TransactionForwarder::from_config` unchanged.
///
/// Panics if the forwarder configuration does not deserialize, the component ID is rejected, or the forwarder
/// itself fails to build.
fn build_test_forwarder(
    forwarder_url: &str, live_config: Option<GenericConfiguration>,
) -> TransactionForwarder<FrozenChunkedBytesBuffer> {
    // Building the HTTP client needs the process-wide TLS crypto provider in place, including for plain HTTP
    // targets like the ones used in these tests.
    init_tls_crypto_provider();

    // Short timeouts and tiny backoffs so that a test involving retries still finishes within a couple seconds.
    let forwarder_config = forwarder_config_from_value(serde_json::json!({
        "api_key": "test-api-key",
        "dd_url": forwarder_url,
        "forwarder_timeout": 1u64,
        "forwarder_num_workers": 1usize,
        "forwarder_high_prio_buffer_size": 4usize,
        "forwarder_backoff_base": 0.001,
        "forwarder_backoff_max": 0.01,
        "forwarder_backoff_factor": 2.0,
        "forwarder_recovery_interval": 1u32,
        "forwarder_recovery_reset": false,
        // Without this, the HTTP client builder needs the process-wide default root certificate store to be
        // populated. The target is plain HTTP, so validation is switched off to avoid that requirement.
        "skip_ssl_validation": true,
    }));

    let component_id = ComponentId::try_from("test_forwarder").expect("component ID should be valid");
    let context = ComponentContext::forwarder(component_id);
    let metrics_builder = MetricsBuilder::from_component_context(&context);
    let telemetry = ComponentTelemetry::from_builder(&metrics_builder);

    let built = TransactionForwarder::<FrozenChunkedBytesBuffer>::from_config(
        context,
        forwarder_config,
        live_config,
        |_uri: &Uri| None,
        telemetry,
        metrics_builder,
    );
    built.expect("forwarder should build")
}

/// Builds a single-event `POST` transaction carrying a small fixed payload.
///
/// Panics if the HTTP request cannot be constructed.
fn build_test_transaction() -> Transaction<FrozenChunkedBytesBuffer> {
    let payload = Bytes::from_static(b"test-payload");

    // The endpoint middleware swaps in the authority of the configured `dd_url` and keeps the path. The path
    // used here avoids the special-cased `/api/v2/logs` and `/api/v0.2/{traces,stats}` routes, so the request
    // goes straight to the configured `dd_url` host.
    let request = http::Request::builder()
        .method("POST")
        .uri("http://placeholder/api/v2/series")
        .body(FrozenChunkedBytesBuffer::from(payload))
        .expect("request should build");

    let metadata = TxnMetadata::from_event_count(1);
    Transaction::from_original(metadata, request)
}

/// Loads a test `GenericConfiguration` seeded with `values`, discarding the loader's second return value.
async fn config_with(values: serde_json::Value) -> GenericConfiguration {
    ConfigurationLoader::for_tests(Some(values), None, false).await.0
}

/// Polls `counter` every 10 ms until it reaches `target` or `deadline` has elapsed.
///
/// Returns the last value read from the counter. Callers must compare it against `target` themselves, since a
/// timeout is reported only by returning a value below `target`.
async fn wait_for_count_at_least(counter: &Arc<AtomicUsize>, target: usize, deadline: Duration) -> usize {
    let poll_interval = Duration::from_millis(10);
    let started = std::time::Instant::now();

    loop {
        let observed = counter.load(Ordering::SeqCst);
        // The counter is always read at least once, even with a zero deadline.
        if observed >= target || started.elapsed() > deadline {
            return observed;
        }
        tokio::time::sleep(poll_interval).await;
    }
}

/// With no secrets backend in the live configuration, a 403 response must result in exactly one request.
#[tokio::test]
async fn forwarder_does_not_retry_403_without_secrets() {
    let always_forbidden = ServerMode::AlwaysStatus(StatusCode::FORBIDDEN);
    let (server_url, request_count) = start_recording_http_server(always_forbidden).await;
    let live_config = Some(config_with(json!({})).await);
    let forwarder = build_test_forwarder(&server_url, live_config);

    let handle = forwarder.spawn().await;
    let txn = build_test_transaction();
    handle.send_transaction(txn).await.expect("send should succeed");

    // Once the initial 403 has landed, leave a generous window in which any (incorrect) retry would show up.
    let first = wait_for_count_at_least(&request_count, 1, Duration::from_secs(2)).await;
    assert!(first >= 1, "server should have received at least one request");
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Shut down before the final check so that any in-flight call has been drained.
    handle.shutdown().await;

    let final_count = request_count.load(Ordering::SeqCst);
    assert_eq!(
        final_count, 1,
        "without secrets configured, 403 must not be retried (saw {} requests)",
        final_count
    );
}

/// With a secrets backend configured, a 403 response must be retried at least once.
#[tokio::test]
async fn forwarder_retries_403_when_secrets_in_use() {
    // First request gets a 403 and every later one a 200, so a second request can only be observed if the
    // forwarder actually retried.
    let forbidden_then_ok = ServerMode::StatusSequence(vec![StatusCode::FORBIDDEN, StatusCode::OK]);
    let (server_url, request_count) = start_recording_http_server(forbidden_then_ok).await;
    let live_config = Some(config_with(json!({ "secret_backend_command": "/bin/true" })).await);
    let forwarder = build_test_forwarder(&server_url, live_config);

    let handle = forwarder.spawn().await;
    let txn = build_test_transaction();
    handle.send_transaction(txn).await.expect("send should succeed");

    let observed = wait_for_count_at_least(&request_count, 2, Duration::from_secs(3)).await;
    handle.shutdown().await;

    assert!(
        observed >= 2,
        "with secrets configured, 403 must be retried at least once (saw {} requests)",
        observed
    );
}
}
Loading
Loading