diff --git a/CHANGELOG.md b/CHANGELOG.md index 0de664b77..cd7045bb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,9 @@ multiple resource request variants within the same task. * Worker can be started with `--min-utilization` (so not only through autoalloc). Min-utilization is now respected also in main scheduler (no only autoalloc scheduler) +* New "aborted" task state introduced; it is derived from "cancel" state. + When HQ cancels a task (e.g. because dependency fails), it is now marked as "aborted"; + Task state "canceled" is now reserved for tasks that are canceled by the user. ## v0.25.1 diff --git a/crates/hyperqueue/src/client/commands/job.rs b/crates/hyperqueue/src/client/commands/job.rs index 091a6fcdb..b9d9d2fb0 100644 --- a/crates/hyperqueue/src/client/commands/job.rs +++ b/crates/hyperqueue/src/client/commands/job.rs @@ -82,7 +82,7 @@ pub struct JobForgetOpts { long, value_delimiter(','), value_enum, - default_value("finished,failed,canceled") + default_value("finished,failed,aborted,canceled") )] pub filter: Vec, } @@ -104,6 +104,7 @@ pub struct JobTaskIdsOpts { pub enum CompletedJobStatus { Finished, Failed, + Aborted, Canceled, } @@ -112,6 +113,7 @@ impl CompletedJobStatus { match self { CompletedJobStatus::Finished => Status::Finished, CompletedJobStatus::Failed => Status::Failed, + CompletedJobStatus::Aborted => Status::Aborted, CompletedJobStatus::Canceled => Status::Canceled, } } diff --git a/crates/hyperqueue/src/client/commands/journal/output.rs b/crates/hyperqueue/src/client/commands/journal/output.rs index b27c6816f..0fbba9734 100644 --- a/crates/hyperqueue/src/client/commands/journal/output.rs +++ b/crates/hyperqueue/src/client/commands/journal/output.rs @@ -103,6 +103,10 @@ fn format_payload(event: EventPayload) -> serde_json::Value { "job": task_id.job_id(), "task": task_id.job_task_id(), }), + EventPayload::TasksAborted { task_ids } => json!({ + "type": "task-aborted", + "tasks": task_ids, + }), EventPayload::TasksCanceled { task_ids } => json!({ "type": "task-canceled", "tasks": task_ids, @@ -156,6 +160,16 @@ fn format_payload(event: EventPayload) -> serde_json::Value { "job_id": job_id }) } + EventPayload::JobCancel { + job_id, + cancel_reason, + } => { + json!({ + "type": "job-cancel", + "job_id": job_id, + "reason": cancel_reason + }) + } EventPayload::TaskNotify(notify) => { json!({ "task_id": notify.task_id, diff --git a/crates/hyperqueue/src/client/commands/journal/report.rs b/crates/hyperqueue/src/client/commands/journal/report.rs index 317658161..2fc8c072c 100644 --- a/crates/hyperqueue/src/client/commands/journal/report.rs +++ b/crates/hyperqueue/src/client/commands/journal/report.rs @@ -150,6 +150,7 @@ struct JournalStats { enum TaskStartStop { Start, Stop { start_time: TimeDelta, fail: bool }, + Abort, Cancel, } @@ -234,6 +235,7 @@ impl JournalStats { EventPayload::JobCompleted(_) => {} EventPayload::JobOpen(_, _) => {} EventPayload::JobClose(_) => {} + EventPayload::JobCancel { .. } => {} EventPayload::TaskStarted { task_id, worker_ids, @@ -277,6 +279,19 @@ impl JournalStats { ); } } + EventPayload::TasksAborted { task_ids, .. } => { + for task_id in task_ids { + if let Some((_, workers, rv_id)) = jstats.running_tasks.remove(&task_id) { + jstats.task_start_stop( + time, + task_id, + rv_id, + &workers, + TaskStartStop::Abort, + ); + } + } + } EventPayload::TasksCanceled { task_ids, .. } => { for task_id in task_ids { if let Some((_, workers, rv_id)) = jstats.running_tasks.remove(&task_id) { diff --git a/crates/hyperqueue/src/client/commands/wait.rs b/crates/hyperqueue/src/client/commands/wait.rs index d75fc26bb..429354a07 100644 --- a/crates/hyperqueue/src/client/commands/wait.rs +++ b/crates/hyperqueue/src/client/commands/wait.rs @@ -4,8 +4,8 @@ use std::os::unix::ffi::OsStrExt; use std::process::Command; use crate::client::output::cli::{ - TASK_COLOR_CANCELED, TASK_COLOR_FAILED, TASK_COLOR_FINISHED, TASK_COLOR_RUNNING, - job_progress_bar, + TASK_COLOR_ABORTED, TASK_COLOR_CANCELED, TASK_COLOR_FAILED, TASK_COLOR_FINISHED, + TASK_COLOR_RUNNING, job_progress_bar, }; use crate::client::status::is_terminated; use crate::common::utils::str::pluralize; @@ -142,6 +142,7 @@ pub async fn wait_for_jobs_with_progress( add_count(counters.n_finished_tasks, "FINISHED", TASK_COLOR_FINISHED); add_count(counters.n_failed_tasks, "FAILED", TASK_COLOR_FAILED); add_count(counters.n_canceled_tasks, "CANCELED", TASK_COLOR_CANCELED); + add_count(counters.n_aborted_tasks, "ABORTED", TASK_COLOR_ABORTED); // \x1b[2K clears the line print!( @@ -188,7 +189,30 @@ pub async fn wait_for_jobs_with_progress( counters.n_canceled_tasks += 1; } } - _ => {} + EventPayload::TasksAborted { task_ids } => { + for task_id in task_ids { + if running_tasks.remove(task_id) { + counters.n_running_tasks -= 1; + } + counters.n_aborted_tasks += 1; + } + } + EventPayload::WorkerConnected(..) + | EventPayload::WorkerLost(..) + | EventPayload::WorkerOverviewReceived(..) + | EventPayload::Submit { .. } + | EventPayload::JobOpen(..) + | EventPayload::JobClose(..) + | EventPayload::JobIdle(..) + | EventPayload::JobCancel { .. } + | EventPayload::AllocationQueueCreated(..) + | EventPayload::AllocationQueueRemoved(_) + | EventPayload::AllocationQueued { .. } + | EventPayload::AllocationStarted(_, _) + | EventPayload::AllocationFinished(_, _) + | EventPayload::ServerStart { .. } + | EventPayload::ServerStop + | EventPayload::TaskNotify(..) => {} }, _ => { log::warn!("Unexpected message from server: {:?}", &msg); diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs index 23ee4edcc..29aaa99fa 100644 --- a/crates/hyperqueue/src/client/output/cli.rs +++ b/crates/hyperqueue/src/client/output/cli.rs @@ -52,6 +52,7 @@ use tako::{Map, format_comma_delimited}; pub const TASK_COLOR_CANCELED: Colorization = Colorization::Magenta; pub const TASK_COLOR_FAILED: Colorization = Colorization::Red; +pub const TASK_COLOR_ABORTED: Colorization = Colorization::BrightRed; pub const TASK_COLOR_FINISHED: Colorization = Colorization::Green; pub const TASK_COLOR_RUNNING: Colorization = Colorization::Yellow; pub const TASK_COLOR_INVALID: Colorization = Colorization::BrightRed; @@ -1278,25 +1279,25 @@ fn job_status_to_cell(info: &JobInfo) -> String { &mut result, "RUNNING", info.counters.n_running_tasks, - colored::Color::Yellow, + TASK_COLOR_RUNNING, ); row( &mut result, "FAILED", info.counters.n_failed_tasks, - colored::Color::Red, + TASK_COLOR_FAILED, ); row( &mut result, "FINISHED", info.counters.n_finished_tasks, - colored::Color::Green, + TASK_COLOR_FINISHED, ); row( &mut result, "CANCELED", info.counters.n_canceled_tasks, - colored::Color::Magenta, + TASK_COLOR_CANCELED, ); row( &mut result, @@ -1304,6 +1305,12 @@ fn job_status_to_cell(info: &JobInfo) -> String { info.counters.n_waiting_tasks(info.n_tasks), colored::Color::Cyan, ); + row( + &mut result, + "ABORTED", + info.counters.n_aborted_tasks, + TASK_COLOR_ABORTED, + ); result } @@ -1343,6 +1350,7 @@ pub fn job_progress_bar(counters: JobTaskCounters, n_tasks: JobTaskCount, width: (counters.n_failed_tasks, TASK_COLOR_FAILED), (counters.n_finished_tasks, TASK_COLOR_FINISHED), (counters.n_running_tasks, TASK_COLOR_RUNNING), + (counters.n_aborted_tasks, TASK_COLOR_ABORTED), ]; let chars = |count: JobTaskCount| { @@ -1432,6 +1440,7 @@ fn status_to_cell(status: &Status) -> CellStruct { Status::Failed => "FAILED".cell().foreground_color(Some(Color::Red)), Status::Running => "RUNNING".cell().foreground_color(Some(Color::Yellow)), Status::Canceled => "CANCELED".cell().foreground_color(Some(Color::Magenta)), + Status::Aborted => "ABORTED".cell().foreground_color(Some(Color::Blue)), } } @@ -1537,6 +1546,10 @@ fn get_task_time(state: &JobTaskState) -> (Option>, Option (Some(started_data.start_date), Some(*cancelled_date)), JobTaskState::Running { started_data, .. } => (Some(started_data.start_date), None), JobTaskState::Finished { @@ -1553,6 +1566,10 @@ fn get_task_time(state: &JobTaskState) -> (Option>, Option for Verbosity { } } -pub const JOB_SUMMARY_STATUS_ORDER: [Status; 5] = [ +pub const JOB_SUMMARY_STATUS_ORDER: [Status; 6] = [ Status::Running, Status::Waiting, Status::Finished, Status::Failed, + Status::Aborted, Status::Canceled, ]; diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index 8cba07a51..29532c715 100644 --- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -408,6 +408,7 @@ fn format_job_info(info: &JobInfo) -> Value { "finished": counters.n_finished_tasks, "failed": counters.n_failed_tasks, "canceled": counters.n_canceled_tasks, + "aborted": counters.n_aborted_tasks, "waiting": counters.n_waiting_tasks(*n_tasks) }), "cancel_reason": cancel_reason, @@ -424,6 +425,7 @@ fn format_tasks(tasks: &[(JobTaskId, JobTaskInfo)], map: TaskToPathsMap) -> Valu JobTaskState::Finished { .. } => "finished", JobTaskState::Failed { .. } => "failed", JobTaskState::Canceled { .. } => "canceled", + JobTaskState::Aborted { .. } => "aborted", }; let mut data = json!({ "id": *task_id, diff --git a/crates/hyperqueue/src/client/output/quiet.rs b/crates/hyperqueue/src/client/output/quiet.rs index 4e29a3a4a..f2ee520a0 100644 --- a/crates/hyperqueue/src/client/output/quiet.rs +++ b/crates/hyperqueue/src/client/output/quiet.rs @@ -93,6 +93,7 @@ impl Output for Quiet { Status::Failed => "FAILED", Status::Canceled => "CANCELED", Status::Opened => "OPENED", + Status::Aborted => "ABORTED", }; println!("{status} {count}"); @@ -174,5 +175,6 @@ fn format_status(status: &Status) -> &str { Status::Failed => "FAILED", Status::Canceled => "CANCELED", Status::Opened => "OPENED", + Status::Aborted => "ABORTED", } } diff --git a/crates/hyperqueue/src/client/status.rs b/crates/hyperqueue/src/client/status.rs index 94782e0c4..894f273a5 100644 --- a/crates/hyperqueue/src/client/status.rs +++ b/crates/hyperqueue/src/client/status.rs @@ -11,6 +11,7 @@ pub enum Status { Finished, Failed, Canceled, + Aborted, Opened, } @@ -23,6 +24,8 @@ pub fn job_status(info: &JobInfo) -> Status { Status::Waiting } else if info.counters.n_failed_tasks > 0 { Status::Failed + } else if info.counters.n_aborted_tasks > 0 { + Status::Aborted } else if info.counters.n_canceled_tasks > 0 { Status::Canceled } else { @@ -47,5 +50,6 @@ pub fn get_task_status(status: &JobTaskState) -> Status { JobTaskState::Finished { .. } => Status::Finished, JobTaskState::Failed { .. } => Status::Failed, JobTaskState::Canceled { .. } => Status::Canceled, + JobTaskState::Aborted { .. } => Status::Aborted, } } diff --git a/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs b/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs index 2f33f88bc..05f522db2 100644 --- a/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs +++ b/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs @@ -31,6 +31,7 @@ pub enum DashboardTaskState { Running, Finished, Failed, + Aborted, Canceled, } @@ -115,6 +116,16 @@ impl JobTimeline { event.time, ); } + EventPayload::TasksAborted { task_ids } => { + for task_id in task_ids { + update_task_status( + &mut self.job_timeline, + *task_id, + DashboardTaskState::Aborted, + event.time, + ); + } + } EventPayload::TasksCanceled { task_ids } => { for task_id in task_ids { update_task_status( @@ -125,20 +136,21 @@ impl JobTimeline { ); } } - EventPayload::WorkerConnected(_, _) => {} - EventPayload::WorkerLost(_, _) => {} - EventPayload::WorkerOverviewReceived(_) => {} - EventPayload::JobOpen(_, _) => {} - EventPayload::JobClose(_) => {} - EventPayload::JobIdle(_) => {} - EventPayload::AllocationQueueCreated(_, _) => {} - EventPayload::AllocationQueueRemoved(_) => {} - EventPayload::AllocationQueued { .. } => {} - EventPayload::AllocationStarted(_, _) => {} - EventPayload::AllocationFinished(_, _) => {} - EventPayload::ServerStart { .. } => {} - EventPayload::ServerStop => {} - EventPayload::TaskNotify(_) => {} + EventPayload::WorkerConnected(_, _) + | EventPayload::WorkerLost(_, _) + | EventPayload::WorkerOverviewReceived(_) + | EventPayload::JobOpen(_, _) + | EventPayload::JobClose(_) + | EventPayload::JobIdle(_) + | EventPayload::JobCancel { .. } + | EventPayload::AllocationQueueCreated(_, _) + | EventPayload::AllocationQueueRemoved(_) + | EventPayload::AllocationQueued { .. } + | EventPayload::AllocationStarted(_, _) + | EventPayload::AllocationFinished(_, _) + | EventPayload::ServerStart { .. } + | EventPayload::ServerStop + | EventPayload::TaskNotify(_) => {} } } } diff --git a/crates/hyperqueue/src/dashboard/data/timelines/worker_timeline.rs b/crates/hyperqueue/src/dashboard/data/timelines/worker_timeline.rs index a8fde571b..463c1e8a4 100644 --- a/crates/hyperqueue/src/dashboard/data/timelines/worker_timeline.rs +++ b/crates/hyperqueue/src/dashboard/data/timelines/worker_timeline.rs @@ -70,23 +70,25 @@ impl WorkerTimeline { .push(event.time.into(), *overview.clone()); } } - EventPayload::Submit { .. } => {} - EventPayload::JobCompleted(_) => {} - EventPayload::JobOpen(_, _) => {} - EventPayload::JobClose(_) => {} - EventPayload::JobIdle(_) => {} - EventPayload::TaskStarted { .. } => {} - EventPayload::TaskFinished { .. } => {} - EventPayload::TaskFailed { .. } => {} - EventPayload::TasksCanceled { .. } => {} - EventPayload::AllocationQueueCreated(_, _) => {} - EventPayload::AllocationQueueRemoved(_) => {} - EventPayload::AllocationQueued { .. } => {} - EventPayload::AllocationStarted(_, _) => {} - EventPayload::AllocationFinished(_, _) => {} - EventPayload::ServerStart { .. } => {} - EventPayload::ServerStop => {} - EventPayload::TaskNotify(_) => {} + EventPayload::Submit { .. } + | EventPayload::JobCompleted(_) + | EventPayload::JobOpen(_, _) + | EventPayload::JobClose(_) + | EventPayload::JobIdle(_) + | EventPayload::JobCancel { .. } + | EventPayload::TaskStarted { .. } + | EventPayload::TaskFinished { .. } + | EventPayload::TaskFailed { .. } + | EventPayload::TasksAborted { .. } + | EventPayload::TasksCanceled { .. } + | EventPayload::AllocationQueueCreated(_, _) + | EventPayload::AllocationQueueRemoved(_) + | EventPayload::AllocationQueued { .. } + | EventPayload::AllocationStarted(_, _) + | EventPayload::AllocationFinished(_, _) + | EventPayload::ServerStart { .. } + | EventPayload::ServerStop + | EventPayload::TaskNotify(_) => {} } } } diff --git a/crates/hyperqueue/src/dashboard/ui/screens/jobs/job_tasks_chart.rs b/crates/hyperqueue/src/dashboard/ui/screens/jobs/job_tasks_chart.rs index f5294e656..bbda3b188 100644 --- a/crates/hyperqueue/src/dashboard/ui/screens/jobs/job_tasks_chart.rs +++ b/crates/hyperqueue/src/dashboard/ui/screens/jobs/job_tasks_chart.rs @@ -54,22 +54,25 @@ impl JobTaskChart { failed, finished, canceled, + aborted, } = stats.item; let title = format!( - "Running: {running}, Finished: {finished}, Failed: {failed}, Canceled: {canceled}", + "Running: {running}, Finished: {finished}, Failed: {failed}, Canceled: {canceled}, Aborted {aborted}", ); let running = generate_dataset_entries(entries, |stats| stats.running as f64); let finished = generate_dataset_entries(entries, |stats| stats.finished as f64); let failed = generate_dataset_entries(entries, |stats| stats.failed as f64); let canceled = generate_dataset_entries(entries, |stats| stats.canceled as f64); + let aborted = generate_dataset_entries(entries, |stats| stats.aborted as f64); let datasets = vec![ create_dataset(&running, "Running", Color::Yellow), create_dataset(&finished, "Finished", Color::Green), create_dataset(&failed, "Failed", Color::Red), create_dataset(&canceled, "Canceled", Color::Cyan), + create_dataset(&aborted, "Aborted", Color::LightRed), ]; let chart = create_count_chart(datasets, &title, self.range); frame.render_widget(chart, rect); @@ -92,6 +95,7 @@ struct TaskStats { failed: u64, finished: u64, canceled: u64, + aborted: u64, } impl TaskStats { @@ -101,6 +105,7 @@ impl TaskStats { mut failed, mut finished, mut canceled, + mut aborted, } = self; match state { DashboardTaskState::Running => { @@ -115,12 +120,16 @@ impl TaskStats { DashboardTaskState::Canceled => { canceled += 1; } + DashboardTaskState::Aborted => { + aborted += 1; + } } Self { running, failed, finished, canceled, + aborted, } } } diff --git a/crates/hyperqueue/src/dashboard/ui/widgets/tasks_table.rs b/crates/hyperqueue/src/dashboard/ui/widgets/tasks_table.rs index b376a3ffa..6c8c8de6e 100644 --- a/crates/hyperqueue/src/dashboard/ui/widgets/tasks_table.rs +++ b/crates/hyperqueue/src/dashboard/ui/widgets/tasks_table.rs @@ -16,6 +16,7 @@ const RUNNING: &str = "RUNNING"; const FINISHED: &str = "FINISHED"; const FAILED: &str = "FAILED"; const CANCELED: &str = "CANCELED"; +const ABORTED: &str = "ABORTED"; pub struct TasksTable { table: StatefulTable, @@ -102,6 +103,7 @@ impl TasksTable { DashboardTaskState::Finished => FINISHED.to_string(), DashboardTaskState::Failed => FAILED.to_string(), DashboardTaskState::Canceled => CANCELED.to_string(), + DashboardTaskState::Aborted => ABORTED.to_string(), }; cols.extend([ Cell::from(task_state).style(get_task_state_color(task_row.task_state)), @@ -132,6 +134,7 @@ fn create_rows(mut rows: Vec<(JobTaskId, &TaskInfo)>, current_time: SystemTime) DashboardTaskState::Finished => 1, DashboardTaskState::Failed => 2, DashboardTaskState::Canceled => 3, + DashboardTaskState::Aborted => 4, }; match task_info.end_time { None => (status_index, task_info.start_time), @@ -175,6 +178,7 @@ pub fn get_task_state_color(state: DashboardTaskState) -> Style { DashboardTaskState::Finished => Color::Green, DashboardTaskState::Failed => Color::Red, DashboardTaskState::Canceled => Color::Cyan, + DashboardTaskState::Aborted => Color::LightRed, }; Style { diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs index c41142d1d..464248400 100644 --- a/crates/hyperqueue/src/server/client/mod.rs +++ b/crates/hyperqueue/src/server/client/mod.rs @@ -430,10 +430,15 @@ fn reconstruct_historical_events( | JobTaskState::Canceled { started_data: Some(started_data), .. + } + | JobTaskState::Aborted { + started_data: Some(started_data), + .. } => Some(started_data.clone()), JobTaskState::Waiting | JobTaskState::Failed { .. } - | JobTaskState::Canceled { .. } => None, + | JobTaskState::Canceled { .. } + | JobTaskState::Aborted { .. } => None, }; if let Some(started_data) = started_data { events.push(Event::at( @@ -487,6 +492,24 @@ fn reconstruct_historical_events( )); } } + JobTaskState::Aborted { cancelled_date, .. } => { + if let Some(task_ids) = events.last_mut().and_then(|e| { + if let EventPayload::TasksAborted { task_ids, .. } = &mut e.payload { + (e.time == *cancelled_date).then_some(task_ids) + } else { + None + } + }) { + task_ids.push(TaskId::new(job.job_id, *id)) + } else { + events.push(Event::at( + *cancelled_date, + EventPayload::TasksAborted { + task_ids: vec![TaskId::new(job.job_id, *id)], + }, + )); + } + } JobTaskState::Waiting | JobTaskState::Running { .. } => {} }; } @@ -754,8 +777,7 @@ async fn cancel_job( .iter() .map(|task_id| task_id.job_task_id()) .collect(); - job.set_cancel_state(task_ids, senders); - job.cancel(reason.clone()); + job.set_cancel_state(reason.clone(), task_ids, senders); CancelJobResponse::Canceled(job_task_ids, already_finished) } else { CancelJobResponse::Canceled(vec![], 0) diff --git a/crates/hyperqueue/src/server/event/journal/prune.rs b/crates/hyperqueue/src/server/event/journal/prune.rs index 0b2941e45..67f83e92b 100644 --- a/crates/hyperqueue/src/server/event/journal/prune.rs +++ b/crates/hyperqueue/src/server/event/journal/prune.rs @@ -22,13 +22,16 @@ pub(crate) fn prune_journal( EventPayload::Submit { job_id, .. } | EventPayload::JobCompleted(job_id) | EventPayload::JobOpen(job_id, _) - | EventPayload::JobClose(job_id) => live_job_ids.contains(job_id).then_some(event), + | EventPayload::JobClose(job_id) + | EventPayload::JobCancel { job_id, .. } => { + live_job_ids.contains(job_id).then_some(event) + } EventPayload::TaskStarted { task_id, .. } | EventPayload::TaskFinished { task_id, .. } | EventPayload::TaskFailed { task_id, .. } => { live_job_ids.contains(&task_id.job_id()).then_some(event) } - EventPayload::TasksCanceled { task_ids, .. } => { + EventPayload::TasksAborted { task_ids } | EventPayload::TasksCanceled { task_ids } => { task_ids.retain(|id| live_job_ids.contains(&id.job_id())); (!task_ids.is_empty()).then_some(event) } diff --git a/crates/hyperqueue/src/server/event/payload.rs b/crates/hyperqueue/src/server/event/payload.rs index 48245883f..fd7fc2b74 100644 --- a/crates/hyperqueue/src/server/event/payload.rs +++ b/crates/hyperqueue/src/server/event/payload.rs @@ -39,6 +39,11 @@ pub enum EventPayload { /// An open job completed all its tasks (but cannot be marked as completed /// because it is open) (EPHEMERAL - not stored in the journal) JobIdle(JobId), + /// Job canceled by user, canceled all tasks in the job + JobCancel { + job_id: JobId, + cancel_reason: String, + }, /// Task has started to execute on some worker TaskStarted { task_id: TaskId, @@ -55,10 +60,14 @@ pub enum EventPayload { task_id: TaskId, error: String, }, - /// Tasks has been canceled; for performance and correctness reason, this even is batched. + /// Tasks has been canceled by user; for performance and correctness reason, this even is batched. TasksCanceled { task_ids: Vec, }, + /// Tasks has been aborted by system; for performance and correctness reason, this even is batched. + TasksAborted { + task_ids: Vec, + }, /// New allocation queue has been created AllocationQueueCreated(QueueId, Box), /// Allocation queue has been removed diff --git a/crates/hyperqueue/src/server/event/streamer.rs b/crates/hyperqueue/src/server/event/streamer.rs index 40f2a8643..ac2c56fd8 100644 --- a/crates/hyperqueue/src/server/event/streamer.rs +++ b/crates/hyperqueue/src/server/event/streamer.rs @@ -90,7 +90,8 @@ impl EventFilter { | EventPayload::JobCompleted(job_id) | EventPayload::JobOpen(job_id, _) | EventPayload::JobClose(job_id) - | EventPayload::JobIdle(job_id) => { + | EventPayload::JobIdle(job_id) + | EventPayload::JobCancel { job_id, .. } => { if !self.flags.contains(EventFilterFlags::JOB_EVENTS) { false } else { @@ -112,7 +113,7 @@ impl EventFilter { .unwrap_or(true) } } - EventPayload::TasksCanceled { task_ids } => { + EventPayload::TasksAborted { task_ids } | EventPayload::TasksCanceled { task_ids } => { if !self.flags.contains(EventFilterFlags::TASK_EVENTS) { false } else { @@ -240,6 +241,18 @@ impl EventStreamer { ); } + #[inline] + pub fn on_job_cancel(&self, job_id: JobId, cancel_reason: String, now: DateTime) { + self.send_event( + EventPayload::JobCancel { + job_id, + cancel_reason, + }, + Some(now), + ForwardMode::StreamAndPersist, + ); + } + #[inline] pub fn on_task_started( &self, @@ -278,6 +291,14 @@ impl EventStreamer { ); } + pub fn on_task_aborted(&self, task_ids: Vec, now: DateTime) { + self.send_event( + EventPayload::TasksAborted { task_ids }, + Some(now), + ForwardMode::StreamAndPersist, + ); + } + #[inline] pub fn on_task_failed(&self, task_id: TaskId, error: String, now: DateTime) { self.send_event( diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs index a93d6731c..ec5597661 100644 --- a/crates/hyperqueue/src/server/job.rs +++ b/crates/hyperqueue/src/server/job.rs @@ -43,6 +43,10 @@ pub enum JobTaskState { started_data: Option, cancelled_date: DateTime, }, + Aborted { + started_data: Option, + cancelled_date: DateTime, + }, } impl JobTaskState { @@ -72,6 +76,7 @@ pub struct JobTaskCounters { pub n_finished_tasks: JobTaskCount, pub n_failed_tasks: JobTaskCount, pub n_canceled_tasks: JobTaskCount, + pub n_aborted_tasks: JobTaskCount, } impl std::ops::Add for JobTaskCounters { @@ -83,6 +88,7 @@ impl std::ops::Add for JobTaskCounters { n_finished_tasks: self.n_finished_tasks + rhs.n_finished_tasks, n_failed_tasks: self.n_failed_tasks + rhs.n_failed_tasks, n_canceled_tasks: self.n_canceled_tasks + rhs.n_canceled_tasks, + n_aborted_tasks: self.n_aborted_tasks + rhs.n_aborted_tasks, } } } @@ -94,14 +100,15 @@ impl JobTaskCounters { - self.n_finished_tasks - self.n_failed_tasks - self.n_canceled_tasks + - self.n_aborted_tasks } pub fn has_unsuccessful_tasks(&self) -> bool { - self.n_failed_tasks > 0 || self.n_canceled_tasks > 0 + self.n_failed_tasks > 0 || self.n_canceled_tasks > 0 || self.n_aborted_tasks > 0 } pub fn completed_tasks(&self) -> JobTaskCount { - self.n_finished_tasks + self.n_failed_tasks + self.n_canceled_tasks + self.n_finished_tasks + self.n_failed_tasks + self.n_canceled_tasks + self.n_aborted_tasks } pub fn is_terminated(&self, n_tasks: JobTaskCount) -> bool { @@ -175,10 +182,6 @@ impl Job { senders.events.on_job_closed(self.job_id); } - pub fn cancel(&mut self, reason: Option) { - self.cancel_reason = reason; - } - pub fn max_id(&self) -> Option { self.tasks.keys().max().copied() } @@ -295,7 +298,8 @@ impl Job { } JobTaskState::Finished { .. } | JobTaskState::Failed { .. } - | JobTaskState::Canceled { .. } => None, + | JobTaskState::Canceled { .. } + | JobTaskState::Aborted { .. } => None, }) .collect() } @@ -410,10 +414,16 @@ impl Job { task_id } - pub fn set_cancel_state(&mut self, task_ids: Vec, senders: &Senders) { + pub fn set_cancel_state( + &mut self, + cancel_reason: Option, + task_ids: Vec, + senders: &Senders, + ) { if task_ids.is_empty() { return; } + self.cancel_reason = cancel_reason; let now = Utc::now(); for task_id in &task_ids { assert_eq!(task_id.job_id(), self.job_id); @@ -437,10 +447,46 @@ impl Job { } self.counters.n_canceled_tasks += task_ids.len() as JobTaskCount; + senders.events.on_job_cancel( + self.job_id, + self.cancel_reason.clone().unwrap_or_default(), + now, + ); senders.events.on_task_canceled(task_ids, now); self.check_termination(senders, now); } + pub fn abort_tasks(&mut self, task_ids: Vec, senders: &Senders) { + if task_ids.is_empty() { + return; + } + let now = Utc::now(); + for task_id in &task_ids { + assert_eq!(task_id.job_id(), self.job_id); + let task = self.tasks.get_mut(&task_id.job_task_id()).unwrap(); + match &task.state { + JobTaskState::Running { started_data, .. } => { + task.state = JobTaskState::Aborted { + started_data: Some(started_data.clone()), + cancelled_date: now, + }; + self.counters.n_running_tasks -= 1; + } + JobTaskState::Waiting => { + task.state = JobTaskState::Aborted { + started_data: None, + cancelled_date: now, + }; + } + state => panic!("Invalid job state that is being aborted: {task_id:?} {state:?}"), + } + } + + self.counters.n_aborted_tasks += task_ids.len() as JobTaskCount; + senders.events.on_task_aborted(task_ids, now); + self.check_termination(senders, now); + } + pub fn attach_submit(&mut self, description: SubmittedJobDescription) { match &description.description().task_desc { JobTaskDescription::Array { ids, .. } => { diff --git a/crates/hyperqueue/src/server/restore.rs b/crates/hyperqueue/src/server/restore.rs index 733d7ceb3..2e80aa55c 100644 --- a/crates/hyperqueue/src/server/restore.rs +++ b/crates/hyperqueue/src/server/restore.rs @@ -26,7 +26,8 @@ impl RestorerTaskInfo { JobTaskState::Waiting | JobTaskState::Running { .. } => false, JobTaskState::Finished { .. } | JobTaskState::Failed { .. } - | JobTaskState::Canceled { .. } => true, + | JobTaskState::Canceled { .. } + | JobTaskState::Aborted { .. } => true, } } } @@ -36,6 +37,7 @@ struct RestorerJob { submit_descs: Vec, tasks: Map, is_open: bool, + cancel_reason: String, } pub struct Queue { @@ -97,6 +99,7 @@ impl RestorerJob { JobTaskState::Finished { .. } => job.counters.n_finished_tasks += 1, JobTaskState::Failed { .. } => job.counters.n_failed_tasks += 1, JobTaskState::Canceled { .. } => job.counters.n_canceled_tasks += 1, + JobTaskState::Aborted { .. } => job.counters.n_aborted_tasks += 1, } job_task.state = task.state.clone(); } @@ -114,6 +117,7 @@ impl RestorerJob { submit_descs: Vec::new(), tasks: Map::new(), is_open, + cancel_reason: String::default(), } } @@ -320,7 +324,7 @@ impl StateRestorer { } } EventPayload::TasksCanceled { task_ids } => { - log::debug!("Replaying: TaskCanceled {task_ids:?}"); + log::debug!("Replaying: TasksCanceled {task_ids:?}"); for task_id in task_ids { if let Some(job) = self.jobs.get_mut(&task_id.job_id()) { let task = job.tasks.get_mut(&task_id.job_task_id()); @@ -355,6 +359,42 @@ impl StateRestorer { } } } + EventPayload::TasksAborted { task_ids } => { + log::debug!("Replaying: TasksAborted {task_ids:?}"); + for task_id in task_ids { + if let Some(job) = self.jobs.get_mut(&task_id.job_id()) { + let task = job.tasks.get_mut(&task_id.job_task_id()); + if let Some(task) = task { + task.state = + match std::mem::replace(&mut task.state, JobTaskState::Waiting) + { + JobTaskState::Running { started_data } => { + JobTaskState::Aborted { + started_data: Some(started_data), + cancelled_date: event.time, + } + } + _ => JobTaskState::Aborted { + started_data: None, + cancelled_date: event.time, + }, + } + } else { + job.tasks.insert( + task_id.job_task_id(), + RestorerTaskInfo { + state: JobTaskState::Aborted { + started_data: None, + cancelled_date: event.time, + }, + instance_id: None, + crash_counter: 0, + }, + ); + } + } + } + } EventPayload::AllocationQueueCreated(queue_id, params) => { assert!(self.queues.insert(queue_id, params).is_none()); self.max_queue_id = self.max_queue_id.max(queue_id); @@ -380,6 +420,12 @@ impl StateRestorer { EventPayload::JobClose(job_id) => { self.jobs.get_mut(&job_id).unwrap().is_open = false; } + EventPayload::JobCancel { + job_id, + cancel_reason, + } => { + self.jobs.get_mut(&job_id).unwrap().cancel_reason = cancel_reason; + } EventPayload::TaskNotify(_) | EventPayload::JobIdle(_) => {} } } diff --git a/crates/hyperqueue/src/server/state.rs b/crates/hyperqueue/src/server/state.rs index cf9add088..bdc718ec1 100644 --- a/crates/hyperqueue/src/server/state.rs +++ b/crates/hyperqueue/src/server/state.rs @@ -114,21 +114,21 @@ impl State { &mut self, senders: &Senders, task_id: TaskId, - cancelled_tasks: Vec, + aborted_tasks: Vec, info: TaskFailInfo, ) -> Vec { log::debug!("Task id={task_id} failed: {info:?}"); let job_id = task_id.job_id(); let job = self.get_job_mut(job_id).unwrap(); - if !cancelled_tasks.is_empty() { + if !aborted_tasks.is_empty() { log::debug!( - "Tasks {:?} canceled because of task dependency fails", - &cancelled_tasks + "Tasks {:?} aborted because of task dependency fails", + &aborted_tasks ); } - job.set_cancel_state(cancelled_tasks, senders); + job.abort_tasks(aborted_tasks, senders); job.set_failed_state(task_id.job_task_id(), info.message, senders); if let Some(max_fails) = &job.job_desc.max_fails @@ -136,7 +136,7 @@ impl State { { log::debug!("Max task fails reached for job {}", job.job_id); let task_ids = job.non_finished_task_ids(); - job.set_cancel_state(task_ids.clone(), senders); + job.abort_tasks(task_ids.clone(), senders); return task_ids; } Vec::new() diff --git a/crates/pyhq/src/client/job.rs b/crates/pyhq/src/client/job.rs index 2747f582a..3f9f0ee83 100644 --- a/crates/pyhq/src/client/job.rs +++ b/crates/pyhq/src/client/job.rs @@ -128,7 +128,12 @@ pub fn forget_job_impl(py: Python, ctx: ClientContextPtr, job_id: PyJobId) -> Py run_future(async move { let message = FromClientMessage::ForgetJob(ForgetJobRequest { selector: IdSelector::Specific(IntArray::from_id(job_id)), - filter: vec![Status::Canceled, Status::Failed, Status::Finished], + filter: vec![ + Status::Canceled, + Status::Failed, + Status::Aborted, + Status::Finished, + ], }); let mut ctx = borrow_mut!(py, ctx); diff --git a/docs/cli/output-mode.md b/docs/cli/output-mode.md index d0575e4ba..f916a4831 100644 --- a/docs/cli/output-mode.md +++ b/docs/cli/output-mode.md @@ -146,6 +146,7 @@ Time-based items are formatted in the following way: }, "task_count": 1, "task_stats": { + "aborted": 0, "canceled": 0, "failed": 0, "finished": 1, @@ -174,6 +175,7 @@ Time-based items are formatted in the following way: }, "task_count": 1, "task_stats": { + "aborted": 0, "canceled": 0, "failed": 0, "finished": 1, diff --git a/docs/jobs/failure.md b/docs/jobs/failure.md index 67cc17d69..c09b8af55 100644 --- a/docs/jobs/failure.md +++ b/docs/jobs/failure.md @@ -52,11 +52,11 @@ can instead create a separate stdout/stderr file for each task execution using t By default, when a single task of a [task array](arrays.md) fails, the computation of the job will continue. You can change this behavior with the `--max-fails=` option of the `submit` command, where `X` is non-negative integer. -If specified, once more tasks than `X` tasks fail, the rest of the job's tasks that were not completed yet will be canceled. +If specified, once more tasks than `X` tasks fail, the rest of the job's tasks that were not completed yet will be aborted. For example: ```bash $ hq submit --array 1-1000 --max-fails 5 ... ``` This will create a task array with `1000` tasks. Once `5` or more tasks fail, the remaining uncompleted tasks of the job -will be canceled. +will be aborted. diff --git a/docs/jobs/jobs.md b/docs/jobs/jobs.md index 52b0eba97..84afeb063 100644 --- a/docs/jobs/jobs.md +++ b/docs/jobs/jobs.md @@ -231,20 +231,19 @@ $ hq job info ### Task state -Each task starts in the `Waiting` state and can end up in one of the terminal states: `Finished`, `Failed` -or `Canceled`. +Each task starts in the `Waiting` state and can end up in one of the terminal states: `Finished`, `Failed`, `Canceled` or `Aborted`. ``` -Waiting-----------------\ - | ^ | - | | | - v | | -Running-----------------| - | | | - | \--------\ | - | | | - v v v -Finished Failed Canceled +Waiting----------------------\ + | ^ | + | | | + v | | +Running----------------------| + | | | + | \--------\ | + | | | + v v v +Finished Failed Canceled/Aborted ``` - **Waiting** The task was submitted and is now waiting to be executed. @@ -252,9 +251,10 @@ Finished Failed Canceled crashes. - **Finished** The task has successfully finished. - **Failed** The task has failed. -- **Canceled** The task has been [canceled](#cancelling-jobs). +- **Canceled** The task has been [canceled] (#cancelling-jobs) by user. +- **Aborted** The task has been aborted by HQ due to failure of a dependency or reaching fail limit of a job -If a task is in the `Finished`, `Failed` or `Canceled` state, it is `completed`. +If a task is in the `Finished`, `Failed`, `Canceled` or `Aborted` state, it is `completed`. ### Job state @@ -264,9 +264,14 @@ matches from the following list of rules: 1. If at least one task is `Running`, then job state is `Running`. 2. If at least one task has not been `completed` yet, then job state is `Waiting`. 3. If at least one task is `Failed`, then job state is `Failed`. -4. If at least one task is `Canceled`, then job state is `Canceled`. -5. If all tasks are finished and job is open (see [Open Jobs](openjobs.md)), then job state is `Opened`. -5. Remaining case: all tasks are `Finished` and job is closed, then job state is `Finished`. +4. If at least one task is `Aborted`, then job state is `Aborted`. +5. If at least one task is `Canceled`, then job state is `Canceled`. +6. If all tasks are finished and job is open (see [Open Jobs](openjobs.md)), then job state is `Opened`. +7. Remaining case: all tasks are `Finished` and job is closed, then job state is `Finished`. + +!!! note + + Abortion of a task is closely linked to a failure of a task, this would imply that Aborted Job state should never be reached. ## Cancelling jobs @@ -299,14 +304,14 @@ the [`hq job forget`](cli:hq.job.forget) command[^1]: $ hq job forget ``` -By default, all completed jobs (finished/failed/canceled) will be forgotten. You can use the `--status` parameter to +By default, all completed jobs (finished/failed/aborted/canceled) will be forgotten. You can use the `--status` parameter to only forget jobs in certain statuses: ```console -$ hq job forget all --status finished,canceled +$ hq job forget all --status finished,aborted,canceled ``` -However, only jobs that are completed, i.e. that have been finished successfully, failed or have been canceled, can be +However, only jobs that are completed, i.e. that have been finished successfully, failed, aborted or have been canceled, can be forgotten. If you want to forget a waiting or a running job, [cancel](#cancelling-jobs) it first. Note that if you are using a journal, forgetting only free the memory of the server but the tasks remains @@ -457,6 +462,7 @@ You can display basic job information using [`hq job list`](cli:hq.job.list). - `finished` - `failed` - `canceled` + - `aborted` ### Display a summary table of all jobs diff --git a/docs/jobs/streaming.md b/docs/jobs/streaming.md index 7371c0690..c159a6904 100644 --- a/docs/jobs/streaming.md +++ b/docs/jobs/streaming.md @@ -63,7 +63,7 @@ streaming file. With the following two exceptions: stream file), then the task will fail with an error prefixed with `"Streamer:"` and no streaming guarantees will be upheld. -- When a task is `Canceled` or task fails because of [time limit](jobs.md#time-management) is reached, then the part of +- When a task is `Canceled`, `Aborted` or task fails because of [time limit](jobs.md#time-management) is reached, then the part of its stream that was buffered in the worker is dropped to avoid spending additional resources for this task. ## Inspecting the stream data diff --git a/tests/output/test_json.py b/tests/output/test_json.py index 31ff2020c..dad7df5b6 100644 --- a/tests/output/test_json.py +++ b/tests/output/test_json.py @@ -125,6 +125,7 @@ def test_print_job_list(hq_env: HqEnv): "name": "echo", "task_count": 1, "task_stats": { + "aborted": 0, "canceled": 0, "failed": 0, "finished": 1, @@ -280,6 +281,7 @@ def test_print_job_summary(hq_env: HqEnv): output = parse_json_output(hq_env, ["--output-mode=json", "job", "summary"]) schema = Schema( { + "Aborted": 0, "Canceled": 0, "Failed": 0, "Finished": 0, diff --git a/tests/pyapi/test_dependencies.py b/tests/pyapi/test_dependencies.py index 2c7f62b76..2507a3530 100644 --- a/tests/pyapi/test_dependencies.py +++ b/tests/pyapi/test_dependencies.py @@ -22,7 +22,7 @@ def test_single_dep(hq_env: HqEnv): def test_dep_failed(hq_env: HqEnv): """ - Check that consumers of a failed tasks are canceled + Check that consumers of a failed tasks are aborted """ (job, client) = prepare_job_client(hq_env, with_worker=True) @@ -39,8 +39,8 @@ def test_dep_failed(hq_env: HqEnv): table = hq_env.command(["task", "list", "1"], as_table=True) assert table.get_row_value("0") == "FAILED" - assert table.get_row_value("1") == "CANCELED" - assert table.get_row_value("2") == "CANCELED" + assert table.get_row_value("1") == "ABORTED" + assert table.get_row_value("2") == "ABORTED" assert table.get_row_value("3") == "FINISHED" diff --git a/tests/test_job.py b/tests/test_job.py index 81566ea0f..8398632fe 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -289,7 +289,7 @@ def test_job_list_hidden_jobs(hq_env: HqEnv): def test_job_summary(hq_env: HqEnv): hq_env.start_server() - def check(running=0, waiting=0, finished=0, failed=0, canceled=0): + def check(running=0, waiting=0, finished=0, failed=0, canceled=0, aborted=0): table = hq_env.command(["job", "summary"], as_table=True) items = ( @@ -297,6 +297,7 @@ def check(running=0, waiting=0, finished=0, failed=0, canceled=0): ("WAITING", waiting), ("FINISHED", finished), ("FAILED", failed), + ("ABORTED", aborted), ("CANCELED", canceled), ) for index, (status, count) in enumerate(items): @@ -568,7 +569,7 @@ def test_max_fails_0(hq_env: HqEnv): table = hq_env.command(["job", "info", "1"], as_table=True) states = table[0].get_row_value("State").split("\n") assert "FAILED (1)" in states - assert any(s.startswith("CANCELED") for s in states) + assert any(s.startswith("ABORTED") for s in states) def test_max_fails_1(hq_env: HqEnv): @@ -627,7 +628,7 @@ def test_max_fails_many(hq_env: HqEnv): table = hq_env.command(["job", "info", "1"], as_table=True) states = table[0].get_row_value("State").split("\n") assert "FAILED (4)" in states - assert "CANCELED (6)" in states + assert "ABORTED (6)" in states def test_job_last(hq_env: HqEnv):