Skip to content

Commit f7f3738

Browse files
committed
Retract check has own timed loop and configuration
1 parent 66c833d commit f7f3738

File tree

11 files changed

+38
-2
lines changed

11 files changed

+38
-2
lines changed

crates/hyperqueue/src/client/commands/worker.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,14 @@ pub struct WorkerStartOpts {
289289
/// It should *NOT* be placed on a network filesystem.
290290
#[arg(long)]
291291
pub work_dir: Option<PathBuf>,
292+
293+
#[arg(
294+
long,
295+
default_value = "30s",
296+
value_parser = parse_hms_or_human_time,
297+
help = duration_doc!("Defines the interval at which the worker checks for overdue tasks, based on the min_time setting")
298+
)]
299+
pub retract_check_interval: Duration,
292300
}
293301

294302
pub async fn start_hq_worker(
@@ -335,6 +343,7 @@ fn gather_configuration(opts: WorkerStartOpts) -> anyhow::Result<WorkerConfigura
335343
hostname,
336344
on_server_lost,
337345
work_dir,
346+
retract_check_interval,
338347
} = opts;
339348

340349
let detect_resources = detect_resources_cli
@@ -461,6 +470,7 @@ fn gather_configuration(opts: WorkerStartOpts) -> anyhow::Result<WorkerConfigura
461470
overview_configuration,
462471
extra,
463472
min_utilization,
473+
retract_check_interval,
464474
})
465475
}
466476

crates/hyperqueue/src/client/output/json.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,7 @@ fn format_worker_info(worker_info: WorkerInfo) -> serde_json::Value {
554554
overview_configuration: _,
555555
idle_timeout,
556556
time_limit,
557+
retract_check_interval: _,
557558
on_server_lost,
558559
group,
559560
min_utilization,

crates/pyhq/src/cluster/worker.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ impl RunningWorker {
5151
overview_configuration: OverviewConfiguration::disabled(),
5252
idle_timeout: None,
5353
time_limit: None,
54+
retract_check_interval: Duration::from_secs(30),
5455
on_server_lost: ServerLostPolicy::Stop,
5556
extra: Default::default(),
5657
min_utilization: 0.0,

crates/tako/src/internal/scheduler/query.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::internal::server::worker::Worker;
66
use crate::resources::{ResourceAmount, ResourceDescriptorItem, ResourceDescriptorKind};
77
use crate::worker::{ServerLostPolicy, WorkerConfiguration};
88
use crate::{Set, WorkerId};
9+
use std::time::Duration;
910

1011
/// Read the documentation of `new_worker_query` in control.rs
1112
pub(crate) fn compute_new_worker_query(
@@ -59,6 +60,7 @@ pub(crate) fn compute_new_worker_query(
5960
on_server_lost: ServerLostPolicy::Stop,
6061
min_utilization: query.min_utilization,
6162
extra: Default::default(),
63+
retract_check_interval: Duration::from_secs(30),
6264
};
6365
let worker = Worker::new(worker_id, configuration, &resource_map, now);
6466
fake_workers.push(worker);

crates/tako/src/internal/server/rpc.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ async fn worker_rpc_loop(
120120
);
121121

122122
let heartbeat_interval = msg.configuration.heartbeat_interval;
123+
123124
log::debug!("Worker heartbeat interval: {heartbeat_interval:?}");
125+
124126
// Sanity check that the interval is not too small
125127
assert!(heartbeat_interval.as_millis() > 150);
126128

crates/tako/src/internal/tests/integration/utils/worker.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ pub fn create_worker_configuration(
8181
time_limit: None,
8282
extra,
8383
min_utilization: 0.0,
84+
retract_check_interval: Duration::from_secs(30),
8485
},
8586
secret_key,
8687
)

crates/tako/src/internal/tests/test_reactor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ fn test_worker_add() {
4444
},
4545
idle_timeout: None,
4646
time_limit: None,
47+
retract_check_interval: Duration::from_secs(30),
4748
on_server_lost: ServerLostPolicy::Stop,
4849
min_utilization: 0.0,
4950
extra: Default::default(),
@@ -102,6 +103,7 @@ fn test_worker_add() {
102103
},
103104
idle_timeout: None,
104105
time_limit: None,
106+
retract_check_interval: Default::default(),
105107
on_server_lost: ServerLostPolicy::Stop,
106108
min_utilization: 0.0,
107109
extra: Default::default(),

crates/tako/src/internal/tests/utils/worker.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ impl WorkerBuilder {
7878
},
7979
idle_timeout: None,
8080
time_limit: self.time_limit,
81+
retract_check_interval: Duration::from_secs(30),
8182
on_server_lost: ServerLostPolicy::Stop,
8283
min_utilization: self.min_utilization,
8384
extra: Default::default(),

crates/tako/src/internal/worker/configuration.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub struct WorkerConfiguration {
4747
pub overview_configuration: OverviewConfiguration,
4848
pub idle_timeout: Option<Duration>,
4949
pub time_limit: Option<Duration>,
50+
pub retract_check_interval: Duration,
5051
pub on_server_lost: ServerLostPolicy,
5152
pub min_utilization: f32,
5253
pub extra: Map<String, String>,

crates/tako/src/internal/worker/rpc.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ pub async fn run_worker(
123123

124124
let (queue_sender, queue_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
125125
let heartbeat_interval = configuration.heartbeat_interval;
126+
let retract_check_interval = configuration.retract_check_interval;
126127
let time_limit = configuration.time_limit;
127128

128129
let (worker_id, state_ref) = {
@@ -171,6 +172,7 @@ pub async fn run_worker(
171172
let local_comm_fut = handle_local_comm(local_conn_listener, state_ref.clone());
172173

173174
let heartbeat_fut = heartbeat_process(heartbeat_interval, state_ref.clone());
175+
let retract_check_fut = retract_check_process(retract_check_interval, state_ref.clone());
174176
let overview_fut = send_overview_loop(state_ref.clone());
175177

176178
let time_limit_fut = match time_limit {
@@ -201,6 +203,7 @@ pub async fn run_worker(
201203
Ok(Some(FromWorkerMessage::Stop(WorkerStopReason::Interrupted)))
202204
}
203205
_ = heartbeat_fut => { unreachable!() }
206+
_ = retract_check_fut => { unreachable!() }
204207
_ = overview_fut => { unreachable!() }
205208
_ = local_comm_fut => { unreachable!() }
206209
};
@@ -310,9 +313,22 @@ async fn heartbeat_process(heartbeat_interval: Duration, state_ref: WrappedRcRef
310313
state
311314
.comm()
312315
.send_message_to_server(FromWorkerMessage::Heartbeat);
316+
}
317+
log::debug!("Heartbeat sent");
318+
}
319+
}
320+
321+
/// Repeatedly checks for overdue tasks
322+
async fn retract_check_process(check_interval: Duration, state_ref: WrappedRcRefCell<WorkerState>) {
323+
let mut interval = tokio::time::interval(check_interval);
324+
loop {
325+
interval.tick().await;
326+
{
327+
let mut state = state_ref.get_mut();
313328
if !state.prefilled_tasks.is_empty()
314329
&& let Some(remaining_time) = state.remaining_time()
315330
{
331+
log::debug!("Checking tasks for retract");
316332
let mut to_remove = Vec::new();
317333
let mut updates = TaskUpdates::new();
318334
for (rq_id, tasks) in &state.prefilled_tasks {
@@ -338,7 +354,6 @@ async fn heartbeat_process(heartbeat_interval: Duration, state_ref: WrappedRcRef
338354
}
339355
}
340356
}
341-
log::debug!("Heartbeat sent");
342357
}
343358
}
344359

0 commit comments

Comments
 (0)