Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[workspace]
resolver = "2"
resolver = "3"

members = ["agent-control", "resource-detection", "fs", "wrapper_with_default"]

[workspace.package]
authors = ["The New Relic Agent Control Team"]
edition = "2021"
edition = "2024"
publish = false
rust-version = "1.85.0"
license-file = "./LICENSE.md"
Expand Down Expand Up @@ -49,4 +49,4 @@ reqwest = { version = "0.12.11", default-features = false, features = [
strip = "debuginfo"

[workspace.metadata.cargo-shear]
ignored = ["mockall_double"]
ignored = ["mockall_double"]
41 changes: 24 additions & 17 deletions agent-control/src/agent_control/agent_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::agent_control::config_validator::DynamicConfigValidator;
use crate::agent_control::error::AgentError;
use crate::agent_control::uptime_report::UptimeReporter;
use crate::event::{
channel::{EventConsumer, EventPublisher},
AgentControlEvent, ApplicationEvent, OpAMPEvent, SubAgentEvent,
channel::{EventConsumer, EventPublisher},
};
use crate::opamp::remote_config::report::OpampRemoteConfigStatus;
use crate::opamp::{
Expand Down Expand Up @@ -436,18 +436,18 @@ fn sub_agents_difference<'a>(

#[cfg(test)]
mod tests {
use crate::agent_control::AgentControl;
use crate::agent_control::agent_control::sub_agents_difference;
use crate::agent_control::agent_id::AgentID;
use crate::agent_control::config::{
AgentControlConfig, AgentControlDynamicConfig, SubAgentConfig,
};
use crate::agent_control::config_storer::loader_storer::tests::MockAgentControlDynamicConfigStore;
use crate::agent_control::config_validator::tests::MockDynamicConfigValidator;
use crate::agent_control::config_validator::DynamicConfigValidatorError;
use crate::agent_control::config_validator::tests::MockDynamicConfigValidator;
use crate::agent_control::resource_cleaner::ResourceCleanerError;
use crate::agent_control::resource_cleaner::no_op::NoOpResourceCleaner;
use crate::agent_control::resource_cleaner::tests::MockResourceCleaner;
use crate::agent_control::resource_cleaner::ResourceCleanerError;
use crate::agent_control::AgentControl;
use crate::agent_type::agent_type_id::AgentTypeID;
use crate::agent_type::agent_type_registry::AgentRepositoryError;
use crate::event::channel::pub_sub;
Expand All @@ -460,7 +460,7 @@ mod tests {
use crate::sub_agent::health::health_checker::{Healthy, Unhealthy};
use crate::sub_agent::tests::MockStartedSubAgent;
use crate::sub_agent::tests::MockSubAgentBuilder;
use mockall::{predicate, Sequence};
use mockall::{Sequence, predicate};
use std::collections::HashMap;
use std::sync::Arc;
use std::thread::{sleep, spawn};
Expand Down Expand Up @@ -1113,9 +1113,11 @@ agents:
)]))),
);

assert!(agent_control
.apply_remote_agent_control_config(&remote_config, &mut running_sub_agents)
.is_err());
assert!(
agent_control
.apply_remote_agent_control_config(&remote_config, &mut running_sub_agents)
.is_err()
);

assert_eq!(running_sub_agents.len(), 1);

Expand Down Expand Up @@ -1380,9 +1382,12 @@ agents:
let ev = agent_control_consumer.as_ref().recv().unwrap();
assert_eq!(expected, ev);

let expected = AgentControlEvent::AgentControlBecameUnhealthy(Unhealthy::new(String::default(), String::from(
"Error applying Agent Control remote config: remote config error: `config hash: `a-hash` config error: `some error message``",
)));
let expected = AgentControlEvent::AgentControlBecameUnhealthy(Unhealthy::new(
String::default(),
String::from(
"Error applying Agent Control remote config: remote config error: `config hash: `a-hash` config error: `some error message``",
),
));
let ev = agent_control_consumer.as_ref().recv().unwrap();
assert_eq!(expected, ev);
}
Expand Down Expand Up @@ -1647,12 +1652,14 @@ agents:
let diff: Vec<_> = sub_agents_difference(&old_sub_agents, &new_sub_agents).collect();

assert_eq!(diff.len(), 2);
assert!(diff
.iter()
.any(|(id, _)| id == &&AgentID::new("infra-agent").unwrap()));
assert!(diff
.iter()
.any(|(id, _)| id == &&AgentID::new("nrdot").unwrap()));
assert!(
diff.iter()
.any(|(id, _)| id == &&AgentID::new("infra-agent").unwrap())
);
assert!(
diff.iter()
.any(|(id, _)| id == &&AgentID::new("nrdot").unwrap())
);
}

#[test]
Expand Down
4 changes: 3 additions & 1 deletion agent-control/src/agent_control/agent_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ pub struct AgentID(String);

#[derive(Error, Debug)]
pub enum AgentIDError {
#[error("AgentID must contain 32 characters at most, contain lowercase alphanumeric characters or dashes only, start with alphabetic, and end with alphanumeric")]
#[error(
"AgentID must contain 32 characters at most, contain lowercase alphanumeric characters or dashes only, start with alphabetic, and end with alphanumeric"
)]
InvalidFormat,
#[error("AgentID '{0}' is reserved")]
Reserved(String),
Expand Down
32 changes: 19 additions & 13 deletions agent-control/src/agent_control/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use super::uptime_report::UptimeReportConfig;
use crate::http::config::ProxyConfig;
use crate::instrumentation::config::logs::config::LoggingConfig;
use crate::opamp::auth::config::AuthConfig;
use crate::opamp::remote_config::validators::signature::validator::SignatureValidatorConfig;
use crate::opamp::remote_config::RemoteConfigError;
use crate::opamp::remote_config::validators::signature::validator::SignatureValidatorConfig;
use crate::values::yaml_config::YAMLConfig;
use crate::{
agent_type::agent_type_id::AgentTypeID, instrumentation::config::InstrumentationConfig,
Expand Down Expand Up @@ -390,10 +390,12 @@ agents: {}
serde_yaml::from_str::<AgentControlDynamicConfig>(EXAMPLE_SUBAGENTS_CONFIG).is_ok()
);
assert!(serde_yaml::from_str::<AgentControlDynamicConfig>(EXAMPLE_K8S_CONFIG).is_ok());
assert!(serde_yaml::from_str::<AgentControlDynamicConfig>(
EXAMPLE_AGENTCONTROL_CONFIG_EMPTY_AGENTS
)
.is_ok());
assert!(
serde_yaml::from_str::<AgentControlDynamicConfig>(
EXAMPLE_AGENTCONTROL_CONFIG_EMPTY_AGENTS
)
.is_ok()
);
assert!(
serde_yaml::from_str::<AgentControlConfig>(EXAMPLE_AGENTCONTROL_CONFIG_NO_AGENTS)
.is_err()
Expand All @@ -418,21 +420,25 @@ agents: {}
let actual =
serde_yaml::from_str::<AgentControlConfig>(AGENTCONTROL_CONFIG_RESERVED_AGENT_ID);
assert!(actual.is_err());
assert!(actual
.unwrap_err()
.to_string()
.contains("AgentID 'agent-control' is reserved at line"))
assert!(
actual
.unwrap_err()
.to_string()
.contains("AgentID 'agent-control' is reserved at line")
)
}

#[test]
fn parse_with_missing_k8s_fields() {
let actual =
serde_yaml::from_str::<AgentControlConfig>(AGENTCONTROL_CONFIG_MISSING_K8S_FIELDS);
assert!(actual.is_err());
assert!(actual
.unwrap_err()
.to_string()
.contains("k8s: missing field"));
assert!(
actual
.unwrap_err()
.to_string()
.contains("k8s: missing field")
);
}

#[test]
Expand Down
14 changes: 9 additions & 5 deletions agent-control/src/agent_control/config_storer/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::agent_control::config_storer::loader_storer::{
AgentControlConfigLoader, AgentControlDynamicConfigDeleter, AgentControlDynamicConfigLoader,
AgentControlDynamicConfigStorer,
};
use crate::agent_control::defaults::{default_capabilities, AGENT_CONTROL_CONFIG_ENV_VAR_PREFIX};
use crate::agent_control::defaults::{AGENT_CONTROL_CONFIG_ENV_VAR_PREFIX, default_capabilities};
use crate::values::yaml_config::{YAMLConfig, YAMLConfigError};
use crate::values::yaml_config_repository::{YAMLConfigRepository, YAMLConfigRepositoryError};
use config::builder::DefaultState;
Expand Down Expand Up @@ -210,7 +210,8 @@ fleet_control:
// We set the environment variable with the `__` separator which will create the nested
// configs appropriately.
let env_var_name = "NR_AC_AGENTS__ROLLDICE1__AGENT_TYPE";
env::set_var(env_var_name, "namespace/com.newrelic.infrastructure:0.0.2");
// TODO: Audit that the environment access only happens in single-threaded code.
unsafe { env::set_var(env_var_name, "namespace/com.newrelic.infrastructure:0.0.2") };

let vr = YAMLConfigRepositoryFile::new(local_dir, PathBuf::new()).with_remote();
let store = AgentControlConfigStore::new(Arc::new(vr));
Expand All @@ -236,7 +237,8 @@ fleet_control:
};

// Env cleanup
env::remove_var(env_var_name);
// TODO: Audit that the environment access only happens in single-threaded code.
unsafe { env::remove_var(env_var_name) };

assert_eq!(actual, expected);
}
Expand All @@ -258,7 +260,8 @@ agents:
// We set the environment variable with the `__` separator which will create the nested
// configs appropriately.
let env_var_name = "NR_AC_AGENTS__ROLLDICE2__AGENT_TYPE";
env::set_var(env_var_name, "namespace/com.newrelic.infrastructure:0.0.2");
// TODO: Audit that the environment access only happens in single-threaded code.
unsafe { env::set_var(env_var_name, "namespace/com.newrelic.infrastructure:0.0.2") };

let vr = YAMLConfigRepositoryFile::new(local_dir, PathBuf::new()).with_remote();
let store = AgentControlConfigStore::new(Arc::new(vr));
Expand All @@ -284,7 +287,8 @@ agents:
};

// Env cleanup
env::remove_var(env_var_name);
// TODO: Audit that the environment access only happens in single-threaded code.
unsafe { env::remove_var(env_var_name) };

assert_eq!(actual, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::sub_agent::health::health_checker::{Healthy, Unhealthy};
use crate::{
agent_control::{agent_control::AgentControl, error::AgentError},
opamp::{hash_repository::HashRepository, remote_config::RemoteConfig},
sub_agent::{collection::StartedSubAgents, NotStartedSubAgent, SubAgentBuilder},
sub_agent::{NotStartedSubAgent, SubAgentBuilder, collection::StartedSubAgents},
};
use opamp_client::StartedClient;
use tracing::{error, info};
Expand Down Expand Up @@ -82,7 +82,7 @@ mod tests {
opamp::{
client_builder::tests::MockStartedOpAMPClient,
hash_repository::repository::tests::MockHashRepository,
remote_config::{hash::Hash, ConfigurationMap, RemoteConfig},
remote_config::{ConfigurationMap, RemoteConfig, hash::Hash},
},
sub_agent::{
collection::StartedSubAgents,
Expand Down
74 changes: 38 additions & 36 deletions agent-control/src/agent_control/http_server/async_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,51 @@ pub fn run_async_sync_bridge(
sub_agent_consumer: EventConsumer<SubAgentEvent>,
stop_rx: EventConsumer<()>,
) -> JoinHandle<()> {
spawn_named_thread("Async-Sync bridge", move || loop {
select! {
recv(&agent_control_consumer.as_ref()) -> sa_event_res => {
match sa_event_res {
Ok(agent_control_event) => {
let _ = async_sa_publisher.send(agent_control_event).inspect_err(|err| {
error!(
spawn_named_thread("Async-Sync bridge", move || {
loop {
select! {
recv(&agent_control_consumer.as_ref()) -> sa_event_res => {
match sa_event_res {
Ok(agent_control_event) => {
let _ = async_sa_publisher.send(agent_control_event).inspect_err(|err| {
error!(
error_msg = %err,
"cannot forward agent control event"
);
});
}
Err(err) => {
debug!(
error_msg = %err,
"cannot forward agent control event"
"status server bridge channel closed"
);
});
break;
}
}
Err(err) => {
debug!(
error_msg = %err,
"status server bridge channel closed"
);
break;
}
}
},
recv(&sub_agent_consumer.as_ref()) -> suba_event_res => {
match suba_event_res {
Ok(sub_agent_event) => {
let _ = async_suba_publisher.send(sub_agent_event).inspect_err(|err| {
error!(
},
recv(&sub_agent_consumer.as_ref()) -> suba_event_res => {
match suba_event_res {
Ok(sub_agent_event) => {
let _ = async_suba_publisher.send(sub_agent_event).inspect_err(|err| {
error!(
error_msg = %err,
"cannot forward agent control event"
);
});
}
Err(err) => {
debug!(
error_msg = %err,
"cannot forward agent control event"
"status server bridge channel closed"
);
});
}
Err(err) => {
debug!(
error_msg = %err,
"status server bridge channel closed"
);
break;
break;
}
}
},
recv(&stop_rx.as_ref()) -> _ => {
debug!("status server bridge stopping");
break;
}
},
recv(&stop_rx.as_ref()) -> _ => {
debug!("status server bridge stopping");
break;
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion agent-control/src/agent_control/http_server/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Display for Host {
#[cfg(test)]
mod tests {
use crate::agent_control::http_server::config::{
Host, Port, ServerConfig, DEFAULT_HOST, DEFAULT_PORT,
DEFAULT_HOST, DEFAULT_PORT, Host, Port, ServerConfig,
};
use serde::Deserialize;

Expand Down
2 changes: 1 addition & 1 deletion agent-control/src/agent_control/http_server/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ mod tests {
use tracing_test::traced_test;

use crate::agent_control::http_server::config::ServerConfig;
use crate::event::channel::pub_sub;
use crate::event::AgentControlEvent;
use crate::event::channel::pub_sub;

use super::Runner;

Expand Down
10 changes: 5 additions & 5 deletions agent-control/src/agent_control/http_server/server.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use crate::agent_control::config::OpAMPClientConfig;
use crate::agent_control::http_server::config::{ServerConfig, DEFAULT_WORKERS};
use crate::agent_control::http_server::StatusServerError;
use crate::agent_control::http_server::config::{DEFAULT_WORKERS, ServerConfig};
use crate::agent_control::http_server::status::Status;
use crate::agent_control::http_server::status_handler::status_handler;
use crate::agent_control::http_server::status_updater::on_agent_control_event_update_status;
use crate::agent_control::http_server::StatusServerError;
use crate::event::{AgentControlEvent, SubAgentEvent};
use actix_web::{dev::ServerHandle, web, App, HttpServer};
use std::sync::mpsc;
use actix_web::{App, HttpServer, dev::ServerHandle, web};
use std::sync::Arc;
use std::sync::mpsc;
use tokio::runtime::Handle;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::{debug, error, info};

pub async fn run_status_server(
Expand Down
2 changes: 1 addition & 1 deletion agent-control/src/agent_control/http_server/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::sub_agent::health::health_checker::{Healthy, Unhealthy};
use crate::sub_agent::health::with_start_time::HealthWithStartTime;
use crate::sub_agent::identity::AgentIdentity;
use serde::Serialize;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::time::SystemTime;
use url::Url;

Expand Down
Loading