Skip to content

Commit 1434de4

Browse files
committed
Removed old heuristics that do not make sense without data transfers
1 parent ee4c150 commit 1434de4

5 files changed

Lines changed: 61 additions & 45 deletions

File tree

crates/tako/src/internal/scheduler/metrics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
use crate::TaskId;
12
use crate::internal::common::{Map, Set};
23
use crate::internal::server::task::Task;
34
use crate::internal::server::taskmap::TaskMap;
4-
use crate::TaskId;
55

66
pub fn compute_b_level_metric(tasks: &mut TaskMap) {
77
crawl(tasks, |t| t.get_consumers());
@@ -44,11 +44,11 @@ fn crawl<F1: Fn(&Task) -> &Set<TaskId>>(tasks: &mut TaskMap, predecessor_fn: F1)
4444

4545
#[cfg(test)]
4646
mod tests {
47+
use crate::TaskId;
4748
use crate::internal::common::index::ItemId;
4849
use crate::internal::scheduler::metrics::compute_b_level_metric;
4950
use crate::internal::server::core::Core;
5051
use crate::internal::tests::utils::workflows::submit_example_2;
51-
use crate::TaskId;
5252

5353
#[test]
5454
fn b_level_simple_graph() {

crates/tako/src/internal/scheduler/state.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ impl SchedulerState {
9999
try_prev_worker: bool, // Enable heuristics that tries to fit tasks on fewer workers
100100
) -> Option<WorkerId> {
101101
// Fast path
102-
if try_prev_worker && task.task_deps.is_empty() {
102+
if try_prev_worker
103+
/* TODO: migrate this condition to data objects later; depending on task deps is wrong: && task.task_deps.is_empty() */
104+
{
103105
// Note: We are *not* using "is_capable_to_run" but "have_immediate_resources_for_rq",
104106
// because we want to enable fast path only if task can be directly executed
105107
// We want to avoid creation of overloaded
@@ -379,13 +381,14 @@ impl SchedulerState {
379381
//log::debug!("Task {} initially assigned to {}", task.id, worker_id);
380382
};
381383
if let Some(worker_id) = worker_id {
382-
debug_assert!(core
383-
.get_worker_map()
384-
.get_worker(worker_id)
385-
.is_capable_to_run_rqv(
386-
&core.get_task(task_id).configuration.resources,
387-
self.now
388-
));
384+
debug_assert!(
385+
core.get_worker_map()
386+
.get_worker(worker_id)
387+
.is_capable_to_run_rqv(
388+
&core.get_task(task_id).configuration.resources,
389+
self.now
390+
)
391+
);
389392
self.assign(core, task_id, worker_id);
390393
} else {
391394
core.add_sleeping_sn_task(task_id);
@@ -419,7 +422,7 @@ impl SchedulerState {
419422
let task = tasks.get_task_mut(task_id);
420423
if task.is_sn_running()
421424
|| (not_overloaded
422-
&& (task.is_fresh() || !task.task_deps.is_empty())
425+
&& (task.is_fresh()/*|| THIS SHOULD LATER BE MIGRATED TO DATA OBJECTS, not task deps !task.task_deps.is_empty()*/)
423426
&& worker.has_time_to_run_for_rqv(&task.configuration.resources, now))
424427
{
425428
continue;

crates/tako/src/internal/server/client.rs

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,11 @@ pub(crate) async fn process_client_message(
6565
),
6666
FromGatewayMessage::ServerInfo => {
6767
let core = core_ref.get();
68-
assert!(client_sender
69-
.send(ToGatewayMessage::ServerInfo(core.get_server_info()))
70-
.is_ok());
68+
assert!(
69+
client_sender
70+
.send(ToGatewayMessage::ServerInfo(core.get_server_info()))
71+
.is_ok()
72+
);
7173
None
7274
}
7375
FromGatewayMessage::GetTaskInfo(request) => {
@@ -94,11 +96,13 @@ pub(crate) async fn process_client_message(
9496
}
9597
})
9698
.collect();
97-
assert!(client_sender
98-
.send(ToGatewayMessage::TaskInfo(TasksInfoResponse {
99-
tasks: task_infos
100-
}))
101-
.is_ok());
99+
assert!(
100+
client_sender
101+
.send(ToGatewayMessage::TaskInfo(TasksInfoResponse {
102+
tasks: task_infos
103+
}))
104+
.is_ok()
105+
);
102106
None
103107
}
104108
FromGatewayMessage::CancelTasks(msg) => {
@@ -107,12 +111,14 @@ pub(crate) async fn process_client_message(
107111
let mut comm = comm_ref.get_mut();
108112
let (cancelled_tasks, already_finished) =
109113
on_cancel_tasks(&mut core, &mut *comm, &msg.tasks);
110-
assert!(client_sender
111-
.send(ToGatewayMessage::CancelTasksResponse(CancelTasksResponse {
112-
cancelled_tasks,
113-
already_finished
114-
}))
115-
.is_ok());
114+
assert!(
115+
client_sender
116+
.send(ToGatewayMessage::CancelTasksResponse(CancelTasksResponse {
117+
cancelled_tasks,
118+
already_finished
119+
}))
120+
.is_ok()
121+
);
116122
None
117123
}
118124
FromGatewayMessage::StopWorker(msg) => {
@@ -145,9 +151,11 @@ pub(crate) async fn process_client_message(
145151
let core = core_ref.get();
146152
compute_new_worker_query(&core, &msg.worker_queries)
147153
};
148-
assert!(client_sender
149-
.send(ToGatewayMessage::NewWorkerAllocationQueryResponse(response))
150-
.is_ok());
154+
assert!(
155+
client_sender
156+
.send(ToGatewayMessage::NewWorkerAllocationQueryResponse(response))
157+
.is_ok()
158+
);
151159
None
152160
}
153161
FromGatewayMessage::TryReleaseMemory => {
@@ -161,9 +169,11 @@ pub(crate) async fn process_client_message(
161169
.get_worker_map()
162170
.get(&worker_id)
163171
.map(|w| w.worker_info(core.task_map()));
164-
assert!(client_sender
165-
.send(ToGatewayMessage::WorkerInfo(response))
166-
.is_ok());
172+
assert!(
173+
client_sender
174+
.send(ToGatewayMessage::WorkerInfo(response))
175+
.is_ok()
176+
);
167177
None
168178
}
169179
}
@@ -217,10 +227,12 @@ fn handle_new_tasks(
217227
}
218228
on_new_tasks(core, comm, tasks);
219229

220-
assert!(client_sender
221-
.send(ToGatewayMessage::NewTasksResponse(NewTasksResponse {
222-
n_waiting_for_workers: 0 // TODO
223-
}))
224-
.is_ok());
230+
assert!(
231+
client_sender
232+
.send(ToGatewayMessage::NewTasksResponse(NewTasksResponse {
233+
n_waiting_for_workers: 0 // TODO
234+
}))
235+
.is_ok()
236+
);
225237
None
226238
}

crates/tako/src/internal/server/task.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ use std::rc::Rc;
33
use std::time::Duration;
44
use thin_vec::ThinVec;
55

6-
use crate::internal::common::stablemap::ExtractKey;
6+
use crate::WorkerId;
77
use crate::internal::common::Set;
8+
use crate::internal::common::stablemap::ExtractKey;
89
use crate::internal::messages::worker::{ComputeTaskMsg, ToWorkerMessage};
910
use crate::internal::server::taskmap::TaskMap;
10-
use crate::WorkerId;
11-
use crate::{static_assert_size, TaskId};
1211
use crate::{InstanceId, Priority};
12+
use crate::{TaskId, static_assert_size};
1313

1414
#[cfg_attr(test, derive(Eq, PartialEq))]
1515
pub struct WaitingInfo {

crates/tako/src/internal/tests/test_reactor.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::internal::tests::utils::schedule::{
2424
};
2525
use crate::internal::tests::utils::shared::{res_kind_groups, res_kind_sum};
2626
use crate::internal::tests::utils::sorted_vec;
27-
use crate::internal::tests::utils::task::{task, task_running_msg, task_with_deps, TaskBuilder};
27+
use crate::internal::tests::utils::task::{TaskBuilder, task, task_running_msg, task_with_deps};
2828
use crate::internal::tests::utils::workflows::{submit_example_1, submit_example_3};
2929
use crate::internal::tests::utils::{env, schedule};
3030
use crate::internal::worker::configuration::OverviewConfiguration;
@@ -678,11 +678,12 @@ fn test_task_mn_fail() {
678678
comm.emptiness_check();
679679
assert!(core.find_task(1.into()).is_none());
680680
for w in &[100, 101, 102, 103] {
681-
assert!(core
682-
.get_worker_map()
683-
.get_worker((*w).into())
684-
.mn_task()
685-
.is_none());
681+
assert!(
682+
core.get_worker_map()
683+
.get_worker((*w).into())
684+
.mn_task()
685+
.is_none()
686+
);
686687
}
687688
}
688689

0 commit comments

Comments
 (0)