Skip to content

Commit 8f42784

Browse files
committed
Add "hq server debug-dump"
1 parent a17f59b commit 8f42784

11 files changed

Lines changed: 171 additions & 10 deletions

File tree

crates/hyperqueue/src/client/commands/server.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::common::serverdir::{
66
};
77
use crate::common::utils::network::get_hostname;
88
use crate::common::utils::time::parse_hms_or_human_time;
9+
use crate::rpc_call;
910
use crate::server::bootstrap::{
1011
ServerConfig, generate_server_uid, get_client_session, init_hq_server,
1112
};
@@ -68,6 +69,8 @@ pub enum ServerCommand {
6869
Info(ServerInfoOpts),
6970
/// Generate an access file without starting the server
7071
GenerateAccess(GenerateAccessOpts),
72+
/// Dump internal scheduler info into a file
73+
DebugDump(DebugDumpOpts),
7174
}
7275

7376
#[derive(Parser)]
@@ -132,12 +135,19 @@ pub struct ServerStopOpts {}
132135
#[derive(Parser)]
133136
pub struct ServerInfoOpts {}
134137

138+
#[derive(Parser)]
139+
pub struct DebugDumpOpts {
140+
/// Path where dump is stored
141+
path: PathBuf,
142+
}
143+
135144
pub async fn command_server(gsettings: &GlobalSettings, opts: ServerOpts) -> anyhow::Result<()> {
136145
match opts.subcmd {
137146
ServerCommand::Start(opts) => start_server(gsettings, opts).await,
138147
ServerCommand::Stop(opts) => stop_server(gsettings, opts).await,
139148
ServerCommand::Info(opts) => command_server_info(gsettings, opts).await,
140149
ServerCommand::GenerateAccess(opts) => command_server_generate_access(gsettings, opts),
150+
ServerCommand::DebugDump(opts) => debug_dump(gsettings, opts).await,
141151
}
142152
}
143153

@@ -199,6 +209,15 @@ async fn start_server(gsettings: &GlobalSettings, opts: ServerStartOpts) -> anyh
199209
init_hq_server(gsettings, server_cfg).await
200210
}
201211

212+
async fn debug_dump(gsettings: &GlobalSettings, opts: DebugDumpOpts) -> anyhow::Result<()> {
213+
let mut session = get_client_session(gsettings.server_directory()).await?;
214+
log::info!("Dumping server state ...");
215+
let message = FromClientMessage::ServerDebugDump(opts.path);
216+
rpc_call!(session.connection(), message, ToClientMessage::Finished => ()).await?;
217+
log::info!("Dump finished");
218+
Ok(())
219+
}
220+
202221
async fn stop_server(gsettings: &GlobalSettings, _opts: ServerStopOpts) -> anyhow::Result<()> {
203222
let mut session = get_client_session(gsettings.server_directory()).await?;
204223
client_stop_server(session.connection()).await?;

crates/hyperqueue/src/server/client/mod.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use chrono::Utc;
2-
use std::fmt::Debug;
3-
use std::sync::Arc;
4-
52
use futures::{Sink, SinkExt, Stream, StreamExt};
63
use orion::kdf::SecretKey;
4+
use serde_json::json;
5+
use std::fmt::Debug;
6+
use std::path::Path;
7+
use std::sync::Arc;
8+
use std::time::Instant;
79
use tako::{Set, TaskGroup, TaskId};
810
use tokio::net::{TcpListener, TcpStream};
911
use tokio::sync::mpsc::UnboundedSender;
@@ -232,6 +234,7 @@ pub async fn client_rpc_loop<
232234
FromClientMessage::TaskExplain(request) => {
233235
handle_task_explain(&state_ref, senders, request)
234236
}
237+
FromClientMessage::ServerDebugDump(path) => handle_server_dump(senders, &path),
235238
};
236239
if let Err(error) = tx.send(response).await {
237240
log::error!("Cannot reply to client: {error:?}");
@@ -750,3 +753,15 @@ fn handle_worker_info(
750753
})
751754
}))
752755
}
756+
757+
pub(crate) fn handle_server_dump(senders: &Senders, path: &Path) -> ToClientMessage {
758+
let now = Instant::now();
759+
let tako = senders.server_control.debug_dump(now);
760+
let value = json!({
761+
"tako": tako,
762+
});
763+
if let Err(e) = std::fs::write(path, serde_json::to_string(&value).unwrap()) {
764+
return ToClientMessage::Error(format!("Could not write debug dump: {e}"));
765+
}
766+
ToClientMessage::Finished
767+
}

crates/hyperqueue/src/server/client/submit.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1+
use chrono::{DateTime, Utc};
12
use std::borrow::Cow;
23
use std::fmt::{Debug, Formatter};
34
use std::path::PathBuf;
4-
5-
use chrono::{DateTime, Utc};
65
use tako::gateway::{
76
EntryType, SharedTaskConfiguration, TaskConfiguration, TaskDataFlags, TaskSubmit,
87
};

crates/hyperqueue/src/transfer/messages.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub enum FromClientMessage {
4141
OpenJob(JobDescription),
4242
CloseJob(CloseJobRequest),
4343
TaskExplain(TaskExplainRequest),
44+
ServerDebugDump(PathBuf),
4445

4546
// This command switches the connection into streaming connection,
4647
// it will no longer reacts to any other client messages

crates/tako/src/control.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ impl ServerRef {
197197
.broadcast_worker_message(&ToWorkerMessage::SetOverviewIntervalOverride(None));
198198
}
199199
}
200+
201+
pub fn debug_dump(&self, now: Instant) -> serde_json::Value {
202+
let core = self.core_ref.get();
203+
core.dump(now)
204+
}
200205
}
201206

202207
#[allow(clippy::too_many_arguments)]

crates/tako/src/internal/scheduler/multinode.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,13 @@ impl MultiNodeQueue {
8181
pub fn is_sleeping(&self, rq: &ResourceRequest) -> bool {
8282
self.queues.get(rq).unwrap().sleeping
8383
}
84+
85+
pub fn dump(&self) -> serde_json::Value {
86+
serde_json::json!({
87+
"queues": self.queues.iter().map(|(k, v)| (k, v.queue.len() as u32)).collect::<Vec<_>>(),
88+
"requests": self.requests,
89+
})
90+
}
8491
}
8592

8693
enum TaskFindWorkersResult {

crates/tako/src/internal/server/core.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use std::sync::Arc;
2-
use std::time::Duration;
3-
4-
use orion::aead::SecretKey;
2+
use std::time::{Duration, Instant};
53

64
use crate::internal::common::resources::map::{ResourceIdAllocator, ResourceMap};
75
use crate::internal::common::resources::{ResourceId, ResourceRequestVariants};
@@ -17,6 +15,8 @@ use crate::internal::server::workergroup::WorkerGroup;
1715
use crate::internal::server::workerload::WorkerResources;
1816
use crate::internal::server::workermap::WorkerMap;
1917
use crate::{Map, TaskId, WorkerId};
18+
use orion::aead::SecretKey;
19+
use serde_json::json;
2020

2121
pub(crate) type CustomConnectionHandler = Box<dyn Fn(ConnectionDescriptor)>;
2222

@@ -529,6 +529,21 @@ impl Core {
529529
self.sleeping_sn_tasks.shrink_to_fit();
530530
self.multi_node_queue.shrink_to_fit();
531531
}
532+
533+
pub fn dump(&self, now: Instant) -> serde_json::Value {
534+
json!({
535+
"workers": self.workers.values().map(|w| w.dump(now)).collect::<Vec<_>>(),
536+
"worker_groups": self.worker_groups.iter().map(|(k, v)|
537+
json!({"name": k,
538+
"workers": v.worker_ids().collect::<Vec<_>>(),
539+
})).collect::<Vec<_>>(),
540+
"tasks": self.tasks.tasks().map(|t| t.dump()).collect::<Vec<_>>(),
541+
"parked_resources": self.parked_resources.iter().map(|r| r.dump()).collect::<Vec<_>>(),
542+
"sn_ready_to_assign": self.single_node_ready_to_assign,
543+
"mn_queue": self.multi_node_queue.dump(),
544+
"sleeping_sn_tasks": self.sleeping_sn_tasks,
545+
})
546+
}
532547
}
533548

534549
#[cfg(test)]

crates/tako/src/internal/server/task.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use serde_json::json;
12
use std::fmt;
23
use std::rc::Rc;
34
use std::time::Duration;
@@ -45,6 +46,37 @@ impl fmt::Debug for TaskRuntimeState {
4546
}
4647
}
4748

49+
impl TaskRuntimeState {
50+
fn dump(&self) -> serde_json::Value {
51+
match self {
52+
Self::Waiting(info) => json!({
53+
"state": "Waiting",
54+
"unfinished_deps": info.unfinished_deps,
55+
}),
56+
Self::Assigned(w_id) => json!({
57+
"state": "Assigned",
58+
"worker_id": w_id,
59+
}),
60+
Self::Stealing(from_w, to_w) => json!({
61+
"state": "Stealing",
62+
"from_worker": from_w,
63+
"to_worker": to_w,
64+
}),
65+
Self::Running { worker_id, .. } => json!({
66+
"state": "Running",
67+
"worker_id": worker_id,
68+
}),
69+
Self::RunningMultiNode(ws) => json!({
70+
"state": "RunningMultiNode",
71+
"worker_ids": ws,
72+
}),
73+
Self::Finished => json!({
74+
"state": "Finished",
75+
}),
76+
}
77+
}
78+
}
79+
4880
bitflags::bitflags! {
4981
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
5082
pub struct TaskFlags: u32 {
@@ -66,6 +98,18 @@ pub struct TaskConfiguration {
6698
pub body: Box<[u8]>,
6799
}
68100

101+
impl TaskConfiguration {
102+
pub fn dump(&self) -> serde_json::Value {
103+
json!({
104+
"resources": self.resources,
105+
"user_priority": self.user_priority,
106+
"time_limit": self.time_limit,
107+
"crash_limit": self.crash_limit,
108+
"body_len": self.body.len(),
109+
})
110+
}
111+
}
112+
69113
#[cfg_attr(test, derive(Eq, PartialEq))]
70114
pub struct Task {
71115
pub id: TaskId,
@@ -92,6 +136,20 @@ impl fmt::Debug for Task {
92136
}
93137

94138
impl Task {
139+
pub fn dump(&self) -> serde_json::Value {
140+
json!({
141+
"id": self.id.to_string(),
142+
"state": self.state.dump(),
143+
"consumers": self.consumers,
144+
"task_deps": self.task_deps,
145+
"flags": self.flags.bits(),
146+
"scheduler_priority": self.scheduler_priority,
147+
"instance_id": self.instance_id,
148+
"crash_counter": self.crash_counter,
149+
"configuration": self.configuration.dump(),
150+
})
151+
}
152+
95153
pub fn new(
96154
id: TaskId,
97155
task_deps: ThinVec<TaskId>,

crates/tako/src/internal/server/worker.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::internal::server::taskmap::TaskMap;
1212
use crate::internal::server::workerload::{ResourceRequestLowerBound, WorkerLoad, WorkerResources};
1313
use crate::internal::worker::configuration::WorkerConfiguration;
1414
use crate::{TaskId, WorkerId};
15+
use serde_json::json;
1516
use std::time::{Duration, Instant};
1617

1718
bitflags::bitflags! {
@@ -294,9 +295,7 @@ impl Worker {
294295
pub fn has_time_to_run_for_rqv(&self, rqv: &ResourceRequestVariants, now: Instant) -> bool {
295296
self.has_time_to_run(rqv.min_time(), now)
296297
}
297-
}
298298

299-
impl Worker {
300299
pub fn new(
301300
id: WorkerId,
302301
configuration: WorkerConfiguration,
@@ -319,4 +318,23 @@ impl Worker {
319318
idle_timestamp: now,
320319
}
321320
}
321+
322+
pub(crate) fn dump(&self, now: Instant) -> serde_json::Value {
323+
json! ({
324+
"id": self.id,
325+
"sn_tasks": self.sn_tasks,
326+
"sn_load": self.sn_load.dump(),
327+
"flags": self.flags.bits(),
328+
"termination_time": self.termination_time.map(|x| x - now),
329+
"mn_task": self.mn_task.as_ref().map(|t| {
330+
json!({
331+
"task_id": t.task_id,
332+
"reservation_only": t.reservation_only,
333+
})
334+
}),
335+
"idle_timestamp": now - self.idle_timestamp,
336+
"last_heartbeat": now - self.last_heartbeat,
337+
"configuration": self.configuration,
338+
})
339+
}
322340
}

crates/tako/src/internal/server/workerload.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::internal::common::resources::{
77
};
88
use crate::internal::messages::worker::WorkerResourceCounts;
99
use crate::{Map, Set, TaskId};
10+
use serde_json::json;
1011
use std::ops::Deref;
1112

1213
// WorkerResources are transformed information from ResourceDescriptor
@@ -137,6 +138,12 @@ impl WorkerResources {
137138
.min()
138139
.unwrap_or(0)
139140
}
141+
142+
pub fn dump(&self) -> serde_json::Value {
143+
json!({
144+
"n_resources": self.n_resources.iter().map(|x| x.total_fractions()).collect::<Vec<_>>(),
145+
})
146+
}
140147
}
141148

142149
// This represents a current worker load from server perspective
@@ -169,6 +176,14 @@ impl WorkerLoad {
169176
}
170177
}
171178

179+
pub(crate) fn dump(&self) -> serde_json::Value {
180+
json!({
181+
"n_resources": self.n_resources.iter().map(|x| x.total_fractions()).collect::<Vec<_>>(),
182+
"non_first_rq": self.non_first_rq.iter().map(|(k, v)| (k.to_string(), *v)).collect::<Vec<_>>(),
183+
"round_robin_counter": self.round_robin_counter,
184+
})
185+
}
186+
172187
pub(crate) fn add_request(
173188
&mut self,
174189
task_id: TaskId,

0 commit comments

Comments
 (0)