Skip to content

Commit aecde5c

Browse files
committed
Query asks for more workers when resources are predefined
1 parent 2b856e4 commit aecde5c

2 files changed

Lines changed: 136 additions & 30 deletions

File tree

crates/tako/src/internal/scheduler/query.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,34 +26,33 @@ pub(crate) fn compute_new_worker_query(
2626
let add_task = |new_loads: &mut [WorkerTypeState], task: &Task| {
2727
let request = &task.configuration.resources;
2828
for ws in new_loads.iter_mut() {
29-
if !ws.partial {
30-
if !ws.w_resources.is_capable_to_run_with(request, |rq| {
31-
ws.time_limit.is_none_or(|t| rq.min_time() <= t)
32-
}) {
33-
continue;
34-
}
35-
for load in ws.loads.iter_mut() {
36-
if load.have_immediate_resources_for_rqv(request, &ws.w_resources) {
37-
load.add_request(task.id, request, &ws.w_resources);
38-
return;
29+
if !ws.w_resources.is_capable_to_run_with(request, |rq| {
30+
ws.time_limit.is_none_or(|t| rq.min_time() <= t)
31+
}) {
32+
if ws.partial {
33+
if !ws.w_resources.is_lowerbound_for(request, |rq| {
34+
ws.time_limit.is_none_or(|t| rq.min_time() <= t)
35+
}) {
36+
continue;
37+
}
38+
if ws.loads.is_empty() {
39+
let load = WorkerLoad::new(&ws.w_resources);
40+
ws.loads.push(load);
3941
}
4042
}
41-
if ws.loads.len() < ws.max as usize {
42-
let mut load = WorkerLoad::new(&ws.w_resources);
43+
continue;
44+
}
45+
for load in ws.loads.iter_mut() {
46+
if load.have_immediate_resources_for_rqv(request, &ws.w_resources) {
4347
load.add_request(task.id, request, &ws.w_resources);
44-
ws.loads.push(load);
4548
return;
4649
}
47-
} else {
48-
if !ws.w_resources.is_lowerbound_for(request, |rq| {
49-
ws.time_limit.is_none_or(|t| rq.min_time() <= t)
50-
}) {
51-
continue;
52-
}
53-
if ws.loads.is_empty() {
54-
let load = WorkerLoad::new(&ws.w_resources);
55-
ws.loads.push(load);
56-
}
50+
}
51+
if ws.loads.len() < ws.max as usize {
52+
let mut load = WorkerLoad::new(&ws.w_resources);
53+
load.add_request(task.id, request, &ws.w_resources);
54+
ws.loads.push(load);
55+
return;
5756
}
5857
}
5958
};

crates/tako/src/internal/tests/test_query.rs

Lines changed: 114 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -552,14 +552,10 @@ fn test_query_sn_leftovers1() {
552552

553553
#[test]
554554
fn test_query_sn_leftovers2() {
555-
for (cpus, out) in [(1, 0), (2, 1)] {
555+
for (cpus, out) in [(1, 0), (2, 3)] {
556556
let mut rt = TestEnv::new();
557557
for i in 1..=100 {
558-
rt.new_task(
559-
TaskBuilder::new(i)
560-
.cpus_compact(2)
561-
.time_request(((i as u64 % 2) + 1) * 10_000),
562-
);
558+
rt.new_task(TaskBuilder::new(i).cpus_compact(2));
563559
}
564560
rt.schedule();
565561

@@ -569,7 +565,7 @@ fn test_query_sn_leftovers2() {
569565
partial: true,
570566
descriptor: ResourceDescriptor::simple_cpus(cpus),
571567
time_limit: None,
572-
max_sn_workers: 2,
568+
max_sn_workers: 3,
573569
max_workers_per_allocation: 1,
574570
min_utilization: 0.0,
575571
}],
@@ -617,3 +613,114 @@ fn test_query_sn_leftovers() {
617613
);
618614
assert_eq!(r.single_node_workers_per_query, vec![1, 0, 1]);
619615
}
616+
617+
#[test]
618+
fn test_query_partial_query_cpus() {
619+
let mut rt = TestEnv::new();
620+
621+
rt.new_task(TaskBuilder::new(1).cpus_compact(4));
622+
for i in 2..=5 {
623+
rt.new_task(TaskBuilder::new(i).cpus_compact(8));
624+
}
625+
rt.schedule();
626+
627+
let r = compute_new_worker_query(
628+
rt.core(),
629+
&[
630+
WorkerTypeQuery {
631+
partial: true,
632+
descriptor: ResourceDescriptor::simple_cpus(4),
633+
time_limit: None,
634+
max_sn_workers: 2,
635+
max_workers_per_allocation: 3,
636+
min_utilization: 0.0,
637+
},
638+
WorkerTypeQuery {
639+
partial: true,
640+
descriptor: ResourceDescriptor::simple_cpus(16),
641+
time_limit: Some(Duration::from_secs(50)),
642+
max_sn_workers: 5,
643+
max_workers_per_allocation: 3,
644+
min_utilization: 0.0,
645+
},
646+
WorkerTypeQuery {
647+
partial: true,
648+
descriptor: ResourceDescriptor::new(Vec::new()),
649+
time_limit: None,
650+
max_sn_workers: 3,
651+
max_workers_per_allocation: 3,
652+
min_utilization: 0.0,
653+
},
654+
],
655+
);
656+
assert_eq!(r.single_node_workers_per_query, vec![1, 2, 0]);
657+
}
658+
659+
#[test]
660+
fn test_query_partial_query_gpus1() {
661+
for (gpus, has_extra, out) in [
662+
(4, false, 3),
663+
(4, true, 1),
664+
(0, false, 1),
665+
(0, true, 1),
666+
(100, false, 2),
667+
(100, true, 1),
668+
] {
669+
let mut rt = TestEnv::new();
670+
for i in 1..=10 {
671+
let mut builder = TaskBuilder::new(i).cpus_compact(1).add_resource(1, 2);
672+
if has_extra {
673+
builder = builder.add_resource(2, 1);
674+
}
675+
rt.new_task(builder);
676+
}
677+
rt.schedule();
678+
679+
let mut items = vec![ResourceDescriptorItem {
680+
name: "cpus".into(),
681+
kind: ResourceDescriptorKind::simple_indices(8),
682+
}];
683+
if gpus > 0 {
684+
items.push(ResourceDescriptorItem {
685+
name: "gpus".into(),
686+
kind: ResourceDescriptorKind::simple_indices(gpus),
687+
});
688+
}
689+
let descriptor = ResourceDescriptor::new(items);
690+
691+
let r = compute_new_worker_query(
692+
rt.core(),
693+
&[WorkerTypeQuery {
694+
partial: true,
695+
descriptor,
696+
time_limit: None,
697+
max_sn_workers: 3,
698+
max_workers_per_allocation: 3,
699+
min_utilization: 0.0,
700+
}],
701+
);
702+
assert_eq!(r.single_node_workers_per_query, vec![out]);
703+
}
704+
}
705+
706+
#[test]
707+
fn test_query_unknown_do_not_add_extra() {
708+
let mut rt = TestEnv::new();
709+
rt.new_task(TaskBuilder::new(1).cpus_compact(1));
710+
rt.new_task(TaskBuilder::new(2).cpus_compact(1).add_resource(1, 1));
711+
rt.new_task(TaskBuilder::new(3).cpus_compact(1));
712+
rt.new_task(TaskBuilder::new(4).cpus_compact(1).add_resource(1, 1));
713+
714+
let r = compute_new_worker_query(
715+
rt.core(),
716+
&[WorkerTypeQuery {
717+
partial: true,
718+
descriptor: ResourceDescriptor::simple_cpus(1),
719+
time_limit: None,
720+
max_sn_workers: 5,
721+
max_workers_per_allocation: 3,
722+
min_utilization: 0.0,
723+
}],
724+
);
725+
assert_eq!(r.single_node_workers_per_query, vec![2]);
726+
}

0 commit comments

Comments
 (0)