Skip to content

Commit e340348

Browse files
michaelchuclaude
andcommitted
feat: pipeline stage progress, duplicate sweep fix, strategy evaluation
- Add stage_label to TaskInfo/TaskSnapshot for pipeline stage tracking - SSE progress events include stage_label (Sweep, Walk-Forward, etc.) - Thread progress/cancel callbacks through pipeline for sweep bar progress - Reset progress to indeterminate when moving past sweep stage - Fix duplicate sweep rows in /runs when multiple runs share max sharpe - submit_pipeline runs StrategyEvaluation (baseline + robustness + verdict) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6791c86 commit e340348

6 files changed

Lines changed: 74 additions & 20 deletions

File tree

src/application/pipeline.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use serde_json::Value;
55
use std::collections::HashMap;
66

77
use crate::application::sweeps;
8+
use crate::scripting::engine::{CancelCallback, ProgressCallback};
89
use crate::server::OptopsyServer;
910
use crate::tools::pipeline::StageCallback;
1011
use crate::tools::response_types::pipeline::PipelineResponse;
@@ -41,15 +42,18 @@ pub async fn execute(
4142
request: &PipelineRequest,
4243
source: &str,
4344
) -> Result<PipelineResponse> {
44-
execute_with_stage(server, request, source, &None).await
45+
execute_with_stage(server, request, source, &None, None, None).await
4546
}
4647

4748
/// Execute the full pipeline with an optional stage progress callback.
49+
#[allow(clippy::too_many_arguments)]
4850
pub async fn execute_with_stage(
4951
server: &OptopsyServer,
5052
request: &PipelineRequest,
5153
source: &str,
5254
on_stage: &StageCallback,
55+
progress: Option<ProgressCallback>,
56+
is_cancelled: Option<&CancelCallback>,
5357
) -> Result<PipelineResponse> {
5458
let run_store = server.require_run_store()?;
5559

@@ -73,8 +77,8 @@ pub async fn execute_with_stage(
7377
&sweep_req,
7478
source,
7579
request.thread_id.as_deref(),
76-
None,
77-
None,
80+
progress,
81+
is_cancelled,
7882
)
7983
.await?;
8084

src/application/workflows.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,39 @@ pub async fn execute(
3434
request: &WorkflowRequest,
3535
source: &str,
3636
) -> Result<WorkflowResponse> {
37-
execute_with_stage(server, request, source, &None).await
37+
execute_with_stage(server, request, source, &None, None, None).await
3838
}
3939

4040
pub async fn execute_with_stage(
4141
server: &OptopsyServer,
4242
request: &WorkflowRequest,
4343
source: &str,
4444
on_stage: &crate::tools::pipeline::StageCallback,
45+
progress: Option<crate::scripting::engine::ProgressCallback>,
46+
is_cancelled: Option<&crate::scripting::engine::CancelCallback>,
4547
) -> Result<WorkflowResponse> {
4648
match request.kind {
4749
WorkflowKind::BaselineValidation => Ok(WorkflowResponse::BaselineValidation(
48-
pipeline::execute_with_stage(server, &request.pipeline, source, on_stage).await?,
50+
pipeline::execute_with_stage(
51+
server,
52+
&request.pipeline,
53+
source,
54+
on_stage,
55+
progress,
56+
is_cancelled,
57+
)
58+
.await?,
4959
)),
5060
WorkflowKind::StrategyEvaluation => Ok(WorkflowResponse::StrategyEvaluation(
51-
execute_strategy_evaluation(server, &request.pipeline, source, on_stage).await?,
61+
execute_strategy_evaluation(
62+
server,
63+
&request.pipeline,
64+
source,
65+
on_stage,
66+
progress,
67+
is_cancelled,
68+
)
69+
.await?,
5270
)),
5371
}
5472
}
@@ -58,14 +76,24 @@ async fn execute_strategy_evaluation(
5876
request: &PipelineRequest,
5977
source: &str,
6078
on_stage: &crate::tools::pipeline::StageCallback,
79+
progress: Option<crate::scripting::engine::ProgressCallback>,
80+
is_cancelled: Option<&crate::scripting::engine::CancelCallback>,
6181
) -> Result<StrategyEvaluationResponse> {
6282
let started_at = Instant::now();
6383
let mut eval_request = request.clone();
6484
if eval_request.num_permutations == 0 {
6585
eval_request.num_permutations = DEFAULT_STRATEGY_EVAL_PERMUTATIONS;
6686
}
6787

68-
let pipeline = pipeline::execute_with_stage(server, &eval_request, source, on_stage).await?;
88+
let pipeline = pipeline::execute_with_stage(
89+
server,
90+
&eval_request,
91+
source,
92+
on_stage,
93+
progress,
94+
is_cancelled,
95+
)
96+
.await?;
6997
let robustness_checks = if pipeline.walk_forward.is_some() {
7098
if let Some(cb) = on_stage {
7199
cb("Robustness Checks");

src/data/run_store.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -347,14 +347,18 @@ impl RunStore for SqliteRunStore {
347347
FROM sweeps sw
348348
LEFT JOIN strategies s ON s.id = sw.strategy_id
349349
LEFT JOIN (
350-
SELECT r1.*
351-
FROM runs r1
352-
INNER JOIN (
353-
SELECT sweep_id, MAX(sharpe) as max_sharpe
354-
FROM runs
355-
WHERE sweep_id IS NOT NULL
356-
GROUP BY sweep_id
357-
) r2 ON r1.sweep_id = r2.sweep_id AND r1.sharpe = r2.max_sharpe
350+
SELECT sweep_id,
351+
MAX(sharpe) as sharpe,
352+
MAX(sortino) as sortino,
353+
MAX(cagr) as cagr,
354+
MAX(profit_factor) as profit_factor,
355+
MAX(total_return) as total_return,
356+
MAX(win_rate) as win_rate,
357+
MIN(max_drawdown) as max_drawdown,
358+
MAX(trade_count) as trade_count
359+
FROM runs
360+
WHERE sweep_id IS NOT NULL
361+
GROUP BY sweep_id
358362
) br ON br.sweep_id = sw.id
359363
LEFT JOIN (
360364
SELECT sweep_id,

src/server/handlers/tasks.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -355,16 +355,31 @@ pub async fn submit_pipeline(
355355
};
356356

357357
// Stage progress callback — updates the task's stage_label
358+
// and resets bar-level progress for non-sweep stages
358359
let task_ref = Arc::clone(&task);
359360
let on_stage: crate::tools::pipeline::StageCallback =
360361
Some(Box::new(move |label: &str| {
361362
*task_ref.stage_label.lock().unwrap() = label.to_string();
363+
if label != "Sweep" {
364+
task_ref.progress_current.store(0, Ordering::Relaxed);
365+
task_ref.progress_total.store(0, Ordering::Relaxed);
366+
}
362367
}));
363368

364-
let result =
365-
workflows::execute_with_stage(&server, &wf_request, "manual", &on_stage)
366-
.await
367-
.map_err(|e| e.to_string())?;
369+
// Progress + cancel callbacks for sweep bar-level progress
370+
let progress = app_tasks::progress_callback(&task);
371+
let is_cancelled = app_tasks::cancel_callback(&task);
372+
373+
let result = workflows::execute_with_stage(
374+
&server,
375+
&wf_request,
376+
"manual",
377+
&on_stage,
378+
Some(progress),
379+
Some(&is_cancelled),
380+
)
381+
.await
382+
.map_err(|e| e.to_string())?;
368383

369384
// Extract sweep_id and persist analysis
370385
let (sweep_id, result_json) = match &result {

src/tools/pipeline.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ const TOP_COMBOS_FOR_WF: usize = 3;
2929
clippy::too_many_lines,
3030
clippy::implicit_hasher
3131
)]
32-
#[allow(clippy::too_many_arguments)]
3332
pub async fn run_pipeline(
3433
server: &OptopsyServer,
3534
strategy: &str,

tests/pipeline.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ async fn significance_gate_fails_skips_downstream() {
111111
vec!["run-1".to_string(), "run-2".to_string()],
112112
sweep,
113113
base_params,
114+
&None,
114115
)
115116
.await;
116117

@@ -165,6 +166,7 @@ async fn no_permutation_passes_significance_gate() {
165166
vec!["run-1".to_string()],
166167
sweep,
167168
base_params,
169+
&None,
168170
)
169171
.await;
170172

@@ -210,6 +212,7 @@ async fn full_pipeline_with_nvda_fixture() {
210212
vec!["run-1".to_string()],
211213
sweep,
212214
base_params,
215+
&None,
213216
)
214217
.await;
215218

@@ -295,6 +298,7 @@ async fn pipeline_preserves_sweep_metadata() {
295298
run_ids.clone(),
296299
sweep,
297300
base_params,
301+
&None,
298302
)
299303
.await
300304
.expect("Pipeline should not error");

0 commit comments

Comments
 (0)