Skip to content

Commit 6ca04aa

Browse files
committed
New scheduler priority assignment
1 parent 2f19b7e commit 6ca04aa

9 files changed

Lines changed: 59 additions & 117 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
### New features
44

5+
* The server scheduler now slightly prioritizes tasks from older jobs and tasks that finish partially-computed task graphs
56
* `hq worker info` contains more information
67
* `hq job forget` tries to free more memory
78
* You can now configure Job name in the Python API.

crates/tako/benches/benchmarks/scheduler.rs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,36 +6,13 @@ use crate::{add_tasks, create_worker};
66
use tako::gateway::LostWorkerReason;
77
use tako::internal::messages::common::TaskFailInfo;
88
use tako::internal::messages::worker::ToWorkerMessage;
9-
use tako::internal::scheduler::metrics::compute_b_level_metric;
109
use tako::internal::scheduler::state::SchedulerState;
1110
use tako::internal::server::comm::Comm;
1211
use tako::internal::server::core::Core;
1312
use tako::task::SerializedTaskContext;
1413
use tako::worker::{WorkerConfiguration, WorkerOverview};
1514
use tako::{InstanceId, TaskId, WorkerId};
1615

17-
fn bench_b_level(c: &mut BenchmarkGroup<WallTime>) {
18-
for task_count in [10, 1_000, 100_000] {
19-
c.bench_with_input(
20-
BenchmarkId::new("compute b-level", task_count),
21-
&task_count,
22-
|b, &task_count| {
23-
b.iter_batched_ref(
24-
|| {
25-
let mut core = Core::default();
26-
add_tasks(&mut core, task_count);
27-
core
28-
},
29-
|core| {
30-
compute_b_level_metric(core.task_map_mut());
31-
},
32-
BatchSize::SmallInput,
33-
);
34-
},
35-
);
36-
}
37-
}
38-
3916
fn bench_schedule(c: &mut BenchmarkGroup<WallTime>) {
4017
for task_count in [10, 1_000, 100_000] {
4118
for worker_count in [1, 8, 16, 32] {
@@ -72,7 +49,6 @@ fn bench_schedule(c: &mut BenchmarkGroup<WallTime>) {
7249
pub fn benchmark(c: &mut Criterion) {
7350
let mut group = c.benchmark_group("scheduler");
7451

75-
bench_b_level(&mut group);
7652
bench_schedule(&mut group);
7753
}
7854

crates/tako/src/internal/scheduler/metrics.rs

Lines changed: 0 additions & 71 deletions
This file was deleted.
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
pub mod metrics;
21
pub mod multinode;
32
pub(crate) mod query;
43
pub mod state;

crates/tako/src/internal/scheduler/multinode.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,6 @@ fn task_priority_tuple(task: &Task) -> PriorityTuple {
3737
}
3838

3939
impl MultiNodeQueue {
40-
pub fn recompute_priorities(&mut self, _task_map: &TaskMap) {
41-
/*if self.queues.is_empty() {
42-
return;
43-
}*/
44-
// TODO: Not priority now, but it should be implemented also for multi node tasks
45-
}
46-
4740
pub fn shrink_to_fit(&mut self) {
4841
self.queues.shrink_to_fit();
4942
self.requests.shrink_to_fit();

crates/tako/src/internal/scheduler/state.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use std::time::{Duration, Instant};
55
use tokio::sync::Notify;
66
use tokio::time::sleep;
77

8-
use super::metrics::compute_b_level_metric;
98
use crate::internal::common::Map;
109
use crate::internal::messages::worker::{TaskIdsMsg, ToWorkerMessage};
1110
use crate::internal::scheduler::multinode::MultiNodeAllocator;
@@ -346,16 +345,6 @@ impl SchedulerState {
346345
}
347346
log::debug!("Scheduling started");
348347

349-
if core.check_has_new_tasks_and_reset() {
350-
// TODO: utilize information and do not recompute all b-levels
351-
trace_time!("scheduler", "blevel", {
352-
compute_b_level_metric(core.task_map_mut())
353-
});
354-
355-
let (multi_node_queue, task_map, _, _) = core.multi_node_queue_split_mut();
356-
multi_node_queue.recompute_priorities(task_map);
357-
}
358-
359348
let multi_node_ready_tasks = core.take_multi_node_ready_to_assign();
360349
if !multi_node_ready_tasks.is_empty() {
361350
let (multi_node_queue, task_map, _, _) = core.multi_node_queue_split_mut();

crates/tako/src/internal/server/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,13 +251,14 @@ fn handle_new_tasks(
251251
return Some(format!("Invalid configuration index {idx}"));
252252
}
253253
let conf = &configurations[idx];
254-
let task = Task::new(
254+
let mut task = Task::new(
255255
task.id,
256256
task.task_deps,
257257
task.dataobj_deps,
258258
conf.clone(),
259259
task.body,
260260
);
261+
task.scheduler_priority = -(task.id.job_id().as_num() as i32);
261262
tasks.push(task);
262263
}
263264
if !msg.adjust_instance_id_and_crash_counters.is_empty() {

crates/tako/src/internal/server/reactor.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ use crate::internal::server::workermap::WorkerMap;
1616
use crate::{TaskId, WorkerId};
1717
use std::fmt::Write;
1818

19+
// Scheduler priority increase for each t-level
20+
pub(crate) const T_LEVEL_WEIGHT: i32 = 256;
21+
1922
pub(crate) fn on_new_worker(core: &mut Core, comm: &mut impl Comm, worker: Worker) {
2023
comm.broadcast_worker_message(&ToWorkerMessage::NewWorker(NewWorkerMsg {
2124
worker_id: worker.id,
@@ -169,17 +172,26 @@ pub(crate) fn on_new_tasks(core: &mut Core, comm: &mut impl Comm, new_tasks: Vec
169172
assert!(!new_tasks.is_empty());
170173
for mut task in new_tasks.into_iter() {
171174
let mut count = 0;
175+
// We assign the scheduler priority here; the goal is to set scheduler_priority = t-level * T_LEVEL_WEIGHT - job_id,
176+
// where t-level is the length of the maximal path from the root tasks.
177+
// The goal is to prioritize task graph components that were already partially computed, and to prioritize older tasks (according to job_id).
178+
// The t-level is T_LEVEL_WEIGHT times more important than the job_id difference,
180+
// but a large job_id difference will outweigh the t-level, which is usually bounded; this is by design.
180+
let mut priority = -(task.id.job_id().as_num() as i32);
172181
task.task_deps.retain(|t| {
173182
if let Some(task_dep) = core.find_task_mut(*t) {
174183
task_dep.add_consumer(task.id);
175184
if !task_dep.is_finished() {
185+
priority =
186+
std::cmp::max(priority, task_dep.scheduler_priority + T_LEVEL_WEIGHT);
176187
count += 1
177188
}
178189
true
179190
} else {
180191
false
181192
}
182193
});
194+
task.set_scheduler_priority(priority);
183195
assert!(matches!(
184196
task.state,
185197
TaskRuntimeState::Waiting(WaitingInfo { unfinished_deps: 0 })

crates/tako/src/internal/tests/test_reactor.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use crate::internal::messages::worker::{StealResponse, StealResponseMsg};
1111
use crate::internal::scheduler::state::SchedulerState;
1212
use crate::internal::server::core::Core;
1313
use crate::internal::server::reactor::{
14-
on_cancel_tasks, on_new_tasks, on_new_worker, on_remove_worker, on_steal_response,
15-
on_task_error, on_task_finished, on_task_running,
14+
T_LEVEL_WEIGHT, on_cancel_tasks, on_new_tasks, on_new_worker, on_remove_worker,
15+
on_steal_response, on_task_error, on_task_finished, on_task_running,
1616
};
1717
use crate::internal::server::task::{Task, TaskRuntimeState};
1818
use crate::internal::server::worker::Worker;
@@ -144,6 +144,48 @@ fn test_worker_add() {
144144
assert_eq!(core.get_workers().count(), 2);
145145
}
146146

147+
#[test]
148+
fn test_scheduler_priority() {
149+
let mut core = Core::default();
150+
let mut comm = create_test_comm();
151+
//new_workers(&mut core, &mut comm, vec![1]);
152+
153+
let t1 = task(501);
154+
let t2 = task_with_deps(502, &[&t1]);
155+
let t3 = task(503);
156+
let t4 = task_with_deps(504, &[&t2]);
157+
158+
let task_id5 = TaskId::new(123.into(), 1.into());
159+
let t5 = TaskBuilder::new(task_id5).build();
160+
let task_id6 = TaskId::new(122.into(), 0.into());
161+
let t6 = TaskBuilder::new(task_id6).build();
162+
let task_id7 = TaskId::new(123.into(), 2.into());
163+
let t7 = TaskBuilder::new(task_id7).task_deps(&[&t5]).build();
164+
let task_id8 = TaskId::new(123.into(), 4.into());
165+
let t8 = TaskBuilder::new(task_id8).build();
166+
167+
on_new_tasks(&mut core, &mut comm, vec![t1, t2, t3, t4, t5, t6, t7, t8]);
168+
169+
assert_eq!(core.get_task(TaskId::new_test(501)).scheduler_priority, 0);
170+
assert_eq!(
171+
core.get_task(TaskId::new_test(502)).scheduler_priority,
172+
T_LEVEL_WEIGHT
173+
);
174+
assert_eq!(core.get_task(TaskId::new_test(503)).scheduler_priority, 0);
175+
assert_eq!(
176+
core.get_task(TaskId::new_test(504)).scheduler_priority,
177+
2 * T_LEVEL_WEIGHT
178+
);
179+
180+
assert_eq!(core.get_task(task_id5).scheduler_priority, -123);
181+
assert_eq!(core.get_task(task_id6).scheduler_priority, -122);
182+
assert_eq!(
183+
core.get_task(task_id7).scheduler_priority,
184+
-123 + T_LEVEL_WEIGHT
185+
);
186+
assert_eq!(core.get_task(task_id8).scheduler_priority, -123);
187+
}
188+
147189
#[test]
148190
fn test_submit_jobs() {
149191
let mut core = Core::default();

0 commit comments

Comments
 (0)