Skip to content

Commit ba65b5b

Browse files
committed
Idle timeout checked in server
1 parent 3b05337 commit ba65b5b

9 files changed

Lines changed: 40 additions & 112 deletions

File tree

crates/tako/src/internal/messages/worker.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ pub enum ToWorkerMessage {
7171
CancelTasks(TaskIdsMsg),
7272
NewWorker(NewWorkerMsg),
7373
LostWorker(WorkerId),
74-
SetReservation(bool),
7574
/// Override the internally set overview interval with a new duration
7675
/// if it is **disabled** on the worker.
7776
/// If the worker has already enabled overview interval, then this does nothing.

crates/tako/src/internal/scheduler/state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ impl SchedulerState {
189189
);
190190
for worker_id in &worker_ids[1..] {
191191
let worker = worker_map.get_worker_mut(*worker_id);
192-
worker.set_reservation(true, comm);
192+
worker.set_reservation(true);
193193
}
194194
}
195195
(
@@ -236,7 +236,7 @@ impl SchedulerState {
236236
})
237237
.unwrap_or(true);
238238
if unreserve {
239-
worker.set_reservation(false, comm);
239+
worker.set_reservation(false);
240240
}
241241
}
242242
}

crates/tako/src/internal/server/rpc.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ async fn worker_rpc_loop(
188188
// Idle timeout might be smaller than 500ms in tests.
189189
#[allow(clippy::manual_clamp)]
190190
heartbeat_interval
191-
.min(idle_timeout.unwrap_or(heartbeat_interval))
191+
.min(idle_timeout.map(|t| t / 16).unwrap_or(heartbeat_interval))
192192
// Sanity check that interval is not too short
193193
.max(Duration::from_millis(500)),
194194
)
@@ -204,6 +204,15 @@ async fn worker_rpc_loop(
204204
log::debug!("Heartbeat not arrived, worker={}", worker.id);
205205
break LostWorkerReason::HeartbeatLost;
206206
}
207+
208+
if let Some(timeout) = idle_timeout {
209+
if worker.is_free()
210+
&& !worker.is_reserved()
211+
&& worker.idle_timestamp + timeout < now
212+
{
213+
break LostWorkerReason::IdleTimeout;
214+
}
215+
}
207216
}
208217
};
209218

crates/tako/src/internal/server/worker.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,12 @@ use crate::internal::common::Set;
55
use crate::internal::common::resources::TimeRequest;
66
use crate::internal::common::resources::map::ResourceMap;
77
use crate::internal::common::resources::{ResourceRequest, ResourceRequestVariants};
8-
use crate::internal::messages::worker::ToWorkerMessage;
9-
use crate::internal::server::comm::Comm;
108
use crate::internal::server::task::Task;
119
use crate::internal::server::taskmap::TaskMap;
1210
use crate::internal::server::workerload::{ResourceRequestLowerBound, WorkerLoad, WorkerResources};
1311
use crate::internal::worker::configuration::WorkerConfiguration;
1412
use crate::{TaskId, WorkerId};
15-
use std::time::Duration;
13+
use std::time::{Duration, Instant};
1614

1715
bitflags::bitflags! {
1816
pub(crate) struct WorkerFlags: u32 {
@@ -55,12 +53,15 @@ pub struct Worker {
5553
pub(crate) resources: WorkerResources,
5654
pub(crate) flags: WorkerFlags,
5755
// When the worker will be terminated
58-
pub(crate) termination_time: Option<std::time::Instant>,
56+
pub(crate) termination_time: Option<Instant>,
5957

6058
pub(crate) mn_task: Option<MultiNodeTaskAssignment>,
6159

60+
// Saved timestamp when a worker is put into an idle state
61+
pub(crate) idle_timestamp: Instant,
62+
6263
// COLD DATA move it into a box (?)
63-
pub(crate) last_heartbeat: std::time::Instant,
64+
pub(crate) last_heartbeat: Instant,
6465
pub(crate) configuration: WorkerConfiguration,
6566
}
6667

@@ -123,12 +124,12 @@ impl Worker {
123124

124125
pub fn reset_mn_task(&mut self) {
125126
self.mn_task = None;
127+
self.idle_timestamp = Instant::now();
126128
}
127129

128-
pub fn set_reservation(&mut self, value: bool, comm: &mut impl Comm) {
130+
pub fn set_reservation(&mut self, value: bool) {
129131
if self.is_reserved() != value {
130132
self.flags.set(WorkerFlags::RESERVED, value);
131-
comm.send_worker_message(self.id, &ToWorkerMessage::SetReservation(value))
132133
}
133134
}
134135

@@ -148,6 +149,9 @@ impl Worker {
148149

149150
pub fn remove_sn_task(&mut self, task: &Task) {
150151
assert!(self.sn_tasks.remove(&task.id));
152+
if self.sn_tasks.is_empty() {
153+
self.idle_timestamp = Instant::now();
154+
}
151155
self.sn_load
152156
.remove_request(task.id, &task.configuration.resources, &self.resources);
153157
}
@@ -205,7 +209,7 @@ impl Worker {
205209
self.flags.contains(WorkerFlags::PARKED)
206210
}
207211

208-
pub fn is_capable_to_run(&self, request: &ResourceRequest, now: std::time::Instant) -> bool {
212+
pub fn is_capable_to_run(&self, request: &ResourceRequest, now: Instant) -> bool {
209213
self.has_time_to_run(request.min_time(), now)
210214
&& self.resources.is_capable_to_run_request(request)
211215
}
@@ -221,7 +225,7 @@ impl Worker {
221225
}
222226

223227
// Returns None if there is no time limit for a worker or time limit was passed
224-
pub fn remaining_time(&self, now: std::time::Instant) -> Option<Duration> {
228+
pub fn remaining_time(&self, now: Instant) -> Option<Duration> {
225229
self.termination_time.map(|time| {
226230
if time < now {
227231
Duration::default()
@@ -231,19 +235,15 @@ impl Worker {
231235
})
232236
}
233237

234-
pub fn has_time_to_run(&self, time_request: TimeRequest, now: std::time::Instant) -> bool {
238+
pub fn has_time_to_run(&self, time_request: TimeRequest, now: Instant) -> bool {
235239
if let Some(time) = self.termination_time {
236240
now + time_request < time
237241
} else {
238242
true
239243
}
240244
}
241245

242-
pub fn has_time_to_run_for_rqv(
243-
&self,
244-
rqv: &ResourceRequestVariants,
245-
now: std::time::Instant,
246-
) -> bool {
246+
pub fn has_time_to_run_for_rqv(&self, rqv: &ResourceRequestVariants, now: Instant) -> bool {
247247
if self.termination_time.is_none() {
248248
return true;
249249
}
@@ -280,6 +280,7 @@ impl Worker {
280280
flags: WorkerFlags::empty(),
281281
last_heartbeat: now,
282282
mn_task: None,
283+
idle_timestamp: now,
283284
}
284285
}
285286
}

crates/tako/src/internal/tests/test_reactor.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -749,14 +749,6 @@ fn test_task_mn_cancel() {
749749
let mut scheduler = create_test_scheduler();
750750
scheduler.run_scheduling(&mut core, &mut comm);
751751
core.sanity_check();
752-
assert!(matches!(
753-
&comm.take_worker_msgs(100, 1)[0],
754-
&ToWorkerMessage::SetReservation(false)
755-
));
756-
assert!(matches!(
757-
&comm.take_worker_msgs(101, 1)[0],
758-
&ToWorkerMessage::SetReservation(false)
759-
));
760752
comm.emptiness_check();
761753

762754
assert!(core.find_task(1.into()).is_none());

crates/tako/src/internal/tests/test_scheduler_mn.rs

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,15 @@ fn check_worker_status_change(s1: WorkerStatus, s2: WorkerStatus, ms: &[ToWorker
4444
(WorkerStatus::Root, WorkerStatus::Root) | (WorkerStatus::None, WorkerStatus::Root) => {
4545
assert!(matches!(ms, &[ToWorkerMessage::ComputeTask(_)]))
4646
}
47-
(WorkerStatus::NonRoot, WorkerStatus::Root) => assert!(matches!(
48-
ms,
49-
&[
50-
ToWorkerMessage::ComputeTask(_),
51-
ToWorkerMessage::SetReservation(false)
52-
]
53-
)),
47+
(WorkerStatus::NonRoot, WorkerStatus::Root) => {
48+
assert!(matches!(ms, &[ToWorkerMessage::ComputeTask(_),]))
49+
}
5450
(WorkerStatus::NonRoot, WorkerStatus::None) => {
55-
assert!(matches!(ms, &[ToWorkerMessage::SetReservation(false)]))
51+
assert!(matches!(ms, &[]))
5652
}
5753
(WorkerStatus::None, WorkerStatus::NonRoot)
5854
| (WorkerStatus::Root, WorkerStatus::NonRoot) => {
59-
assert!(matches!(ms, &[ToWorkerMessage::SetReservation(true)]))
55+
assert!(matches!(ms, &[]))
6056
}
6157
(WorkerStatus::NonRoot, WorkerStatus::NonRoot)
6258
| (WorkerStatus::Root, WorkerStatus::None)
@@ -86,25 +82,21 @@ fn test_schedule_mn_simple() {
8682
scheduler.run_scheduling(&mut core, &mut comm);
8783
core.sanity_check();
8884

89-
let test_mn_task = |task: &Task, comm: &mut TestComm, reservation: bool| -> Vec<WorkerId> {
85+
let test_mn_task = |task: &Task, comm: &mut TestComm| -> Vec<WorkerId> {
9086
let ws = task.mn_placement().unwrap().to_vec();
9187
assert_eq!(ws.len(), 2);
9288
if let ToWorkerMessage::ComputeTask(m) = &comm.take_worker_msgs(ws[0], 1)[0] {
9389
assert_eq!(&m.node_list, &ws);
9490
} else {
9591
unreachable!()
9692
}
97-
if reservation {
98-
let msgs = comm.take_worker_msgs(ws[1], 1);
99-
assert!(matches!(&msgs[0], ToWorkerMessage::SetReservation(true)));
100-
}
10193
ws
10294
};
10395

10496
let task3 = core.get_task(3.into());
105-
let ws3 = test_mn_task(task3, &mut comm, true);
97+
let ws3 = test_mn_task(task3, &mut comm);
10698
let task4 = core.get_task(4.into());
107-
let ws4 = test_mn_task(task4, &mut comm, true);
99+
let ws4 = test_mn_task(task4, &mut comm);
108100
for w in &ws4 {
109101
assert!(!ws3.contains(w));
110102
}
@@ -124,7 +116,7 @@ fn test_schedule_mn_simple() {
124116
core.sanity_check();
125117

126118
let task2 = core.get_task(2.into());
127-
let ws2 = test_mn_task(task2, &mut comm, false);
119+
let ws2 = test_mn_task(task2, &mut comm);
128120
comm.emptiness_check();
129121

130122
finish_on_worker(&mut core, 3, ws2[0]);
@@ -152,14 +144,9 @@ fn test_schedule_mn_reserve() {
152144
comm.take_worker_msgs(ws1[0], 1)[0],
153145
ToWorkerMessage::ComputeTask(_)
154146
));
155-
assert!(matches!(
156-
comm.take_worker_msgs(ws1[1], 1)[0],
157-
ToWorkerMessage::SetReservation(true)
158-
));
159-
assert!(matches!(
160-
comm.take_worker_msgs(ws1[2], 1)[0],
161-
ToWorkerMessage::SetReservation(true)
162-
));
147+
assert!(!core.get_worker_by_id_or_panic(ws1[0]).is_reserved());
148+
assert!(core.get_worker_by_id_or_panic(ws1[1]).is_reserved());
149+
assert!(core.get_worker_by_id_or_panic(ws1[2]).is_reserved());
163150
comm.emptiness_check();
164151
finish_on_worker(&mut core, 1, ws1[0]);
165152
scheduler.run_scheduling(&mut core, &mut comm);

crates/tako/src/internal/tests/test_worker.rs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -140,24 +140,6 @@ fn test_worker_start_task_resource_variants() {
140140
assert_eq!(requests[0], rq);
141141
}
142142

143-
#[test]
144-
fn test_worker_reservation() {
145-
let state_ref = create_test_worker_state(create_test_worker_config());
146-
let mut state = state_ref.get_mut();
147-
let finish_time = state.last_task_finish_time;
148-
assert!(!state.reservation);
149-
process_worker_message(&mut state, ToWorkerMessage::SetReservation(true));
150-
let comm = state.comm().test();
151-
comm.check_emptiness();
152-
assert!(state.reservation);
153-
assert_eq!(state.last_task_finish_time, finish_time);
154-
process_worker_message(&mut state, ToWorkerMessage::SetReservation(false));
155-
let comm = state.comm().test();
156-
comm.check_emptiness();
157-
assert!(!state.reservation);
158-
assert_ne!(state.last_task_finish_time, finish_time);
159-
}
160-
161143
#[test]
162144
fn test_worker_other_workers() {
163145
let state_ref = create_test_worker_state(create_test_worker_config());

crates/tako/src/internal/worker/rpc.rs

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,6 @@ pub async fn run_worker(
177177
let local_comm_fut = handle_local_comm(local_conn_listener, state_ref.clone());
178178

179179
let heartbeat_fut = heartbeat_process(heartbeat_interval, state_ref.clone());
180-
let idle_timeout_fut = match configuration.idle_timeout {
181-
Some(timeout) => Either::Left(idle_timeout_process(timeout, state_ref.clone())),
182-
None => Either::Right(futures::future::pending()),
183-
};
184180
let overview_fut = send_overview_loop(state_ref.clone());
185181

186182
let time_limit_fut = match time_limit {
@@ -220,10 +216,6 @@ pub async fn run_worker(
220216
log::info!("Time limit reached");
221217
Ok(Some(FromWorkerMessage::Stop(WorkerStopReason::TimeLimitReached)))
222218
}
223-
_ = idle_timeout_fut => {
224-
log::info!("Idle timeout reached");
225-
Ok(Some(FromWorkerMessage::Stop(WorkerStopReason::IdleTimeout)))
226-
}
227219
_ = stop_flag.notified() => {
228220
log::info!("Worker received an external stop notification");
229221
Ok(Some(FromWorkerMessage::Stop(WorkerStopReason::Interrupted)))
@@ -379,24 +371,6 @@ async fn heartbeat_process(heartbeat_interval: Duration, state_ref: WrappedRcRef
379371
}
380372
}
381373

382-
/// Runs until an idle timeout happens.
383-
/// Idle timeout occurs when the worker doesn't have anything to do for the specified duration.
384-
async fn idle_timeout_process(idle_timeout: Duration, state_ref: WrappedRcRefCell<WorkerState>) {
385-
let mut interval = tokio::time::interval(Duration::from_secs(1));
386-
387-
loop {
388-
interval.tick().await;
389-
390-
let state = state_ref.get();
391-
if !state.has_tasks() && !state.reservation {
392-
let elapsed = state.last_task_finish_time.elapsed();
393-
if elapsed > idle_timeout {
394-
break;
395-
}
396-
}
397-
}
398-
}
399-
400374
pub(crate) fn process_worker_message(state: &mut WorkerState, message: ToWorkerMessage) -> bool {
401375
match message {
402376
ToWorkerMessage::ComputeTask(msg) => {
@@ -449,12 +423,6 @@ pub(crate) fn process_worker_message(state: &mut WorkerState, message: ToWorkerM
449423
ToWorkerMessage::LostWorker(worker_id) => {
450424
state.remove_worker(worker_id);
451425
}
452-
ToWorkerMessage::SetReservation(on_off) => {
453-
state.reservation = on_off;
454-
if !on_off {
455-
state.reset_idle_timer();
456-
}
457-
}
458426
ToWorkerMessage::PlacementResponse(data_id, placement) => {
459427
state.process_resolved_placement(data_id, placement);
460428
}

crates/tako/src/internal/worker/state.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,6 @@ pub struct WorkerState {
5555
//pub(crate) secret_key: Option<Arc<SecretKey>>,
5656
pub(crate) start_time: Instant,
5757

58-
pub(crate) reservation: bool, // If true, idle timeout is blocked
59-
pub(crate) last_task_finish_time: Instant,
60-
6158
pub(crate) lc_state: RefCell<LocalCommState>,
6259
pub(crate) data_storage: DataStorage,
6360
download_manager: Option<WorkerDownloadManagerRef>,
@@ -166,10 +163,6 @@ impl WorkerState {
166163
!self.tasks.is_empty()
167164
}
168165

169-
pub fn reset_idle_timer(&mut self) {
170-
self.last_task_finish_time = Instant::now();
171-
}
172-
173166
fn remove_task(
174167
&mut self,
175168
task_id: TaskId,
@@ -214,7 +207,6 @@ impl WorkerState {
214207
if self.tasks.is_empty() {
215208
self.comm.notify_worker_is_empty();
216209
}
217-
self.reset_idle_timer();
218210
outputs
219211
}
220212

@@ -466,8 +458,6 @@ impl WorkerStateRef {
466458
start_time: now,
467459
resource_map,
468460
resource_label_map,
469-
last_task_finish_time: now,
470-
reservation: false,
471461
worker_addresses: Default::default(),
472462
lc_state: RefCell::new(LocalCommState::new()),
473463
data_storage: DataStorage::new(),

0 commit comments

Comments
 (0)