Skip to content

Commit b90f5df

Browse files
committed
After 1 min, close the connection to a stopping worker
1 parent 671fe3a commit b90f5df

2 files changed

Lines changed: 11 additions & 2 deletions

File tree

crates/tako/src/internal/server/rpc.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,14 @@ async fn worker_rpc_loop(
238238
comm.send_worker_message(worker_id, &ToWorkerMessage::Stop);
239239
}
240240
}
241+
242+
if let Some((_, stop)) = worker.stop_reason {
243+
let delay = Duration::from_secs(60);
244+
if now > stop + delay {
245+
log::debug!("Stopping of worker timeout");
246+
break LostWorkerReason::ConnectionLost;
247+
}
248+
}
241249
}
242250
};
243251

@@ -275,6 +283,7 @@ async fn worker_rpc_loop(
275283
let reason = core
276284
.get_worker_by_id_or_panic(worker_id)
277285
.stop_reason
286+
.map(|(r, _)| r)
278287
.unwrap_or(reason);
279288
comm.remove_worker(worker_id);
280289
on_remove_worker(&mut core, &mut *comm, worker_id, reason);

crates/tako/src/internal/server/worker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub struct Worker {
5353
pub(crate) flags: WorkerFlags,
5454
// When the worker will be terminated
5555
pub(crate) termination_time: Option<Instant>,
56-
pub(crate) stop_reason: Option<LostWorkerReason>,
56+
pub(crate) stop_reason: Option<(LostWorkerReason, Instant)>,
5757

5858
pub(crate) mn_task: Option<MultiNodeTaskAssignment>,
5959

@@ -236,7 +236,7 @@ impl Worker {
236236
}
237237

238238
pub fn set_stop(&mut self, reason: LostWorkerReason) {
239-
self.stop_reason = Some(reason);
239+
self.stop_reason = Some((reason, Instant::now()));
240240
}
241241

242242
pub fn is_stopping(&self) -> bool {

0 commit comments

Comments
 (0)