Skip to content

Commit b7a098c

Browse files
spiraliKobzol
authored andcommitted
New query format with partial worker types
1 parent 1e43b52 commit b7a098c

6 files changed

Lines changed: 172 additions & 157 deletions

File tree

crates/hyperqueue/src/server/autoalloc/process.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ async fn perform_submits(
377377
.iter()
378378
.map(|(_, queue)| create_queue_worker_query(queue))
379379
.collect();
380-
let response = senders.server.new_worker_query(queries, false)?;
380+
let response = senders.server.new_worker_query(&queries)?;
381381
log::debug!("Scheduler query response: {response:?}");
382382

383383
// Merge responses back into a single array
@@ -424,6 +424,7 @@ fn create_queue_worker_query(queue: &AllocationQueue) -> WorkerTypeQuery {
424424
WorkerTypeQuery {
425425
// The maximum number of workers that we can provide in this queue
426426
// TODO: estimate the resources of the queue in a better way
427+
partial: false,
427428
descriptor: ResourceDescriptor::new(vec![ResourceDescriptorItem {
428429
name: CPU_RESOURCE_NAME.to_string(),
429430
kind: ResourceDescriptorKind::regular_sockets(1, 1),

crates/tako/src/control.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ use tokio::net::TcpListener;
88
use tokio::sync::Notify;
99

1010
use crate::events::EventProcessor;
11-
use crate::gateway::{
12-
MultiNodeAllocationResponse, ResourceRequestVariants, TaskSubmit, WorkerRuntimeInfo,
13-
};
11+
use crate::gateway::{MultiNodeAllocationResponse, TaskSubmit, WorkerRuntimeInfo};
1412
use crate::internal::common::error::DsError;
1513
use crate::internal::messages::worker::ToWorkerMessage;
1614
use crate::internal::scheduler::query::compute_new_worker_query;
@@ -28,6 +26,9 @@ use crate::{TaskId, WorkerId};
2826

2927
#[derive(Debug)]
3028
pub struct WorkerTypeQuery {
29+
/// If True indicates that we do not have all information about this worker
30+
pub partial: bool,
31+
/// Worker resource descriptor
3132
pub descriptor: ResourceDescriptor,
3233
/// Worker time limit
3334
pub time_limit: Option<Duration>,
@@ -47,11 +48,6 @@ pub struct NewWorkerAllocationResponse {
4748
/// be spawned for the given query
4849
pub single_node_workers_per_query: Vec<usize>,
4950
pub multi_node_allocations: Vec<MultiNodeAllocationResponse>,
50-
/// Resource requests of tasks that left after resolving the query and could not be assigned to
51-
/// any running worker or worker in query response.
52-
/// The second value is number of such tasks
53-
/// Note: It contains both single node and multi node leftovers
54-
pub leftovers: Vec<(ResourceRequestVariants, u32)>,
5551
}
5652

5753
#[derive(Clone)]
@@ -118,22 +114,17 @@ impl ServerRef {
118114
*/
119115
pub fn new_worker_query(
120116
&self,
121-
queries: Vec<WorkerTypeQuery>,
122-
collect_leftovers: bool,
117+
queries: &[WorkerTypeQuery],
123118
) -> crate::Result<NewWorkerAllocationResponse> {
124-
for query in &queries {
119+
for query in queries {
125120
query.descriptor.validate()?;
126121
}
127122
let mut core = self.core_ref.get_mut();
128123
let mut comm = self.comm_ref.get_mut();
129124
if comm.get_scheduling_flag() {
130125
run_scheduling_now(&mut core, &mut comm, Instant::now())
131126
}
132-
Ok(compute_new_worker_query(
133-
&mut core,
134-
&queries,
135-
collect_leftovers,
136-
))
127+
Ok(compute_new_worker_query(&mut core, queries))
137128
}
138129

139130
pub fn try_release_memory(&self) {
Lines changed: 31 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
1-
use crate::Map;
21
use crate::control::{NewWorkerAllocationResponse, WorkerTypeQuery};
32
use crate::gateway::MultiNodeAllocationResponse;
43
use crate::internal::server::core::Core;
54
use crate::internal::server::task::Task;
65
use crate::internal::server::workerload::{WorkerLoad, WorkerResources};
7-
use crate::resources::ResourceRequestVariants;
8-
use smallvec::smallvec;
96
use std::time::Duration;
107

118
struct WorkerTypeState {
9+
partial: bool,
1210
loads: Vec<WorkerLoad>,
1311
w_resources: WorkerResources,
1412
time_limit: Option<Duration>,
@@ -19,46 +17,43 @@ struct WorkerTypeState {
1917
pub(crate) fn compute_new_worker_query(
2018
core: &mut Core,
2119
queries: &[WorkerTypeQuery],
22-
collect_leftovers: bool,
2320
) -> NewWorkerAllocationResponse {
2421
log::debug!("Compute new worker query: query = {queries:?}");
2522

2623
// Scheduler has to be performed before the query, so there should be no ready_to_assign tasks
2724
assert!(core.sn_ready_to_assign().is_empty() || !core.has_workers());
2825

29-
let add_task = |new_loads: &mut [WorkerTypeState],
30-
task: &Task,
31-
leftovers: &mut Map<ResourceRequestVariants, u32>| {
26+
let add_task = |new_loads: &mut [WorkerTypeState], task: &Task| {
3227
let request = &task.configuration.resources;
3328
for ws in new_loads.iter_mut() {
34-
if let Some(time_limit) = ws.time_limit {
35-
if !ws
36-
.w_resources
37-
.is_capable_to_run_with(request, |rq| rq.min_time() <= time_limit)
38-
{
29+
if !ws.partial {
30+
if !ws.w_resources.is_capable_to_run_with(request, |rq| {
31+
ws.time_limit.is_none_or(|t| rq.min_time() <= t)
32+
}) {
3933
continue;
4034
}
41-
} else if !ws.w_resources.is_capable_to_run(request) {
42-
continue;
43-
}
44-
for load in ws.loads.iter_mut() {
45-
if load.have_immediate_resources_for_rqv(request, &ws.w_resources) {
35+
for load in ws.loads.iter_mut() {
36+
if load.have_immediate_resources_for_rqv(request, &ws.w_resources) {
37+
load.add_request(task.id, request, &ws.w_resources);
38+
return;
39+
}
40+
}
41+
if ws.loads.len() < ws.max as usize {
42+
let mut load = WorkerLoad::new(&ws.w_resources);
4643
load.add_request(task.id, request, &ws.w_resources);
44+
ws.loads.push(load);
4745
return;
4846
}
49-
}
50-
if ws.loads.len() < ws.max as usize {
51-
let mut load = WorkerLoad::new(&ws.w_resources);
52-
load.add_request(task.id, request, &ws.w_resources);
53-
ws.loads.push(load);
54-
return;
55-
}
56-
}
57-
if collect_leftovers {
58-
if let Some(count) = leftovers.get_mut(request) {
59-
*count += 1;
6047
} else {
61-
leftovers.insert(request.clone(), 1);
48+
if !ws.w_resources.is_lowerbound_for(request, |rq| {
49+
ws.time_limit.is_none_or(|t| rq.min_time() <= t)
50+
}) {
51+
continue;
52+
}
53+
if ws.loads.is_empty() {
54+
let load = WorkerLoad::new(&ws.w_resources);
55+
ws.loads.push(load);
56+
}
6257
}
6358
}
6459
};
@@ -74,14 +69,14 @@ pub(crate) fn compute_new_worker_query(
7469
let mut new_loads: Vec<_> = queries
7570
.iter()
7671
.map(|q| WorkerTypeState {
72+
partial: q.partial,
7773
loads: Vec::new(),
7874
w_resources: WorkerResources::from_description(&q.descriptor, &resource_map),
7975
time_limit: q.time_limit,
8076
max: q.max_sn_workers,
8177
})
8278
.collect();
8379

84-
let mut leftovers = Map::new();
8580
for worker in core.get_workers() {
8681
let mut load = WorkerLoad::new(&worker.resources);
8782
for task_id in worker.sn_tasks() {
@@ -93,12 +88,12 @@ pub(crate) fn compute_new_worker_query(
9388
load.add_request(task.id, request, &worker.resources);
9489
continue;
9590
}
96-
add_task(&mut new_loads, task, &mut leftovers);
91+
add_task(&mut new_loads, task);
9792
}
9893
}
9994
for task_id in core.sleeping_sn_tasks() {
10095
let task = core.get_task(*task_id);
101-
add_task(&mut new_loads, task, &mut leftovers);
96+
add_task(&mut new_loads, task);
10297
}
10398

10499
// `compute_new_worker_query` should be called immediately after scheduling was performed,
@@ -107,13 +102,16 @@ pub(crate) fn compute_new_worker_query(
107102
// postponing ready_to_assign. So we have to look also into this array
108103
for task_id in core.sn_ready_to_assign() {
109104
let task = core.get_task(*task_id);
110-
add_task(&mut new_loads, task, &mut leftovers);
105+
add_task(&mut new_loads, task);
111106
}
112107

113108
let single_node_allocations = new_loads
114109
.iter()
115110
.zip(queries.iter())
116111
.map(|(ws, q)| {
112+
if ws.partial {
113+
return ws.loads.len();
114+
}
117115
ws.loads
118116
.iter()
119117
.map(|load| {
@@ -148,28 +146,13 @@ pub(crate) fn compute_new_worker_query(
148146
None
149147
}
150148
});
151-
if collect_leftovers && result.is_none() {
152-
let request = ResourceRequestVariants::new(smallvec![rq.clone()]);
153-
leftovers.insert(request, count);
154-
}
155149
result
156150
})
157151
.collect();
158152
multi_node_allocations.sort_unstable_by_key(|x| (x.worker_type, x.worker_per_allocation));
159153

160-
let leftovers = if collect_leftovers {
161-
let resource_map = core.create_resource_map();
162-
leftovers
163-
.into_iter()
164-
.map(|(v, s)| (v.to_gateway(&resource_map), s))
165-
.collect()
166-
} else {
167-
Vec::new()
168-
};
169-
170154
NewWorkerAllocationResponse {
171155
single_node_workers_per_query: single_node_allocations,
172-
leftovers,
173156
multi_node_allocations,
174157
}
175158
}

crates/tako/src/internal/server/workerload.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ impl WorkerResources {
3131
.unwrap_or(ResourceAmount::ZERO)
3232
}
3333

34+
pub(crate) fn get_or_none(&self, resource_id: ResourceId) -> Option<ResourceAmount> {
35+
self.n_resources.get(resource_id).copied()
36+
}
37+
3438
pub(crate) fn from_description(
3539
resource_desc: &ResourceDescriptor,
3640
resource_map: &ResourceMap,
@@ -64,12 +68,33 @@ impl WorkerResources {
6468
})
6569
}
6670

71+
pub(crate) fn is_lowerbound_for_request(&self, request: &ResourceRequest) -> bool {
72+
request.entries().iter().all(|r| {
73+
if let Some(has) = self.get_or_none(r.resource_id) {
74+
let ask = r.request.min_amount();
75+
ask <= has
76+
} else {
77+
true
78+
}
79+
})
80+
}
81+
6782
pub(crate) fn is_capable_to_run(&self, rqv: &ResourceRequestVariants) -> bool {
6883
rqv.requests()
6984
.iter()
7085
.any(|rq| self.is_capable_to_run_request(rq))
7186
}
7287

88+
pub(crate) fn is_lowerbound_for(
89+
&self,
90+
rqv: &ResourceRequestVariants,
91+
filter_fn: impl Fn(&ResourceRequest) -> bool,
92+
) -> bool {
93+
rqv.requests()
94+
.iter()
95+
.any(|rq| filter_fn(rq) && self.is_lowerbound_for_request(rq))
96+
}
97+
7398
pub(crate) fn is_capable_to_run_with(
7499
&self,
75100
rqv: &ResourceRequestVariants,

crates/tako/src/internal/tests/integration/test_basic.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,9 @@ async fn test_task_time_limit_pass() {
115115

116116
fn query_helper(
117117
handler: &mut ServerHandle,
118-
worker_queries: Vec<WorkerTypeQuery>,
118+
worker_queries: &[WorkerTypeQuery],
119119
) -> NewWorkerAllocationResponse {
120-
handler
121-
.server_ref
122-
.new_worker_query(worker_queries, true)
123-
.unwrap()
120+
handler.server_ref.new_worker_query(worker_queries).unwrap()
124121
}
125122

126123
#[tokio::test]
@@ -132,7 +129,8 @@ async fn test_query_no_output_immediate_call() {
132129
.await;
133130
let msg = query_helper(
134131
&mut handler,
135-
vec![WorkerTypeQuery {
132+
&[WorkerTypeQuery {
133+
partial: false,
136134
descriptor: ResourceDescriptor::simple(12),
137135
time_limit: None,
138136
max_sn_workers: 2,
@@ -142,7 +140,6 @@ async fn test_query_no_output_immediate_call() {
142140
);
143141
assert_eq!(msg.single_node_workers_per_query, vec![0]);
144142
assert!(msg.multi_node_allocations.is_empty());
145-
assert!(msg.leftovers.is_empty());
146143
handler.wait(&ids).await;
147144
})
148145
.await;
@@ -158,7 +155,8 @@ async fn test_query_no_output_delayed_call() {
158155
sleep(Duration::from_secs(1)).await;
159156
let msg = query_helper(
160157
&mut handler,
161-
vec![WorkerTypeQuery {
158+
&[WorkerTypeQuery {
159+
partial: false,
162160
descriptor: ResourceDescriptor::simple(12),
163161
time_limit: None,
164162
max_sn_workers: 2,
@@ -168,7 +166,6 @@ async fn test_query_no_output_delayed_call() {
168166
);
169167
assert_eq!(msg.single_node_workers_per_query, vec![0]);
170168
assert!(msg.multi_node_allocations.is_empty());
171-
assert!(msg.leftovers.is_empty());
172169
handler.wait(&ids).await;
173170
})
174171
.await;
@@ -187,7 +184,8 @@ async fn test_query_new_workers_delayed_call() {
187184
sleep(Duration::from_secs(1)).await;
188185
let msg = query_helper(
189186
&mut handler,
190-
vec![WorkerTypeQuery {
187+
&[WorkerTypeQuery {
188+
partial: false,
191189
descriptor: ResourceDescriptor::simple(12),
192190
time_limit: None,
193191
max_sn_workers: 2,
@@ -213,7 +211,8 @@ async fn test_query_new_workers_immediate() {
213211
.await;
214212
let msg = query_helper(
215213
&mut handler,
216-
vec![WorkerTypeQuery {
214+
&[WorkerTypeQuery {
215+
partial: false,
217216
descriptor: ResourceDescriptor::simple(12),
218217
time_limit: None,
219218
max_sn_workers: 2,

0 commit comments

Comments
 (0)