Skip to content

Commit 92dfe41

Browse files
committed
Fix restoring crash counters and instance ids
1 parent b17f7a3 commit 92dfe41

9 files changed

Lines changed: 69 additions & 22 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
### Fixes
1111

1212
* Fixed a problem with journal loading when task dependencies are used
13+
* Fixed restoring crash counters and instance ids from journal
1314

1415
## 0.22.0
1516

crates/hyperqueue/src/server/client/submit.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ fn build_tasks_array(
321321
priority: task_desc.priority,
322322
crash_limit: task_desc.crash_limit,
323323
}],
324-
adjust_instance_id: Default::default(),
324+
adjust_instance_id_and_crash_counters: Default::default(),
325325
}
326326
}
327327

@@ -391,7 +391,7 @@ fn build_tasks_graph(
391391
NewTasksMessage {
392392
tasks: task_configs,
393393
shared_data,
394-
adjust_instance_id: Default::default(),
394+
adjust_instance_id_and_crash_counters: Default::default(),
395395
}
396396
}
397397

crates/hyperqueue/src/server/restore.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ use crate::worker::start::RunningTaskContext;
1010
use crate::{JobId, JobTaskId, Map, make_tako_id, unwrap_tako_id};
1111
use std::path::Path;
1212
use tako::gateway::NewTasksMessage;
13-
use tako::{ItemId, WorkerId};
13+
use tako::{InstanceId, ItemId, WorkerId};
1414

1515
struct RestorerTaskInfo {
1616
state: JobTaskState,
17+
instance_id: Option<InstanceId>,
18+
crash_counter: u32,
1719
}
1820

1921
impl RestorerTaskInfo {
@@ -77,15 +79,17 @@ impl RestorerJob {
7779

7880
for (task_id, job_task) in job.tasks.iter_mut() {
7981
if let Some(task) = self.tasks.get_mut(task_id) {
82+
if task.crash_counter > 0 || task.instance_id.is_some() {
83+
new_tasks.adjust_instance_id_and_crash_counters.insert(
84+
make_tako_id(job_id, *task_id),
85+
(
86+
task.instance_id.map(|x| x.as_num() + 1).unwrap_or(0).into(),
87+
task.crash_counter,
88+
),
89+
);
90+
}
8091
match &task.state {
81-
JobTaskState::Waiting => continue,
82-
JobTaskState::Running { started_data } => {
83-
let instance_id = started_data.context.instance_id.as_num() + 1;
84-
new_tasks
85-
.adjust_instance_id
86-
.insert(make_tako_id(job_id, *task_id), instance_id.into());
87-
continue;
88-
}
92+
JobTaskState::Waiting | JobTaskState::Running { .. } => continue,
8993
JobTaskState::Finished { .. } => job.counters.n_finished_tasks += 1,
9094
JobTaskState::Failed { .. } => job.counters.n_failed_tasks += 1,
9195
JobTaskState::Canceled { .. } => job.counters.n_canceled_tasks += 1,
@@ -112,6 +116,19 @@ impl RestorerJob {
112116
pub fn add_submit(&mut self, submit: SubmittedJobDescription) {
113117
self.submit_descs.push(submit)
114118
}
119+
120+
pub fn increase_crash_counters(&mut self, worker_id: WorkerId) {
121+
for task in self.tasks.values_mut() {
122+
match &task.state {
123+
JobTaskState::Running { started_data }
124+
if started_data.worker_ids.contains(&worker_id) =>
125+
{
126+
task.crash_counter += 1;
127+
}
128+
_ => {}
129+
}
130+
}
131+
}
115132
}
116133

117134
#[derive(Default)]
@@ -183,7 +200,13 @@ impl StateRestorer {
183200
log::debug!("Replaying: WorkerConnected {worker_id}");
184201
self.max_worker_id = self.max_worker_id.max(worker_id.as_num());
185202
}
186-
EventPayload::WorkerLost(_, _) => {}
203+
EventPayload::WorkerLost(worker_id, reason) => {
204+
if reason.is_failure() {
205+
for job in self.jobs.values_mut() {
206+
job.increase_crash_counters(worker_id);
207+
}
208+
}
209+
}
187210
EventPayload::WorkerOverviewReceived(_) => {}
188211
EventPayload::Submit {
189212
job_id,
@@ -232,6 +255,8 @@ impl StateRestorer {
232255
worker_ids: workers,
233256
},
234257
},
258+
instance_id: Some(instance_id),
259+
crash_counter: 0,
235260
},
236261
);
237262
}
@@ -300,6 +325,8 @@ impl StateRestorer {
300325
started_data: None,
301326
cancelled_date: event.time,
302327
},
328+
instance_id: None,
329+
crash_counter: 0,
303330
},
304331
);
305332
}

crates/tako/src/gateway.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ pub struct TaskConfiguration {
129129
pub struct NewTasksMessage {
130130
pub tasks: Vec<TaskConfiguration>,
131131
pub shared_data: Vec<SharedTaskConfiguration>,
132-
pub adjust_instance_id: Map<TaskId, InstanceId>,
132+
pub adjust_instance_id_and_crash_counters: Map<TaskId, (InstanceId, u32)>,
133133
}
134134

135135
#[derive(Deserialize, Serialize, Debug)]

crates/tako/src/internal/server/client.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,13 @@ fn handle_new_tasks(
218218
let task = Task::new(task.id, task.task_deps, conf.clone(), task.body);
219219
tasks.push(task);
220220
}
221-
if !msg.adjust_instance_id.is_empty() {
221+
if !msg.adjust_instance_id_and_crash_counters.is_empty() {
222222
for task in &mut tasks {
223-
if let Some(instance_id) = msg.adjust_instance_id.get(&task.id) {
223+
if let Some((instance_id, crash_counter)) =
224+
msg.adjust_instance_id_and_crash_counters.get(&task.id)
225+
{
224226
task.instance_id = *instance_id;
227+
task.crash_counter = *crash_counter;
225228
}
226229
}
227230
}

crates/tako/src/internal/server/reactor.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,11 @@ pub(crate) fn on_remove_worker(
135135

136136
comm.broadcast_worker_message(&ToWorkerMessage::LostWorker(worker_id));
137137

138-
// IMPORTANT: We have to announce error BEFORE we announce lost worker (+ running tasks)
139-
// because HQ does not recognize switch from waiting to failed stated.
138+
// IMPORTANT: We need to announce lost worker before failing the jobs
139+
// so in journal restoration we can detect what tasks were running
140+
// without explicit logging
141+
comm.send_client_worker_lost(worker_id, running_tasks, reason);
142+
140143
for task_id in crashed_tasks {
141144
let count = core.get_task(task_id).crash_counter;
142145
log::debug!("Task {} reached crash limit {}", task_id, count);
@@ -154,9 +157,6 @@ pub(crate) fn on_remove_worker(
154157
},
155158
);
156159
}
157-
158-
comm.send_client_worker_lost(worker_id, running_tasks, reason);
159-
160160
comm.ask_for_scheduling();
161161
}
162162

crates/tako/src/internal/tests/integration/utils/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl ServerHandle {
194194
let msg = NewTasksMessage {
195195
tasks,
196196
shared_data: configurations,
197-
adjust_instance_id: Default::default(),
197+
adjust_instance_id_and_crash_counters: Default::default(),
198198
};
199199
self.send(FromGatewayMessage::NewTasks(msg)).await;
200200
wait_for_msg!(self, ToGatewayMessage::NewTasksResponse(NewTasksResponse { .. }) => ());

tests/test_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -989,7 +989,7 @@ def test_zero_custom_error_message(hq_env: HqEnv):
989989
# print(table)
990990

991991

992-
@pytest.mark.parametrize("count", [None, 1, 7])
992+
@pytest.mark.parametrize("count", [None, 1, 2, 7])
993993
def test_crashing_job_status_default(count: Optional[int], hq_env: HqEnv):
994994
hq_env.start_server()
995995

tests/test_journal.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,3 +409,19 @@ def test_restore_dependencies2(hq_env: HqEnv, tmp_path):
409409
hq_env.start_server(args=["--journal", journal_path])
410410
hq_env.start_worker(args=["--resource", "x=sum(2)"])
411411
wait_for_job_state(hq_env, 1, "FINISHED")
412+
413+
414+
def test_restore_crash_counters(hq_env: HqEnv, tmp_path):
415+
journal_path = os.path.join(tmp_path, "my.journal")
416+
hq_env.start_server(args=["--journal", journal_path])
417+
hq_env.start_worker()
418+
hq_env.command(["submit", "--crash-limit=2", "--", "sleep", "4"])
419+
wait_for_job_state(hq_env, 1, "RUNNING")
420+
hq_env.kill_worker(1)
421+
wait_for_job_state(hq_env, 1, "WAITING")
422+
hq_env.stop_server()
423+
hq_env.start_server(args=["--journal", journal_path])
424+
hq_env.start_worker()
425+
wait_for_job_state(hq_env, 1, "RUNNING")
426+
hq_env.kill_worker(3)
427+
wait_for_job_state(hq_env, 1, "FAILED")

0 commit comments

Comments (0)