Skip to content

Commit 58c1fb9

Browse files
committed
Resource weights
1 parent: 2f953e7 · commit: 58c1fb9

21 files changed

Lines changed: 210 additions & 46 deletions

File tree

crates/hyperqueue/src/client/commands/submit/command.rs

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,9 @@ use crate::client::commands::duration_doc;
33
use crate::client::commands::submit::directives::parse_hq_directives_from_file;
44
use crate::client::commands::wait::{wait_for_jobs, wait_for_jobs_with_progress};
55
use crate::client::globalsettings::GlobalSettings;
6-
use crate::client::resources::{parse_allocation_request, parse_resource_request};
6+
use crate::client::resources::{
7+
parse_allocation_request, parse_resource_request, parse_resource_weight,
8+
};
79
use crate::common::arraydef::IntArray;
810
use crate::common::cli::OptsWithMatches;
911
use crate::common::parser2::{CharParser, ParseError, all_consuming};
@@ -43,7 +45,9 @@ use tako::gateway::{
4345
ResourceRequestVariants,
4446
};
4547
use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};
46-
use tako::resources::{AllocationRequest, CPU_RESOURCE_NAME, NumOfNodes, ResourceAmount};
48+
use tako::resources::{
49+
AllocationRequest, CPU_RESOURCE_NAME, NumOfNodes, ResourceAmount, ResourceWeight,
50+
};
4751
use tako::{JobId, JobTaskCount, Map, UserPriority};
4852

4953
const SUBMIT_ARRAY_LIMIT: JobTaskCount = 999;
@@ -238,6 +242,13 @@ pub struct SubmitJobTaskConfOpts {
238242
)]
239243
time_request: Duration,
240244

245+
/// Resource weight
246+
///
247+
/// Weight of resource request within main scheduler.
248+
/// Resource weight has to be a positive number.
249+
#[arg(long, value_parser = parse_resource_weight)]
250+
weight: Option<ResourceWeight>,
251+
241252
/// Pins the job to the cores specified in `--cpus`
242253
#[arg(long, value_enum)]
243254
pin: Option<PinModeArg>,
@@ -360,6 +371,7 @@ impl OptsWithMatches<SubmitJobTaskConfOpts> {
360371
cpus: opts.cpus.or(other_opts.cpus),
361372
resource,
362373
time_request: get_or_default(&self_matches, &other_matches, "time_request"),
374+
weight: opts.weight,
363375
pin: opts.pin.or(other_opts.pin),
364376
task_dir: opts.task_dir || other_opts.task_dir,
365377
cwd: opts.cwd.or(other_opts.cwd),
@@ -533,6 +545,7 @@ impl JobSubmitOpts {
533545
n_nodes: self.conf.nodes,
534546
min_time: self.conf.time_request,
535547
resources,
548+
weight: self.conf.weight.unwrap_or_default(),
536549
};
537550
request.validate()?;
538551
Ok(request)
@@ -668,6 +681,7 @@ pub async fn submit_computation(
668681
nodes: _,
669682
cpus: _,
670683
resource: _,
684+
weight: _,
671685
time_request: _,
672686
pin,
673687
task_dir,

crates/hyperqueue/src/client/commands/submit/defs.rs

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ use std::path::PathBuf;
1212
use std::time::Duration;
1313
use tako::gateway::{CrashLimit, ResourceRequest, ResourceRequestEntries, ResourceRequestEntry};
1414
use tako::program::FileOnCloseBehavior;
15-
use tako::resources::{AllocationRequest, NumOfNodes, ResourceAmount};
15+
use tako::resources::{AllocationRequest, NumOfNodes, ResourceAmount, ResourceWeight};
1616
use tako::{JobTaskCount, JobTaskId, Map, UserPriority};
1717

1818
#[derive(Deserialize)]
@@ -44,6 +44,14 @@ impl PinMode {
4444
}
4545
}
4646

47+
fn deserialize_resource_weight<'de, D>(deserializer: D) -> Result<ResourceWeight, D::Error>
48+
where
49+
D: Deserializer<'de>,
50+
{
51+
let f = f32::deserialize(deserializer)?;
52+
ResourceWeight::try_from(f).map_err(serde::de::Error::custom)
53+
}
54+
4755
fn deserialize_human_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
4856
where
4957
D: Deserializer<'de>,
@@ -200,6 +208,10 @@ pub struct ResourceRequestDef {
200208
#[serde(default)]
201209
#[serde(deserialize_with = "deserialize_resource_entries")]
202210
pub resources: ResourceRequestEntries,
211+
212+
#[serde(default)]
213+
#[serde(deserialize_with = "deserialize_resource_weight")]
214+
pub weight: ResourceWeight,
203215
}
204216

205217
impl ResourceRequestDef {
@@ -208,6 +220,7 @@ impl ResourceRequestDef {
208220
n_nodes: self.n_nodes,
209221
resources: self.resources,
210222
min_time: self.time_request,
223+
weight: self.weight,
211224
}
212225
}
213226
}

crates/hyperqueue/src/client/output/cli.rs

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::server::autoalloc::{Allocation, AllocationState};
1313
use crate::server::job::{JobTaskCounters, JobTaskInfo, JobTaskState};
1414
use crate::stream::reader::outputlog::Summary;
1515
use crate::transfer::messages::{
16-
AutoAllocListQueuesResponse, JobDetail, JobInfo, JobTaskDescription, PinMode, QueueData,
16+
AutoAllocListQueuesResponse, JobDetail, JobInfo, JobTaskDescription, QueueData,
1717
QueueState, ServerInfo, TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse,
1818
WorkerExitInfo, WorkerInfo,
1919
};
@@ -119,16 +119,8 @@ impl CliOutput {
119119
task_dir: _task_dir,
120120
}) => {
121121
let resources = format_resource_variants(resource_rq);
122-
rows.push(vec![
123-
"Resources".cell().bold(true),
124-
if !matches!(pin_mode, PinMode::None) {
125-
format!("{resources} [pin]")
126-
} else {
127-
resources
128-
}
129-
.cell(),
130-
]);
131-
122+
rows.push(vec!["Resources".cell().bold(true), resources.cell()]);
123+
rows.push(vec!["Pin".cell().bold(true), pin_mode.to_str().cell()]);
132124
rows.push(vec!["Priority".cell().bold(true), priority.cell()]);
133125

134126
rows.push(vec![
@@ -1454,25 +1446,28 @@ fn status_to_cell(status: &Status) -> CellStruct {
14541446
}
14551447

14561448
fn format_resource_request(rq: &ResourceRequest) -> String {
1457-
if rq.n_nodes > 0 {
1458-
return format!("nodes: {}", rq.n_nodes);
1459-
}
14601449
let mut result = String::new();
1461-
let mut first = true;
1462-
1463-
let mut entries: Vec<&ResourceRequestEntry> = rq.resources.iter().collect();
1464-
entries.sort_unstable_by_key(|x| &x.resource);
1465-
1466-
for grq in entries {
1467-
write!(
1468-
result,
1469-
"{}{}: {}",
1470-
if first { "" } else { "\n" },
1471-
grq.resource,
1472-
grq.policy
1473-
)
1474-
.unwrap();
1475-
first = false;
1450+
if rq.n_nodes > 0 {
1451+
write!(result, "nodes: {}", rq.n_nodes).unwrap();
1452+
} else {
1453+
let mut first = true;
1454+
let mut entries: Vec<&ResourceRequestEntry> = rq.resources.iter().collect();
1455+
entries.sort_unstable_by_key(|x| &x.resource);
1456+
1457+
for grq in entries {
1458+
write!(
1459+
result,
1460+
"{}{}: {}",
1461+
if first { "" } else { "\n" },
1462+
grq.resource,
1463+
grq.policy
1464+
)
1465+
.unwrap();
1466+
first = false;
1467+
}
1468+
if !rq.weight.is_default() {
1469+
write!(result, "\n[weight: {}]", rq.weight.as_f32()).unwrap();
1470+
}
14761471
}
14771472
result
14781473
}

crates/hyperqueue/src/client/output/json.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ fn format_task_description(task_desc: &TaskDescription, rqv: &ResourceRequestVar
328328
n_nodes,
329329
resources,
330330
min_time,
331+
weight,
331332
} = v;
332333
json!({
333334
"n_nodes": n_nodes,
@@ -337,7 +338,8 @@ fn format_task_description(task_desc: &TaskDescription, rqv: &ResourceRequestVar
337338
"request": res.policy
338339
})
339340
}).collect::<Vec<_>>(),
340-
"min_time": format_duration(*min_time)
341+
"min_time": format_duration(*min_time),
342+
"weight": weight,
341343
})
342344
})
343345
.collect::<Vec<_>>(),

crates/hyperqueue/src/client/resources.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use chumsky::Parser;
22
use chumsky::primitive::{choice, filter, just};
33
use chumsky::text::TextParser;
44

5-
use tako::resources::AllocationRequest;
5+
use tako::resources::{AllocationRequest, ResourceWeight};
66

77
use crate::common::parser2::{
88
CharParser, ParseError, all_consuming, parse_exact_string, parse_resource_amount,
@@ -60,6 +60,11 @@ pub fn parse_allocation_request(input: &str) -> anyhow::Result<AllocationRequest
6060
all_consuming(p_allocation_request()).parse_text(input)
6161
}
6262

63+
pub fn parse_resource_weight(input: &str) -> anyhow::Result<ResourceWeight> {
64+
let f: f32 = input.parse()?;
65+
Ok(ResourceWeight::try_from(f)?)
66+
}
67+
6368
#[cfg(test)]
6469
mod test {
6570
use super::*;

crates/hyperqueue/src/server/event/journal/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub use write::JournalWriter;
1010

1111
const HQ_JOURNAL_HEADER: &[u8] = b"hqjl0002";
1212

13-
const HQ_JOURNAL_VERSION_MAJOR: u32 = 25;
13+
const HQ_JOURNAL_VERSION_MAJOR: u32 = 26;
1414
const HQ_JOURNAL_VERSION_MINOR: u32 = 0;
1515

1616
#[derive(Serialize, Deserialize)]

crates/pyhq/src/client/job.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ fn build_task_resources(
200200
})
201201
.collect::<anyhow::Result<ResourceRequestEntries>>()?,
202202
min_time: rq.min_time.unwrap_or_default(),
203+
weight: Default::default(),
203204
})
204205
})
205206
.collect::<anyhow::Result<_>>()?,

crates/tako/src/gateway.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::internal::common::error::DsError;
22
use crate::internal::common::resources::ResourceRqId;
3+
use crate::internal::common::resources::request::ResourceWeight;
34
use crate::resources::{AllocationRequest, CPU_RESOURCE_NAME, NumOfNodes, ResourceAmount};
45
use crate::{InstanceId, Map, TaskId, UserPriority};
56
use serde::{Deserialize, Serialize};
@@ -27,6 +28,8 @@ pub struct ResourceRequest {
2728

2829
#[serde(default)]
2930
pub min_time: Duration,
31+
32+
pub weight: ResourceWeight,
3033
}
3134

3235
impl Default for ResourceRequest {
@@ -38,6 +41,7 @@ impl Default for ResourceRequest {
3841
policy: AllocationRequest::Compact(ResourceAmount::new_units(1)),
3942
}],
4043
min_time: Default::default(),
44+
weight: Default::default(),
4145
}
4246
}
4347
}

crates/tako/src/internal/common/resources/map.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ impl GlobalResourceMapping {
5454
}
5555
})
5656
.collect(),
57+
rq.weight,
5758
)
5859
})
5960
.collect(),

crates/tako/src/internal/common/resources/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub use map::{
1919

2020
pub use request::{
2121
AllocationRequest, ResourceAllocRequest, ResourceRequest, ResourceRequestEntries,
22-
ResourceRequestVariants, TimeRequest,
22+
ResourceRequestVariants, ResourceWeight, TimeRequest,
2323
};
2424

2525
pub use amount::{ResourceAmount, ResourceFractions, ResourceUnits};

0 commit comments

Comments
 (0)