File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -87,7 +87,7 @@ fn format_payload(event: EventPayload) -> serde_json::Value {
8787 "job" : task_id. job_id( ) ,
8888 "task" : task_id. job_task_id( ) ,
8989 } ) ,
90- EventPayload :: TaskCanceled { task_ids } => json ! ( {
90+ EventPayload :: TasksCanceled { task_ids } => json ! ( {
9191 "type" : "task-canceled" ,
9292 "tasks" : task_ids,
9393 } ) ,
Original file line number Diff line number Diff line change @@ -354,7 +354,7 @@ fn reconstruct_historical_events(
354354 }
355355 JobTaskState :: Canceled { cancelled_date, .. } => {
356356 if let Some ( task_ids) = events. last_mut ( ) . and_then ( |e| {
357- if let EventPayload :: TaskCanceled { task_ids, .. } = & mut e. payload {
357+ if let EventPayload :: TasksCanceled { task_ids, .. } = & mut e. payload {
358358 ( e. time == * cancelled_date) . then_some ( task_ids)
359359 } else {
360360 None
@@ -364,7 +364,7 @@ fn reconstruct_historical_events(
364364 } else {
365365 events. push ( Event :: at (
366366 * cancelled_date,
367- EventPayload :: TaskCanceled {
367+ EventPayload :: TasksCanceled {
368368 task_ids : vec ! [ TaskId :: new( job. job_id, * id) ] ,
369369 } ,
370370 ) ) ;
Original file line number Diff line number Diff line change @@ -28,7 +28,7 @@ pub(crate) fn prune_journal(
2828 | EventPayload :: TaskFailed { task_id, .. } => {
2929 live_job_ids. contains ( & task_id. job_id ( ) ) . then_some ( event)
3030 }
31- EventPayload :: TaskCanceled { task_ids, .. } => {
31+ EventPayload :: TasksCanceled { task_ids, .. } => {
3232 task_ids. retain ( |id| live_job_ids. contains ( & id. job_id ( ) ) ) ;
3333 ( !task_ids. is_empty ( ) ) . then_some ( event)
3434 }
Original file line number Diff line number Diff line change @@ -46,8 +46,8 @@ pub enum EventPayload {
4646 task_id : TaskId ,
4747 error : String ,
4848 } ,
49- /// Task has been canceled
50- TaskCanceled {
49+ /// Tasks has been canceled; for performance and correctness reason, this even is batched.
50+ TasksCanceled {
5151 task_ids : Vec < TaskId > ,
5252 } ,
5353 /// New allocation queue has been created
Original file line number Diff line number Diff line change @@ -138,7 +138,7 @@ impl EventStreamer {
138138
139139 pub fn on_task_canceled ( & self , task_ids : Vec < TaskId > , now : DateTime < Utc > ) {
140140 self . send_event (
141- EventPayload :: TaskCanceled { task_ids } ,
141+ EventPayload :: TasksCanceled { task_ids } ,
142142 Some ( now) ,
143143 ForwardMode :: StreamAndPersist ,
144144 ) ;
Original file line number Diff line number Diff line change @@ -290,7 +290,7 @@ impl StateRestorer {
290290 }
291291 }
292292 }
293- EventPayload :: TaskCanceled { task_ids } => {
293+ EventPayload :: TasksCanceled { task_ids } => {
294294 log:: debug!( "Replaying: TaskCanceled {task_ids:?}" ) ;
295295 for task_id in task_ids {
296296 if let Some ( job) = self . jobs . get_mut ( & task_id. job_id ( ) ) {
You can’t perform that action at this time.
0 commit comments