@@ -80,49 +80,58 @@ pub async fn wait_for_jobs_with_progress(
8080 if jobs. iter ( ) . all ( is_terminated) {
8181 log:: warn!( "There are no jobs to wait for" ) ;
8282 } else {
83- let total_tasks: JobTaskCount = jobs
84- . iter ( )
85- . filter ( |info| !is_terminated ( info) )
86- . map ( |info| info. n_tasks )
87- . sum ( ) ;
88- let mut remaining_job_ids: BTreeSet < JobId > = jobs
83+ let mut unfinished_job_ids: BTreeSet < JobId > = jobs
8984 . iter ( )
9085 . filter ( |info| !is_terminated ( info) )
9186 . map ( |info| info. id )
9287 . collect ( ) ;
88+ let unfinished_tasks = jobs
89+ . iter ( )
90+ . filter ( |info| !is_terminated ( info) )
91+ . map ( |info| info. n_tasks )
92+ . sum :: < u32 > ( ) ;
9393
94- let total_jobs = remaining_job_ids . len ( ) ;
94+ let initial_unfinished_jobs = unfinished_job_ids . len ( ) ;
9595 log:: info!(
9696 "Waiting for {} {} with {} {}" ,
97- total_jobs ,
98- pluralize( "job" , total_jobs ) ,
99- total_tasks ,
100- pluralize( "task" , total_tasks as usize ) ,
97+ initial_unfinished_jobs ,
98+ pluralize( "job" , initial_unfinished_jobs ) ,
99+ unfinished_tasks ,
100+ pluralize( "task" , unfinished_tasks as usize ) ,
101101 ) ;
102102
103- let mut counters = JobTaskCounters :: default ( ) ;
103+ let total_tasks: JobTaskCount = jobs. iter ( ) . map ( |info| info. n_tasks ) . sum ( ) ;
104+
105+ // Counters of jobs that have all been finished
106+ // Note: this ignores the fact that some jobs might be open
107+ let mut counters_finished = jobs
108+ . iter ( )
109+ . filter ( |info| is_terminated ( info) )
110+ . map ( |info| info. counters )
111+ . fold ( JobTaskCounters :: default ( ) , |acc, c| acc + c) ;
104112
105113 loop {
114+ // Only ask for status of unfinished jobs
106115 let response = rpc_call ! (
107116 session. connection( ) ,
108117 FromClientMessage :: JobInfo ( JobInfoRequest {
109- selector: IdSelector :: Specific ( IntArray :: from_sorted_ids( remaining_job_ids . iter( ) . map( |x| x. as_num( ) ) ) ) ,
118+ selector: IdSelector :: Specific ( IntArray :: from_sorted_ids( unfinished_job_ids . iter( ) . map( |x| x. as_num( ) ) ) ) ,
110119 } ) ,
111120 ToClientMessage :: JobInfoResponse ( r) => r
112121 )
113122 . await ?;
114123
115- let mut current_counters = counters ;
124+ let mut current_counters = counters_finished ;
116125 for job in & response. jobs {
117126 current_counters = current_counters + job. counters ;
118127
119128 if is_terminated ( job) {
120- remaining_job_ids . remove ( & job. id ) ;
121- counters = counters + job. counters ;
129+ unfinished_job_ids . remove ( & job. id ) ;
130+ counters_finished = counters_finished + job. counters ;
122131 }
123132 }
124133
125- let completed_jobs = total_jobs - remaining_job_ids . len ( ) ;
134+ let completed_jobs = jobs . len ( ) - unfinished_job_ids . len ( ) ;
126135 let completed_tasks = current_counters. n_finished_tasks
127136 + current_counters. n_canceled_tasks
128137 + current_counters. n_failed_tasks ;
@@ -160,25 +169,25 @@ pub async fn wait_for_jobs_with_progress(
160169 "\r \x1b [2K{} {}/{} jobs, {}/{} tasks {}" ,
161170 job_progress_bar( current_counters, total_tasks, 40 ) ,
162171 completed_jobs,
163- total_jobs ,
172+ jobs . len ( ) ,
164173 completed_tasks,
165174 total_tasks,
166175 status
167176 ) ;
168177 std:: io:: stdout ( ) . flush ( ) . unwrap ( ) ;
169178
170- if remaining_job_ids . is_empty ( ) {
179+ if unfinished_job_ids . is_empty ( ) {
171180 // Move the cursor to a new line
172181 println ! ( ) ;
173182 break ;
174183 }
175184 sleep ( Duration :: from_secs ( 1 ) ) . await ;
176185 }
177186
178- if counters . n_failed_tasks > 0 {
187+ if counters_finished . n_failed_tasks > 0 {
179188 anyhow:: bail!( "Some jobs have failed" ) ;
180189 }
181- if counters . n_canceled_tasks > 0 {
190+ if counters_finished . n_canceled_tasks > 0 {
182191 anyhow:: bail!( "Some jobs were canceled" ) ;
183192 }
184193 }
0 commit comments