Skip to content

Commit e7a7f4b

Browse files
committed
Implements Query leftovers flag
1 parent 33c56ed commit e7a7f4b

4 files changed

Lines changed: 49 additions & 7 deletions

File tree

crates/tako/src/control.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ pub struct WorkerTypeQuery {
4141

4242
#[derive(Debug)]
4343
pub struct NewWorkerAllocationResponse {
44-
/// Array of the same size as number of queries, it returns the number of workers that should
44+
/// Array of the same size as the number of queries, it returns the number of workers that should
4545
/// be spawned for the given query
4646
pub single_node_workers_per_query: Vec<usize>,
47+
/// True iff not all tasks could be executed on the resources provided in the query
48+
pub single_node_leftovers: bool,
4749
pub multi_node_allocations: Vec<MultiNodeAllocationResponse>,
4850
}
4951

crates/tako/src/internal/scheduler/query.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub(crate) fn compute_new_worker_query(
2222
// Scheduling has to be performed before the query, so there should be no ready_to_assign tasks
2323
assert!(core.sn_ready_to_assign().is_empty() || !core.has_workers());
2424

25-
let add_task = |new_loads: &mut [WorkerTypeState], task: &Task| {
25+
let add_task = |new_loads: &mut [WorkerTypeState], task: &Task| -> bool {
2626
let request = &task.configuration.resources;
2727
for ws in new_loads.iter_mut() {
2828
if let Some(time_limit) = ws.time_limit {
@@ -38,16 +38,17 @@ pub(crate) fn compute_new_worker_query(
3838
for load in ws.loads.iter_mut() {
3939
if load.have_immediate_resources_for_rqv(request, &ws.w_resources) {
4040
load.add_request(task.id, request, &ws.w_resources);
41-
return;
41+
return false;
4242
}
4343
}
4444
if ws.loads.len() < ws.max as usize {
4545
let mut load = WorkerLoad::new(&ws.w_resources);
4646
load.add_request(task.id, request, &ws.w_resources);
4747
ws.loads.push(load);
48-
return;
48+
return false;
4949
}
5050
}
51+
true
5152
};
5253

5354
/* Make sure that all named resources provided have an Id */
@@ -68,6 +69,7 @@ pub(crate) fn compute_new_worker_query(
6869
})
6970
.collect();
7071

72+
let mut leftover = false;
7173
for worker in core.get_workers() {
7274
let mut load = WorkerLoad::new(&worker.resources);
7375
for task_id in worker.sn_tasks() {
@@ -79,12 +81,12 @@ pub(crate) fn compute_new_worker_query(
7981
load.add_request(task.id, request, &worker.resources);
8082
continue;
8183
}
82-
add_task(&mut new_loads, task);
84+
leftover |= add_task(&mut new_loads, task);
8385
}
8486
}
8587
for task_id in core.sleeping_sn_tasks() {
8688
let task = core.get_task(*task_id);
87-
add_task(&mut new_loads, task);
89+
leftover |= add_task(&mut new_loads, task);
8890
}
8991

9092
// `compute_new_worker_query` should be called immediately after scheduling was performed,
@@ -93,7 +95,7 @@ pub(crate) fn compute_new_worker_query(
9395
// postponing ready_to_assign. So we also have to look into this array
9496
for task_id in core.sn_ready_to_assign() {
9597
let task = core.get_task(*task_id);
96-
add_task(&mut new_loads, task);
98+
leftover |= add_task(&mut new_loads, task);
9799
}
98100

99101
let single_node_allocations = new_loads
@@ -140,6 +142,7 @@ pub(crate) fn compute_new_worker_query(
140142

141143
NewWorkerAllocationResponse {
142144
single_node_workers_per_query: single_node_allocations,
145+
single_node_leftovers: leftover,
143146
multi_node_allocations,
144147
}
145148
}

crates/tako/src/internal/tests/integration/test_basic.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ async fn test_query_no_output_immediate_call() {
139139
);
140140
assert_eq!(msg.single_node_workers_per_query, vec![0]);
141141
assert!(msg.multi_node_allocations.is_empty());
142+
assert!(!msg.single_node_leftovers);
142143
handler.wait(&ids).await;
143144
})
144145
.await;
@@ -164,6 +165,7 @@ async fn test_query_no_output_delayed_call() {
164165
);
165166
assert_eq!(msg.single_node_workers_per_query, vec![0]);
166167
assert!(msg.multi_node_allocations.is_empty());
168+
assert!(!msg.single_node_leftovers);
167169
handler.wait(&ids).await;
168170
})
169171
.await;
@@ -192,6 +194,7 @@ async fn test_query_new_workers_delayed_call() {
192194
);
193195
assert_eq!(msg.single_node_workers_per_query, vec![1]);
194196
assert!(msg.multi_node_allocations.is_empty());
197+
assert!(!msg.single_node_leftovers);
195198
})
196199
.await;
197200
}
@@ -218,6 +221,7 @@ async fn test_query_new_workers_immediate() {
218221
);
219222
assert_eq!(msg.single_node_workers_per_query, vec![1]);
220223
assert!(msg.multi_node_allocations.is_empty());
224+
assert!(!msg.single_node_leftovers);
221225
})
222226
.await;
223227
}

crates/tako/src/internal/tests/test_query.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ fn test_query_no_tasks() {
2424
);
2525
assert_eq!(r.single_node_workers_per_query, vec![0]);
2626
assert!(r.multi_node_allocations.is_empty());
27+
assert!(!r.single_node_leftovers);
2728
}
2829

2930
#[test]
@@ -53,6 +54,7 @@ fn test_query_enough_workers() {
5354
);
5455
assert_eq!(r.single_node_workers_per_query, vec![0]);
5556
assert!(r.multi_node_allocations.is_empty());
57+
assert!(!r.single_node_leftovers);
5658
}
5759

5860
#[test]
@@ -91,6 +93,7 @@ fn test_query_no_enough_workers1() {
9193
);
9294
assert_eq!(r.single_node_workers_per_query, vec![0, 1]);
9395
assert!(r.multi_node_allocations.is_empty());
96+
assert!(!r.single_node_leftovers);
9497
}
9598

9699
#[test]
@@ -124,6 +127,7 @@ fn test_query_enough_workers2() {
124127
);
125128
assert_eq!(r.single_node_workers_per_query, vec![0, 0]);
126129
assert!(r.multi_node_allocations.is_empty());
130+
assert!(!r.single_node_leftovers);
127131
}
128132

129133
#[test]
@@ -158,6 +162,7 @@ fn test_query_not_enough_workers3() {
158162
);
159163
assert_eq!(r.single_node_workers_per_query, vec![1, 0]);
160164
assert!(r.multi_node_allocations.is_empty());
165+
assert!(!r.single_node_leftovers);
161166
}
162167

163168
#[test]
@@ -199,6 +204,7 @@ fn test_query_many_workers_needed() {
199204
);
200205
assert_eq!(r.single_node_workers_per_query, vec![5, 1, 26]);
201206
assert!(r.multi_node_allocations.is_empty());
207+
assert!(!r.single_node_leftovers);
202208
}
203209

204210
#[test]
@@ -255,6 +261,7 @@ fn test_query_multi_node_tasks() {
255261
assert_eq!(r.multi_node_allocations[2].worker_type, 1);
256262
assert_eq!(r.multi_node_allocations[2].worker_per_allocation, 6);
257263
assert_eq!(r.multi_node_allocations[2].max_allocations, 10);
264+
assert!(!r.single_node_leftovers);
258265
}
259266

260267
#[test]
@@ -276,6 +283,7 @@ fn test_query_multi_node_time_limit() {
276283
}],
277284
);
278285
assert_eq!(r.multi_node_allocations.len(), allocs);
286+
assert!(!r.single_node_leftovers);
279287
}
280288
}
281289

@@ -466,3 +474,28 @@ fn test_query_min_time1() {
466474
assert_eq!(r.single_node_workers_per_query, vec![1]);
467475
assert!(r.multi_node_allocations.is_empty());
468476
}
477+
478+
#[test]
479+
fn test_query_leftovers() {
480+
for (n, leftovers) in [(1, false), (4, false), (8, false), (9, true), (12, true)] {
481+
let mut rt = TestEnv::new();
482+
483+
rt.new_workers(&[4]);
484+
for i in 1..=n {
485+
rt.new_task(TaskBuilder::new(i).cpus_compact(1));
486+
}
487+
rt.schedule();
488+
489+
let r = compute_new_worker_query(
490+
rt.core(),
491+
&[WorkerTypeQuery {
492+
descriptor: ResourceDescriptor::simple(2),
493+
time_limit: None,
494+
max_sn_workers: 2,
495+
max_workers_per_allocation: 1,
496+
min_utilization: 0.0,
497+
}],
498+
);
499+
assert_eq!(r.single_node_leftovers, leftovers);
500+
}
501+
}

0 commit comments

Comments
 (0)