Skip to content

Commit 7107593

Browse files
committed
Event for batch canceling tasks
1 parent 0f32fbf commit 7107593

7 files changed

Lines changed: 67 additions & 53 deletions

File tree

crates/hyperqueue/src/client/commands/journal/output.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,9 @@ fn format_payload(event: EventPayload) -> serde_json::Value {
8787
"job": task_id.job_id(),
8888
"task": task_id.job_task_id(),
8989
}),
90-
EventPayload::TaskCanceled { task_id } => json!({
90+
EventPayload::TaskCanceled { task_ids } => json!({
9191
"type": "task-canceled",
92-
"job": task_id.job_id(),
93-
"task": task_id.job_task_id(),
92+
"tasks": task_ids,
9493
}),
9594
EventPayload::TaskFailed { task_id, error } => json!({
9695
"type": "task-failed",

crates/hyperqueue/src/server/client/mod.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -353,12 +353,22 @@ fn reconstruct_historical_events(
353353
));
354354
}
355355
JobTaskState::Canceled { cancelled_date, .. } => {
356-
events.push(Event::at(
357-
*cancelled_date,
358-
EventPayload::TaskCanceled {
359-
task_id: TaskId::new(job.job_id, *id),
360-
},
361-
));
356+
if let Some(task_ids) = events.last_mut().and_then(|e| {
357+
if let EventPayload::TaskCanceled { task_ids, .. } = &mut e.payload {
358+
(e.time == *cancelled_date).then_some(task_ids)
359+
} else {
360+
None
361+
}
362+
}) {
363+
task_ids.push(TaskId::new(job.job_id, *id))
364+
} else {
365+
events.push(Event::at(
366+
*cancelled_date,
367+
EventPayload::TaskCanceled {
368+
task_ids: vec![TaskId::new(job.job_id, *id)],
369+
},
370+
));
371+
}
362372
}
363373
JobTaskState::Waiting | JobTaskState::Running { .. } => {}
364374
};

crates/hyperqueue/src/server/event/journal/prune.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,32 +10,37 @@ pub(crate) fn prune_journal(
1010
live_worker_ids: &Set<WorkerId>,
1111
) -> crate::Result<()> {
1212
for event in reader {
13-
let event = event?;
14-
let retain = match &event.payload {
13+
let mut event = event?;
14+
let event = match &mut event.payload {
1515
EventPayload::WorkerConnected(worker_id, _)
16-
| EventPayload::WorkerLost(worker_id, _) => live_worker_ids.contains(worker_id),
16+
| EventPayload::WorkerLost(worker_id, _) => {
17+
live_worker_ids.contains(worker_id).then_some(event)
18+
}
1719
EventPayload::WorkerOverviewReceived(overview) => {
18-
live_worker_ids.contains(&overview.id)
20+
live_worker_ids.contains(&overview.id).then_some(event)
1921
}
2022
EventPayload::Submit { job_id, .. }
2123
| EventPayload::JobCompleted(job_id)
2224
| EventPayload::JobOpen(job_id, _)
23-
| EventPayload::JobClose(job_id) => live_job_ids.contains(job_id),
25+
| EventPayload::JobClose(job_id) => live_job_ids.contains(job_id).then_some(event),
2426
EventPayload::TaskStarted { task_id, .. }
2527
| EventPayload::TaskFinished { task_id, .. }
26-
| EventPayload::TaskFailed { task_id, .. }
27-
| EventPayload::TaskCanceled { task_id, .. } => {
28-
live_job_ids.contains(&task_id.job_id())
28+
| EventPayload::TaskFailed { task_id, .. } => {
29+
live_job_ids.contains(&task_id.job_id()).then_some(event)
30+
}
31+
EventPayload::TaskCanceled { task_ids, .. } => {
32+
task_ids.retain(|id| live_job_ids.contains(&id.job_id()));
33+
(!task_ids.is_empty()).then_some(event)
2934
}
3035
EventPayload::AllocationQueueCreated(_, _)
3136
| EventPayload::AllocationQueueRemoved(_)
3237
| EventPayload::AllocationQueued { .. }
3338
| EventPayload::AllocationStarted(_, _)
3439
| EventPayload::AllocationFinished(_, _)
3540
| EventPayload::ServerStart { .. }
36-
| EventPayload::ServerStop => true,
41+
| EventPayload::ServerStop => Some(event),
3742
};
38-
if retain {
43+
if let Some(event) = event {
3944
writer.store(event)?;
4045
}
4146
}

crates/hyperqueue/src/server/event/payload.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub enum EventPayload {
4848
},
4949
/// Task has been canceled
5050
TaskCanceled {
51-
task_id: TaskId,
51+
task_ids: Vec<TaskId>,
5252
},
5353
/// New allocation queue has been created
5454
AllocationQueueCreated(QueueId, Box<AllocationQueueParams>),

crates/hyperqueue/src/server/event/streamer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ impl EventStreamer {
136136
);
137137
}
138138

139-
pub fn on_task_canceled(&self, task_id: TaskId, now: DateTime<Utc>) {
139+
pub fn on_task_canceled(&self, task_ids: Vec<TaskId>, now: DateTime<Utc>) {
140140
self.send_event(
141-
EventPayload::TaskCanceled { task_id },
141+
EventPayload::TaskCanceled { task_ids },
142142
Some(now),
143143
ForwardMode::StreamAndPersist,
144144
);

crates/hyperqueue/src/server/job.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -433,10 +433,7 @@ impl Job {
433433
}
434434

435435
self.counters.n_canceled_tasks += task_ids.len() as JobTaskCount;
436-
// TODO: on_task_canceled should take vec
437-
for task_id in task_ids {
438-
senders.events.on_task_canceled(task_id, now);
439-
}
436+
senders.events.on_task_canceled(task_ids, now);
440437
self.check_termination(senders, now);
441438
}
442439

crates/hyperqueue/src/server/restore.rs

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -290,36 +290,39 @@ impl StateRestorer {
290290
}
291291
}
292292
}
293-
EventPayload::TaskCanceled { task_id } => {
294-
log::debug!("Replaying: TaskCanceled {task_id}");
295-
if let Some(job) = self.jobs.get_mut(&task_id.job_id()) {
296-
let task = job.tasks.get_mut(&task_id.job_task_id());
297-
if let Some(task) = task {
298-
task.state =
299-
match std::mem::replace(&mut task.state, JobTaskState::Waiting) {
300-
JobTaskState::Running { started_data } => {
301-
JobTaskState::Canceled {
302-
started_data: Some(started_data),
303-
cancelled_date: event.time,
293+
EventPayload::TaskCanceled { task_ids } => {
294+
log::debug!("Replaying: TaskCanceled {task_ids:?}");
295+
for task_id in task_ids {
296+
if let Some(job) = self.jobs.get_mut(&task_id.job_id()) {
297+
let task = job.tasks.get_mut(&task_id.job_task_id());
298+
if let Some(task) = task {
299+
task.state =
300+
match std::mem::replace(&mut task.state, JobTaskState::Waiting)
301+
{
302+
JobTaskState::Running { started_data } => {
303+
JobTaskState::Canceled {
304+
started_data: Some(started_data),
305+
cancelled_date: event.time,
306+
}
304307
}
308+
_ => JobTaskState::Canceled {
309+
started_data: None,
310+
cancelled_date: event.time,
311+
},
305312
}
306-
_ => JobTaskState::Canceled {
307-
started_data: None,
308-
cancelled_date: event.time,
309-
},
310-
}
311-
} else {
312-
job.tasks.insert(
313-
task_id.job_task_id(),
314-
RestorerTaskInfo {
315-
state: JobTaskState::Canceled {
316-
started_data: None,
317-
cancelled_date: event.time,
313+
} else {
314+
job.tasks.insert(
315+
task_id.job_task_id(),
316+
RestorerTaskInfo {
317+
state: JobTaskState::Canceled {
318+
started_data: None,
319+
cancelled_date: event.time,
320+
},
321+
instance_id: None,
322+
crash_counter: 0,
318323
},
319-
instance_id: None,
320-
crash_counter: 0,
321-
},
322-
);
324+
);
325+
}
323326
}
324327
}
325328
}

0 commit comments

Comments
 (0)