Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
multiple resource request variants within the same task.
* Worker can be started with `--min-utilization` (so not only through autoalloc).
Min-utilization is now also respected by the main scheduler (not only by the autoalloc scheduler).
* New "aborted" task state introduced; it is derived from the "canceled" state.
When HQ cancels a task (e.g. because a dependency fails), it is now marked as "aborted";
the task state "canceled" is now reserved for tasks that are canceled by the user.


## v0.25.1
Expand Down
4 changes: 3 additions & 1 deletion crates/hyperqueue/src/client/commands/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub struct JobForgetOpts {
long,
value_delimiter(','),
value_enum,
default_value("finished,failed,canceled")
default_value("finished,failed,aborted,canceled")
)]
pub filter: Vec<CompletedJobStatus>,
}
Expand All @@ -104,6 +104,7 @@ pub struct JobTaskIdsOpts {
// Job statuses that can be listed in the `filter` option of `JobForgetOpts`
// (its default value names all four: finished, failed, aborted, canceled).
// The `impl` that follows maps each variant to the `Status` variant of the same name.
// Plain `//` comments are used on purpose: the option is declared with `value_enum`,
// and doc comments on such enums may be surfaced as CLI help text.
pub enum CompletedJobStatus {
Finished,
Failed,
Aborted,
Canceled,
}

Expand All @@ -112,6 +113,7 @@ impl CompletedJobStatus {
match self {
CompletedJobStatus::Finished => Status::Finished,
CompletedJobStatus::Failed => Status::Failed,
CompletedJobStatus::Aborted => Status::Aborted,
CompletedJobStatus::Canceled => Status::Canceled,
}
}
Expand Down
14 changes: 14 additions & 0 deletions crates/hyperqueue/src/client/commands/journal/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ fn format_payload(event: EventPayload) -> serde_json::Value {
"job": task_id.job_id(),
"task": task_id.job_task_id(),
}),
EventPayload::TasksAborted { task_ids } => json!({
"type": "task-aborted",
"tasks": task_ids,
}),
EventPayload::TasksCanceled { task_ids } => json!({
"type": "task-canceled",
"tasks": task_ids,
Expand Down Expand Up @@ -156,6 +160,16 @@ fn format_payload(event: EventPayload) -> serde_json::Value {
"job_id": job_id
})
}
EventPayload::JobCancel {
job_id,
cancel_reason,
} => {
json!({
"type": "job-cancel",
"job_id": job_id,
"reason": cancel_reason
})
}
EventPayload::TaskNotify(notify) => {
json!({
"task_id": notify.task_id,
Expand Down
15 changes: 15 additions & 0 deletions crates/hyperqueue/src/client/commands/journal/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ struct JournalStats {
/// Kind of task transition handed to `task_start_stop` as its last argument.
enum TaskStartStop {
Start,
// Only this variant carries data.
// NOTE(review): presumably `start_time` is when the task started and `fail` marks a
// failed (as opposed to finished) task — confirm at the call sites.
Stop { start_time: TimeDelta, fail: bool },
// Passed when a `TasksAborted` event names a task that was still in `running_tasks`.
Abort,
Cancel,
}

Expand Down Expand Up @@ -234,6 +235,7 @@ impl JournalStats {
EventPayload::JobCompleted(_) => {}
EventPayload::JobOpen(_, _) => {}
EventPayload::JobClose(_) => {}
EventPayload::JobCancel { .. } => {}
EventPayload::TaskStarted {
task_id,
worker_ids,
Expand Down Expand Up @@ -277,6 +279,19 @@ impl JournalStats {
);
}
}
EventPayload::TasksAborted { task_ids, .. } => {
for task_id in task_ids {
if let Some((_, workers, rv_id)) = jstats.running_tasks.remove(&task_id) {
jstats.task_start_stop(
time,
task_id,
rv_id,
&workers,
TaskStartStop::Abort,
);
}
}
}
EventPayload::TasksCanceled { task_ids, .. } => {
for task_id in task_ids {
if let Some((_, workers, rv_id)) = jstats.running_tasks.remove(&task_id) {
Expand Down
30 changes: 27 additions & 3 deletions crates/hyperqueue/src/client/commands/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::os::unix::ffi::OsStrExt;
use std::process::Command;

use crate::client::output::cli::{
TASK_COLOR_CANCELED, TASK_COLOR_FAILED, TASK_COLOR_FINISHED, TASK_COLOR_RUNNING,
job_progress_bar,
TASK_COLOR_ABORTED, TASK_COLOR_CANCELED, TASK_COLOR_FAILED, TASK_COLOR_FINISHED,
TASK_COLOR_RUNNING, job_progress_bar,
};
use crate::client::status::is_terminated;
use crate::common::utils::str::pluralize;
Expand Down Expand Up @@ -142,6 +142,7 @@ pub async fn wait_for_jobs_with_progress(
add_count(counters.n_finished_tasks, "FINISHED", TASK_COLOR_FINISHED);
add_count(counters.n_failed_tasks, "FAILED", TASK_COLOR_FAILED);
add_count(counters.n_canceled_tasks, "CANCELED", TASK_COLOR_CANCELED);
add_count(counters.n_aborted_tasks, "ABORTED", TASK_COLOR_ABORTED);

// \x1b[2K clears the line
print!(
Expand Down Expand Up @@ -188,7 +189,30 @@ pub async fn wait_for_jobs_with_progress(
counters.n_canceled_tasks += 1;
}
}
_ => {}
EventPayload::TasksAborted { task_ids } => {
for task_id in task_ids {
if running_tasks.remove(task_id) {
counters.n_running_tasks -= 1;
}
counters.n_aborted_tasks += 1;
}
}
EventPayload::WorkerConnected(..)
| EventPayload::WorkerLost(..)
| EventPayload::WorkerOverviewReceived(..)
| EventPayload::Submit { .. }
| EventPayload::JobOpen(..)
| EventPayload::JobClose(..)
| EventPayload::JobIdle(..)
| EventPayload::JobCancel { .. }
| EventPayload::AllocationQueueCreated(..)
| EventPayload::AllocationQueueRemoved(_)
| EventPayload::AllocationQueued { .. }
| EventPayload::AllocationStarted(_, _)
| EventPayload::AllocationFinished(_, _)
| EventPayload::ServerStart { .. }
| EventPayload::ServerStop
| EventPayload::TaskNotify(..) => {}
},
_ => {
log::warn!("Unexpected message from server: {:?}", &msg);
Expand Down
25 changes: 21 additions & 4 deletions crates/hyperqueue/src/client/output/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use tako::{Map, format_comma_delimited};

// Colorization constants shared by the task-related CLI output
// (e.g. the per-state rows of the job status cell and the job progress bar).
pub const TASK_COLOR_CANCELED: Colorization = Colorization::Magenta;
pub const TASK_COLOR_FAILED: Colorization = Colorization::Red;
// NOTE(review): same value as `TASK_COLOR_INVALID` below, so the two are visually
// indistinguishable; also `status_to_cell` renders "ABORTED" with `Color::Blue`
// rather than this constant — confirm whether either is intentional.
pub const TASK_COLOR_ABORTED: Colorization = Colorization::BrightRed;
pub const TASK_COLOR_FINISHED: Colorization = Colorization::Green;
pub const TASK_COLOR_RUNNING: Colorization = Colorization::Yellow;
pub const TASK_COLOR_INVALID: Colorization = Colorization::BrightRed;
Expand Down Expand Up @@ -1278,32 +1279,38 @@ fn job_status_to_cell(info: &JobInfo) -> String {
&mut result,
"RUNNING",
info.counters.n_running_tasks,
colored::Color::Yellow,
TASK_COLOR_RUNNING,
);
row(
&mut result,
"FAILED",
info.counters.n_failed_tasks,
colored::Color::Red,
TASK_COLOR_FAILED,
);
row(
&mut result,
"FINISHED",
info.counters.n_finished_tasks,
colored::Color::Green,
TASK_COLOR_FINISHED,
);
row(
&mut result,
"CANCELED",
info.counters.n_canceled_tasks,
colored::Color::Magenta,
TASK_COLOR_CANCELED,
);
row(
&mut result,
"WAITING",
info.counters.n_waiting_tasks(info.n_tasks),
colored::Color::Cyan,
);
row(
&mut result,
"ABORTED",
info.counters.n_aborted_tasks,
TASK_COLOR_ABORTED,
);
result
}

Expand Down Expand Up @@ -1343,6 +1350,7 @@ pub fn job_progress_bar(counters: JobTaskCounters, n_tasks: JobTaskCount, width:
(counters.n_failed_tasks, TASK_COLOR_FAILED),
(counters.n_finished_tasks, TASK_COLOR_FINISHED),
(counters.n_running_tasks, TASK_COLOR_RUNNING),
(counters.n_aborted_tasks, TASK_COLOR_ABORTED),
];

let chars = |count: JobTaskCount| {
Expand Down Expand Up @@ -1432,6 +1440,7 @@ fn status_to_cell(status: &Status) -> CellStruct {
Status::Failed => "FAILED".cell().foreground_color(Some(Color::Red)),
Status::Running => "RUNNING".cell().foreground_color(Some(Color::Yellow)),
Status::Canceled => "CANCELED".cell().foreground_color(Some(Color::Magenta)),
Status::Aborted => "ABORTED".cell().foreground_color(Some(Color::Blue)),
}
}

Expand Down Expand Up @@ -1537,6 +1546,10 @@ fn get_task_time(state: &JobTaskState) -> (Option<DateTime<Utc>>, Option<DateTim
JobTaskState::Canceled {
started_data: Some(started_data),
cancelled_date,
}
| JobTaskState::Aborted {
started_data: Some(started_data),
cancelled_date,
} => (Some(started_data.start_date), Some(*cancelled_date)),
JobTaskState::Running { started_data, .. } => (Some(started_data.start_date), None),
JobTaskState::Finished {
Expand All @@ -1553,6 +1566,10 @@ fn get_task_time(state: &JobTaskState) -> (Option<DateTime<Utc>>, Option<DateTim
started_data: None,
cancelled_date: _,
}
| JobTaskState::Aborted {
started_data: None,
cancelled_date: _,
}
| JobTaskState::Failed {
started_data: None, ..
}
Expand Down
3 changes: 2 additions & 1 deletion crates/hyperqueue/src/client/output/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,12 @@ impl From<VerbosityFlag> for Verbosity {
}
}

pub const JOB_SUMMARY_STATUS_ORDER: [Status; 5] = [
pub const JOB_SUMMARY_STATUS_ORDER: [Status; 6] = [
Status::Running,
Status::Waiting,
Status::Finished,
Status::Failed,
Status::Aborted,
Status::Canceled,
];

Expand Down
2 changes: 2 additions & 0 deletions crates/hyperqueue/src/client/output/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ fn format_job_info(info: &JobInfo) -> Value {
"finished": counters.n_finished_tasks,
"failed": counters.n_failed_tasks,
"canceled": counters.n_canceled_tasks,
"aborted": counters.n_aborted_tasks,
"waiting": counters.n_waiting_tasks(*n_tasks)
}),
"cancel_reason": cancel_reason,
Expand All @@ -424,6 +425,7 @@ fn format_tasks(tasks: &[(JobTaskId, JobTaskInfo)], map: TaskToPathsMap) -> Valu
JobTaskState::Finished { .. } => "finished",
JobTaskState::Failed { .. } => "failed",
JobTaskState::Canceled { .. } => "canceled",
JobTaskState::Aborted { .. } => "aborted",
};
let mut data = json!({
"id": *task_id,
Expand Down
2 changes: 2 additions & 0 deletions crates/hyperqueue/src/client/output/quiet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl Output for Quiet {
Status::Failed => "FAILED",
Status::Canceled => "CANCELED",
Status::Opened => "OPENED",
Status::Aborted => "ABORTED",
};

println!("{status} {count}");
Expand Down Expand Up @@ -174,5 +175,6 @@ fn format_status(status: &Status) -> &str {
Status::Failed => "FAILED",
Status::Canceled => "CANCELED",
Status::Opened => "OPENED",
Status::Aborted => "ABORTED",
}
}
4 changes: 4 additions & 0 deletions crates/hyperqueue/src/client/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub enum Status {
Finished,
Failed,
Canceled,
Aborted,
Opened,
}

Expand All @@ -23,6 +24,8 @@ pub fn job_status(info: &JobInfo) -> Status {
Status::Waiting
} else if info.counters.n_failed_tasks > 0 {
Status::Failed
} else if info.counters.n_aborted_tasks > 0 {
Status::Aborted
} else if info.counters.n_canceled_tasks > 0 {
Status::Canceled
} else {
Expand All @@ -47,5 +50,6 @@ pub fn get_task_status(status: &JobTaskState) -> Status {
JobTaskState::Finished { .. } => Status::Finished,
JobTaskState::Failed { .. } => Status::Failed,
JobTaskState::Canceled { .. } => Status::Canceled,
JobTaskState::Aborted { .. } => Status::Aborted,
}
}
40 changes: 26 additions & 14 deletions crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub enum DashboardTaskState {
Running,
Finished,
Failed,
Aborted,
Canceled,
}

Expand Down Expand Up @@ -115,6 +116,16 @@ impl JobTimeline {
event.time,
);
}
EventPayload::TasksAborted { task_ids } => {
for task_id in task_ids {
update_task_status(
&mut self.job_timeline,
*task_id,
DashboardTaskState::Aborted,
event.time,
);
}
}
EventPayload::TasksCanceled { task_ids } => {
for task_id in task_ids {
update_task_status(
Expand All @@ -125,20 +136,21 @@ impl JobTimeline {
);
}
}
EventPayload::WorkerConnected(_, _) => {}
EventPayload::WorkerLost(_, _) => {}
EventPayload::WorkerOverviewReceived(_) => {}
EventPayload::JobOpen(_, _) => {}
EventPayload::JobClose(_) => {}
EventPayload::JobIdle(_) => {}
EventPayload::AllocationQueueCreated(_, _) => {}
EventPayload::AllocationQueueRemoved(_) => {}
EventPayload::AllocationQueued { .. } => {}
EventPayload::AllocationStarted(_, _) => {}
EventPayload::AllocationFinished(_, _) => {}
EventPayload::ServerStart { .. } => {}
EventPayload::ServerStop => {}
EventPayload::TaskNotify(_) => {}
EventPayload::WorkerConnected(_, _)
| EventPayload::WorkerLost(_, _)
| EventPayload::WorkerOverviewReceived(_)
| EventPayload::JobOpen(_, _)
| EventPayload::JobClose(_)
| EventPayload::JobIdle(_)
| EventPayload::JobCancel { .. }
| EventPayload::AllocationQueueCreated(_, _)
| EventPayload::AllocationQueueRemoved(_)
| EventPayload::AllocationQueued { .. }
| EventPayload::AllocationStarted(_, _)
| EventPayload::AllocationFinished(_, _)
| EventPayload::ServerStart { .. }
| EventPayload::ServerStop
| EventPayload::TaskNotify(_) => {}
}
}
}
Expand Down
Loading
Loading