Commit 4389b85

new_worker_query is no longer async
1 parent 73b56f2 commit 4389b85

5 files changed: 40 additions & 51 deletions


crates/tako/src/control.rs

Lines changed: 14 additions & 16 deletions
@@ -1,17 +1,17 @@
 use std::future::Future;
 use std::rc::Rc;
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};

 use orion::aead::SecretKey;
 use tokio::net::TcpListener;
-use tokio::sync::{Notify, oneshot};
+use tokio::sync::Notify;

 use crate::events::EventProcessor;
 use crate::gateway::{MultiNodeAllocationResponse, TaskSubmit, WorkerRuntimeInfo};
 use crate::internal::messages::worker::ToWorkerMessage;
 use crate::internal::scheduler::query::compute_new_worker_query;
-use crate::internal::scheduler::state::scheduler_loop;
+use crate::internal::scheduler::state::{run_scheduling_now, scheduler_loop};
 use crate::internal::server::client::handle_new_tasks;
 use crate::internal::server::comm::{Comm, CommSenderRef};
 use crate::internal::server::core::{CoreRef, CustomConnectionHandler};
@@ -99,26 +99,24 @@ impl ServerRef {
     we can get in one allocation at most.
     This is used for planning multi-node tasks.

+    Note: This call may immediately call the full scheduler procedure.
+    This should not bother the user of the call, except it is probably not a good
+    idea to call this function often (several times per second) as it may bypass
+    a scheduler time limitations.
     */
     pub fn new_worker_query(
         &self,
         queries: Vec<WorkerTypeQuery>,
-    ) -> crate::Result<oneshot::Receiver<NewWorkerAllocationResponse>> {
+    ) -> crate::Result<NewWorkerAllocationResponse> {
         for query in &queries {
             query.descriptor.validate()?;
         }
-        let (sx, rx) = tokio::sync::oneshot::channel();
-        if self.comm_ref.get().get_scheduling_flag() {
-            self.comm_ref
-                .get_mut()
-                .add_after_scheduling_callback(Box::new(move |core| {
-                    let _ = sx.send(compute_new_worker_query(core, &queries));
-                }));
-        } else {
-            let mut core = self.core_ref.get_mut();
-            let _ = sx.send(compute_new_worker_query(&mut core, &queries));
-        };
-        Ok(rx)
+        let mut core = self.core_ref.get_mut();
+        let mut comm = self.comm_ref.get_mut();
+        if comm.get_scheduling_flag() {
+            run_scheduling_now(&mut core, &mut comm, Instant::now())
+        }
+        Ok(compute_new_worker_query(&mut core, &queries))
     }

     pub fn try_release_memory(&self) {
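
For callers, the visible change is that the oneshot receiver disappears from the API. Below is a minimal caller-side sketch; it mirrors the `query_helper` change in test_basic.rs further down. The import paths and the helper name are assumptions for illustration, since the public re-exports are not shown in this diff.

use tako::control::ServerRef;
use tako::gateway::{NewWorkerAllocationResponse, WorkerTypeQuery};

// Hypothetical caller: before this commit, new_worker_query returned a
// oneshot::Receiver that had to be awaited; now the response is computed
// synchronously (running a pending scheduler pass first) and returned directly.
fn plan_new_workers(
    server: &ServerRef,
    queries: Vec<WorkerTypeQuery>,
) -> tako::Result<NewWorkerAllocationResponse> {
    server.new_worker_query(queries)
}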

crates/tako/src/internal/scheduler/query.rs

Lines changed: 4 additions & 1 deletion
@@ -12,13 +12,16 @@ struct WorkerTypeState {
     max: u32,
 }

-/* Read the documentation of `new_worker_query`` in control.rs */
+/// Read the documentation of `new_worker_query`` in control.rs
 pub(crate) fn compute_new_worker_query(
     core: &mut Core,
     queries: &[WorkerTypeQuery],
 ) -> NewWorkerAllocationResponse {
     log::debug!("Compute new worker query: query = {:?}", queries);

+    // Scheduler has to be performed before the query, so there should be no ready_to_assign tasks
+    assert!(core.sn_ready_to_assign().is_empty() || !core.has_workers());
+
     let add_task = |new_loads: &mut [WorkerTypeState], task: &Task| {
         let request = &task.configuration.resources;
         for ws in new_loads.iter_mut() {

crates/tako/src/internal/scheduler/state.rs

Lines changed: 16 additions & 5 deletions
@@ -8,7 +8,7 @@ use tokio::time::sleep;
 use crate::internal::common::Map;
 use crate::internal::messages::worker::{TaskIdsMsg, ToWorkerMessage};
 use crate::internal::scheduler::multinode::MultiNodeAllocator;
-use crate::internal::server::comm::{Comm, CommSenderRef};
+use crate::internal::server::comm::{Comm, CommSender, CommSenderRef};
 use crate::internal::server::core::{Core, CoreRef};
 use crate::internal::server::dataobjmap::DataObjectMap;
 use crate::internal::server::task::{Task, TaskRuntimeState};

@@ -43,22 +43,33 @@ pub(crate) async fn scheduler_loop(
     let mut last_schedule = Instant::now().checked_sub(minimum_delay * 2).unwrap();
     loop {
         scheduler_wakeup.notified().await;
+        if !comm_ref.get().get_scheduling_flag() {
+            last_schedule = Instant::now();
+            continue;
+        }
         let mut now = Instant::now();
         let since_last_schedule = now - last_schedule;
         if minimum_delay > since_last_schedule {
             sleep(minimum_delay - since_last_schedule).await;
             now = Instant::now();
         }
         let mut comm = comm_ref.get_mut();
-        let mut state = SchedulerState::new(now);
+        if !comm.get_scheduling_flag() {
+            last_schedule = now;
+            continue;
+        }
         let mut core = core_ref.get_mut();
-        state.run_scheduling(&mut core, &mut *comm);
-        comm.reset_scheduling_flag();
+        run_scheduling_now(&mut core, &mut comm, now);
         last_schedule = Instant::now();
-        comm.call_after_scheduling_callbacks(&mut core);
     }
 }

+pub fn run_scheduling_now(core: &mut Core, comm: &mut CommSender, now: Instant) {
+    let mut state = SchedulerState::new(now);
+    state.run_scheduling(core, &mut *comm);
+    comm.reset_scheduling_flag();
+}
+
 impl SchedulerState {
     pub fn new(now: std::time::Instant) -> Self {
         SchedulerState {
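
The flag handling is the subtle part: `run_scheduling_now` resets `need_scheduling`, and it can now also be invoked synchronously by `new_worker_query` while the loop is parked on `notified()` or sleeping, which is why the loop re-checks the flag both after waking and again after the minimum-delay sleep. A toy model of that handshake follows; the `Flag` struct and the free functions are placeholders standing in for `CommSender` and the real entry points, not tako code.

// Toy model only: `need_scheduling` stands in for CommSender's flag.
struct Flag {
    need_scheduling: bool,
}

impl Flag {
    fn run_scheduling_now(&mut self) {
        // ...SchedulerState::run_scheduling would happen here...
        self.need_scheduling = false; // reset_scheduling_flag()
    }
}

// Synchronous query path: schedule immediately if a pass is pending, so the
// answer never reflects a stale ready_to_assign queue.
fn new_worker_query(flag: &mut Flag) {
    if flag.need_scheduling {
        flag.run_scheduling_now();
    }
    // compute_new_worker_query(...) would run here
}

// One iteration of the background loop: skip if the pending pass was already
// performed (and the flag reset) by a synchronous query in the meantime.
fn scheduler_loop_iteration(flag: &mut Flag) {
    if !flag.need_scheduling {
        return;
    }
    flag.run_scheduling_now();
}

fn main() {
    let mut flag = Flag { need_scheduling: true };
    new_worker_query(&mut flag);         // performs the pending pass
    scheduler_loop_iteration(&mut flag); // finds the flag reset and skips
}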

crates/tako/src/internal/server/comm.rs

Lines changed: 0 additions & 18 deletions
@@ -8,7 +8,6 @@ use crate::WorkerId;
 use crate::events::EventProcessor;
 use crate::internal::common::{Map, WrappedRcRefCell};
 use crate::internal::messages::worker::ToWorkerMessage;
-use crate::internal::server::core::Core;
 use crate::internal::transfer::auth::serialize;

 pub trait Comm {

@@ -19,14 +18,11 @@ pub trait Comm {
     fn client(&mut self) -> &mut dyn EventProcessor;
 }

-type SchedulingCallback = Box<dyn FnOnce(&mut Core)>;
-
 pub struct CommSender {
     workers: Map<WorkerId, UnboundedSender<Bytes>>,
     need_scheduling: bool,
     scheduler_wakeup: Rc<Notify>,
     client_events: Option<Box<dyn EventProcessor>>,
-    after_scheduling_callbacks: Vec<SchedulingCallback>,
     panic_on_worker_lost: bool,
 }

@@ -38,7 +34,6 @@ impl CommSenderRef {
             workers: Default::default(),
             scheduler_wakeup,
             client_events: None,
-            after_scheduling_callbacks: Vec::new(),
             need_scheduling: false,
             panic_on_worker_lost,
         })

@@ -70,19 +65,6 @@ impl CommSender {
     pub fn get_scheduling_flag(&self) -> bool {
         self.need_scheduling
     }
-
-    pub fn add_after_scheduling_callback(&mut self, callback: SchedulingCallback) {
-        self.after_scheduling_callbacks.push(callback)
-    }
-
-    pub fn call_after_scheduling_callbacks(&mut self, core: &mut Core) {
-        if !self.after_scheduling_callbacks.is_empty() {
-            log::debug!("Running after scheduling callbacks");
-            self.after_scheduling_callbacks
-                .drain(..)
-                .for_each(|x| x(core))
-        }
-    }
 }

 impl Comm for CommSender {

crates/tako/src/internal/tests/integration/test_basic.rs

Lines changed: 6 additions & 11 deletions
@@ -113,12 +113,11 @@ async fn test_task_time_limit_pass() {
         .await;
 }

-async fn query_helper(
+fn query_helper(
     handler: &mut ServerHandle,
     worker_queries: Vec<WorkerTypeQuery>,
 ) -> NewWorkerAllocationResponse {
-    let result = handler.server_ref.new_worker_query(worker_queries).unwrap();
-    result.await.unwrap()
+    handler.server_ref.new_worker_query(worker_queries).unwrap()
 }

 #[tokio::test]

@@ -137,8 +136,7 @@ async fn test_query_no_output_immediate_call() {
                 max_worker_per_allocation: 2,
                 min_utilization: 0.0,
             }],
-        )
-        .await;
+        );
         assert_eq!(msg.single_node_allocations, vec![0]);
         assert!(msg.multi_node_allocations.is_empty());
         handler.wait(&ids).await;

@@ -163,8 +161,7 @@ async fn test_query_no_output_delayed_call() {
                 max_worker_per_allocation: 2,
                 min_utilization: 0.0,
             }],
-        )
-        .await;
+        );
         assert_eq!(msg.single_node_allocations, vec![0]);
         assert!(msg.multi_node_allocations.is_empty());
         handler.wait(&ids).await;

@@ -192,8 +189,7 @@ async fn test_query_new_workers_delayed_call() {
                 max_worker_per_allocation: 2,
                 min_utilization: 0.0,
             }],
-        )
-        .await;
+        );
         assert_eq!(msg.single_node_allocations, vec![1]);
         assert!(msg.multi_node_allocations.is_empty());
     })

@@ -219,8 +215,7 @@ async fn test_query_new_workers_immediate() {
                 max_worker_per_allocation: 2,
                 min_utilization: 0.0,
             }],
-        )
-        .await;
+        );
         assert_eq!(msg.single_node_allocations, vec![1]);
         assert!(msg.multi_node_allocations.is_empty());
     })
