Skip to content

Commit 42540c8

Browse files
spirali authored and Kobzol committed
Enables piped output when "stream" is defined in the job file
1 parent 1085aba commit 42540c8

3 files changed

Lines changed: 34 additions & 12 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525

2626
* Fixed the issue of possible ignoring idle timeout when time request is used.
2727
* Worker process terminated because of idle timeout now returns zero exit code.
28+
* Fixes broken streaming when job file is used.
2829

2930
## 0.23.0
3031

crates/hyperqueue/src/client/commands/submit/jobfile.rs

Lines changed: 13 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -31,10 +31,10 @@ pub struct JobSubmitFileOpts {
3131
job: Option<JobId>,
3232
}
3333

34-
fn create_stdio(def: Option<StdioDefInput>, default: &str, is_log: bool) -> StdioDef {
34+
fn create_stdio(def: Option<StdioDefInput>, default: &str, has_streaming: bool) -> StdioDef {
3535
match def {
3636
None => {
37-
if is_log {
37+
if has_streaming {
3838
StdioDef::Pipe
3939
} else {
4040
StdioDef::File {
@@ -54,14 +54,14 @@ fn create_stdio(def: Option<StdioDefInput>, default: &str, is_log: bool) -> Stdi
5454
}
5555
}
5656

57-
fn build_task_description(cfg: TaskConfigDef) -> TaskDescription {
57+
fn build_task_description(cfg: TaskConfigDef, has_streaming: bool) -> TaskDescription {
5858
TaskDescription {
5959
kind: TaskKind::ExternalProgram(TaskKindProgram {
6060
program: ProgramDefinition {
6161
args: cfg.command.into_iter().map(|x| x.into()).collect(),
6262
env: cfg.env,
63-
stdout: create_stdio(cfg.stdout, DEFAULT_STDOUT_PATH, false),
64-
stderr: create_stdio(cfg.stderr, DEFAULT_STDERR_PATH, false),
63+
stdout: create_stdio(cfg.stdout, DEFAULT_STDOUT_PATH, has_streaming),
64+
stderr: create_stdio(cfg.stderr, DEFAULT_STDERR_PATH, has_streaming),
6565
stdin: cfg.stdin.map(|s| s.as_bytes().into()).unwrap_or_default(),
6666
cwd: cfg.cwd.map(|x| x.into()).unwrap_or_else(get_current_dir),
6767
},
@@ -89,6 +89,7 @@ fn build_task(
8989
tdef: TaskDef,
9090
max_id: &mut JobTaskId,
9191
data_flags: TaskDataFlags,
92+
has_streaming: bool,
9293
) -> TaskWithDependencies {
9394
let id = tdef.id.unwrap_or_else(|| {
9495
*max_id = JobTaskId::new(max_id.as_num() + 1);
@@ -97,13 +98,13 @@ fn build_task(
9798
TaskWithDependencies {
9899
id,
99100
data_flags,
100-
task_desc: build_task_description(tdef.config),
101+
task_desc: build_task_description(tdef.config, has_streaming),
101102
task_deps: tdef.deps,
102103
data_deps: tdef.data_deps,
103104
}
104105
}
105106

106-
fn build_job_desc_array(array: ArrayDef) -> JobTaskDescription {
107+
fn build_job_desc_array(array: ArrayDef, has_streaming: bool) -> JobTaskDescription {
107108
let ids = array
108109
.ids
109110
.unwrap_or_else(|| IntArray::from_range(0, array.entries.len() as JobTaskCount));
@@ -121,13 +122,14 @@ fn build_job_desc_array(array: ArrayDef) -> JobTaskDescription {
121122
JobTaskDescription::Array {
122123
ids,
123124
entries,
124-
task_desc: build_task_description(array.config),
125+
task_desc: build_task_description(array.config, has_streaming),
125126
}
126127
}
127128

128129
fn build_job_desc_individual_tasks(
129130
tasks: Vec<TaskDef>,
130131
data_flags: TaskDataFlags,
132+
has_streaming: bool,
131133
) -> crate::Result<JobTaskDescription> {
132134
let mut max_id: JobTaskId = tasks
133135
.iter()
@@ -143,7 +145,7 @@ fn build_job_desc_individual_tasks(
143145
let mut in_degrees = Map::new();
144146
let mut consumers: Map<JobTaskId, Vec<_>> = Map::new();
145147
for task in tasks {
146-
let t = build_task(task, &mut max_id, data_flags);
148+
let t = build_task(task, &mut max_id, data_flags, has_streaming);
147149
if in_degrees.insert(t.id, t.task_deps.len()).is_some() {
148150
return Err(crate::Error::GenericError(format!(
149151
"Task {} is defined multiple times",
@@ -190,13 +192,13 @@ fn build_job_desc_individual_tasks(
190192

191193
fn build_job_submit(jdef: JobDef, job_id: Option<JobId>) -> crate::Result<SubmitRequest> {
192194
let task_desc = if let Some(array) = jdef.array {
193-
build_job_desc_array(array)
195+
build_job_desc_array(array, jdef.stream.is_some())
194196
} else {
195197
let mut data_flags = TaskDataFlags::empty();
196198
if jdef.data_layer {
197199
data_flags.insert(TaskDataFlags::ENABLE_DATA_LAYER);
198200
}
199-
build_job_desc_individual_tasks(jdef.tasks, data_flags)?
201+
build_job_desc_individual_tasks(jdef.tasks, data_flags, jdef.stream.is_some())?
200202
};
201203
Ok(SubmitRequest {
202204
job_desc: JobDescription {

tests/test_jobfile.py

Lines changed: 20 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -21,13 +21,14 @@ def test_job_file_submit_minimal(hq_env: HqEnv, tmp_path):
2121

2222

2323
def test_job_file_submit_maximal(hq_env: HqEnv, tmp_path):
24+
# This test does not set "stream", to avoid redirecting outputs.
25+
# This is tested separately in test_job_file_stream
2426
hq_env.start_server()
2527
hq_env.start_workers(3, cpus=4, args=["--resource", "gpus=[0,1]"])
2628
os.mkdir("output")
2729
tmp_path.joinpath("job.toml").write_text(
2830
"""
2931
name = "test-job"
30-
stream = "output"
3132
max_fails = 11
3233
3334
[[task]]
@@ -421,3 +422,21 @@ def test_job_file_attach(hq_env: HqEnv, tmp_path):
421422

422423
table = hq_env.command(["job", "info", "1"], as_table=True)
423424
table.check_row_value("Tasks", "4; Ids: 1,3-5")
425+
426+
427+
def test_job_file_stream(hq_env: HqEnv, tmp_path):
428+
hq_env.start_server()
429+
hq_env.start_worker()
430+
tmp_path.joinpath("job.toml").write_text(
431+
"""
432+
stream = "output"
433+
434+
[[task]]
435+
id = 1
436+
command = ["bash", "-c", "echo 'Hello'"]
437+
"""
438+
)
439+
hq_env.command(["job", "submit-file", "job.toml"])
440+
wait_for_job_state(hq_env, 1, "FINISHED")
441+
result = hq_env.command(["output-log", "output", "cat", "1", "stdout"])
442+
assert result == "Hello\n"

0 commit comments

Comments (0)