diff --git a/src/constants.rs b/src/constants.rs index f018bd2..95e2bb4 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -15,3 +15,7 @@ pub const DEFAULT_ANALYSIS_YEARS: u32 = 5; /// Maximum finite value for profit factor when there are no losing trades. /// Avoids `f64::INFINITY` which is not valid JSON. pub const MAX_PROFIT_FACTOR: f64 = 999.99; + +/// Minimum number of return observations required for meaningful +/// block-bootstrap Monte Carlo simulation. +pub const MIN_RETURNS_FOR_BOOTSTRAP: usize = 30; diff --git a/src/server/handlers/mod.rs b/src/server/handlers/mod.rs index be7c8d0..dad3229 100644 --- a/src/server/handlers/mod.rs +++ b/src/server/handlers/mod.rs @@ -3,6 +3,7 @@ pub mod backtests; pub mod chat; pub mod forward_tests; +pub mod pipeline; pub mod profiles; pub mod run_script; pub mod runs; diff --git a/src/server/handlers/pipeline.rs b/src/server/handlers/pipeline.rs new file mode 100644 index 0000000..4ea4962 --- /dev/null +++ b/src/server/handlers/pipeline.rs @@ -0,0 +1,99 @@ +//! REST API handler for the backtest pipeline. +//! +//! Runs the full analysis pipeline +//! (`sweep` -> `significance_gate` -> `walk-forward` -> `oos_data_gate` -> `monte carlo`) +//! and returns a `PipelineResponse` with stage statuses. +//! Monte Carlo may be skipped when earlier gates do not pass. + +use axum::extract::State; +use axum::http::StatusCode; +use axum::Json; +use garde::Validate; +use serde::Deserialize; +use serde_json::Value; +use std::collections::HashMap; + +use crate::server::handlers::sweeps::SweepParamDef; +use crate::server::state::AppState; +use crate::tools::backtest::BacktestToolParams; +use crate::tools::response_types::pipeline::PipelineResponse; + +fn default_mode() -> String { + "grid".to_string() +} + +fn default_objective() -> String { + "sharpe".to_string() +} + +fn default_max_evaluations() -> usize { + 50 +} + +/// Request body for `POST /runs/pipeline`. 
+#[derive(Debug, Deserialize)] +pub struct CreatePipelineRequest { + pub strategy: String, + #[serde(default = "default_mode")] + pub mode: String, + #[serde(default = "default_objective")] + pub objective: String, + pub params: HashMap, + pub sweep_params: Vec, + #[serde(default = "default_max_evaluations")] + pub max_evaluations: usize, + #[serde(default)] + pub num_permutations: usize, + #[serde(default)] + pub thread_id: Option, +} + +pub(super) fn build_pipeline_params( + req: CreatePipelineRequest, +) -> Result { + if req.sweep_params.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + "sweep_params must be non-empty for pipeline execution".to_string(), + )); + } + + let params = BacktestToolParams { + strategy: req.strategy, + mode: req.mode, + objective: req.objective, + params: req.params, + sweep_params: req.sweep_params, + max_evaluations: req.max_evaluations, + num_permutations: req.num_permutations, + thread_id: req.thread_id, + pipeline: true, + }; + + params + .validate() + .map_err(|e| (StatusCode::BAD_REQUEST, format!("Validation error: {e}")))?; + + Ok(params) +} + +/// `POST /runs/pipeline` — run the full pipeline synchronously and return the result. 
+pub async fn create_pipeline( + State(state): State, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let params = build_pipeline_params(req)?; + + let result = crate::tools::backtest::execute(&state.server, params) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + // The pipeline path always returns BacktestToolResponse::Pipeline + match result { + crate::tools::backtest::BacktestToolResponse::Pipeline(response) => Ok(Json(*response)), + _ => Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "Pipeline mode did not return a pipeline response".to_string(), + )), + } +} diff --git a/src/server/handlers/tasks.rs b/src/server/handlers/tasks.rs index 6192fd9..e902dba 100644 --- a/src/server/handlers/tasks.rs +++ b/src/server/handlers/tasks.rs @@ -909,3 +909,82 @@ pub async fn stream_task( .keep_alive(KeepAlive::default()), ) } + +// ────────────────────────────────────────────────────────────────────────────── +// Pipeline task +// ────────────────────────────────────────────────────────────────────────────── + +/// `POST /tasks/pipeline` — submit a full pipeline (sweep + walk-forward + monte carlo) as a background task. +pub async fn submit_pipeline( + State(state): State, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let params = super::pipeline::build_pipeline_params(req)?; + + let symbol = params + .params + .get("symbol") + .and_then(Value::as_str) + .unwrap_or("pending") + .to_owned(); + + let params_json = + serde_json::to_value(¶ms.params).unwrap_or(Value::Object(serde_json::Map::default())); + + let task = state.task_manager.register( + TaskKind::Pipeline, + ¶ms.strategy, + &symbol, + params.thread_id.clone(), + params_json, + ); + let task_id = task.id.clone(); + + let tm = Arc::clone(&state.task_manager); + let server = state.server.clone(); + tokio::spawn(async move { + // Wait for permit (or cancellation) + let permit = tokio::select! 
{ + p = tm.acquire_permit() => p, + () = task.cancellation_token.cancelled() => { + tm.mark_cancelled(&task.id); + return; + } + }; + + if task.cancellation_token.is_cancelled() { + tm.mark_cancelled(&task.id); + drop(permit); + return; + } + + tm.mark_running(&task.id); + + let result = crate::tools::backtest::execute(&server, params).await; + + drop(permit); + + if task.cancellation_token.is_cancelled() { + tm.mark_cancelled(&task.id); + return; + } + + match result { + Ok(crate::tools::backtest::BacktestToolResponse::Pipeline(response)) => { + let result_json = serde_json::to_value(&*response).unwrap_or(Value::Null); + tm.mark_completed(&task.id, result_json, response.sweep_id.clone()); + } + Ok(_) => { + tm.mark_failed( + &task.id, + "Pipeline mode did not return a pipeline response".to_string(), + ); + } + Err(e) => { + tm.mark_failed(&task.id, e.to_string()); + } + } + }); + + Ok(Json(SubmitResponse { task_id })) +} diff --git a/src/server/handlers/walk_forward.rs b/src/server/handlers/walk_forward.rs index 9236050..4028788 100644 --- a/src/server/handlers/walk_forward.rs +++ b/src/server/handlers/walk_forward.rs @@ -36,6 +36,8 @@ pub async fn run_walk_forward( params.start_date, params.end_date, params.profile, + None, + None, ) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; diff --git a/src/server/mod.rs b/src/server/mod.rs index 5a39ff0..4c4cb43 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -30,13 +30,12 @@ use crate::tools::response_types::{ AggregatePricesResponse, BenchmarkAnalysisResponse, CointegrationResponse, CorrelateResponse, DistributionResponse, DrawdownAnalysisResponse, FactorAttributionResponse, HypothesisParams, HypothesisResponse, MonteCarloResponse, PortfolioOptimizeResponse, RegimeDetectResponse, - RollingMetricResponse, WalkForwardResponse, + RollingMetricResponse, }; use params::{ tool_err, validation_err, AggregatePricesParams, BenchmarkAnalysisParams, CointegrationParams, CorrelateParams, 
DistributionParams, DrawdownAnalysisParams, FactorAttributionParams, MonteCarloParams, PortfolioOptimizeParams, RegimeDetectParams, RollingMetricParams, - WalkForwardToolParams, }; use sanitize::SanitizedResult; @@ -589,66 +588,12 @@ impl OptopsyServer { .map_err(|e| format!("Failed to read scripting reference: {e}")) } - /// Run walk-forward optimization: split data into train/test windows, optimize parameters - /// on each training window, and validate on the out-of-sample test window. - /// - /// **When to use**: After finding a profitable strategy via backtest + sweep, to verify - /// that optimized parameters generalize to unseen data. The efficiency ratio (OOS/IS metric) - /// measures how well the strategy avoids overfitting. - /// - /// **Output**: Per-window IS/OOS metrics, best params per window, stitched OOS equity curve, - /// and efficiency ratio. - /// - /// **Example**: - /// ```json - /// { - /// "strategy": "short_put", - /// "symbol": "SPY", - /// "capital": 100000, - /// "params_grid": { - /// "DELTA_TARGET": [0.20, 0.30, 0.40], - /// "DTE_TARGET": [30, 45, 60] - /// }, - /// "n_windows": 5, - /// "objective": "sharpe" - /// } - /// ``` - #[tool(name = "walk_forward", annotations(read_only_hint = true))] - async fn walk_forward( - &self, - Parameters(params): Parameters, - ) -> SanitizedResult { - SanitizedResult( - async { - params - .validate() - .map_err(|e| validation_err("walk_forward", e))?; - tools::walk_forward::execute( - &self.cache, - self.adjustment_store.clone(), - ¶ms.strategy, - ¶ms.symbol, - params.capital, - params.params_grid, - params.objective, - Some(params.n_windows), - params.mode, - Some(params.train_pct), - params.start_date, - params.end_date, - params.profile, - ) - .await - .map_err(tool_err) - } - .await, - ) - } - /// Run a backtest or parameter sweep. Pass a saved strategy by display name. /// /// Omit `sweep_params` for a single backtest (returns full equity curve, trade log, metrics). 
- /// Provide `sweep_params` for a grid/bayesian sweep (returns ranked results). + /// Provide `sweep_params` for a grid/bayesian sweep. By default, sweeps run the + /// full analysis pipeline: sweep -> significance gate -> walk-forward -> + /// `oos_data_gate` -> monte carlo. Set `pipeline=false` to return sweep-only results. /// Results are persisted to the runs database. /// /// **Example (single backtest)**: @@ -659,7 +604,7 @@ impl OptopsyServer { /// } /// ``` /// - /// **Example (parameter sweep)**: + /// **Example (parameter sweep with full pipeline)**: /// ```json /// { /// "strategy": "short_put", @@ -667,7 +612,8 @@ impl OptopsyServer { /// "sweep_params": [ /// { "name": "DELTA_TARGET", "start": 0.10, "stop": 0.40, "step": 0.05 }, /// { "name": "DTE_TARGET", "param_type": "int", "start": 30, "stop": 60, "step": 5 } - /// ] + /// ], + /// "pipeline": true /// } /// ``` #[tool(name = "backtest", annotations(read_only_hint = false))] @@ -726,7 +672,7 @@ impl ServerHandler for OptopsyServer { \n - factor_attribution — decompose returns into factor exposures\ \n - benchmark_analysis — compare vs. benchmark (alpha, beta, capture ratios)\ \n - distribution — P&L or return distribution + normality tests\ - \n - walk_forward — walk-forward optimization to validate parameter robustness\ + \n - Walk-forward validation runs automatically as part of the backtest pipeline\ \n\ \n### 4. 
Market Analysis Tools\ \n - aggregate_prices — seasonal/time-bucket return patterns\ diff --git a/src/server/router.rs b/src/server/router.rs index d7a022e..b55fd3c 100644 --- a/src/server/router.rs +++ b/src/server/router.rs @@ -8,7 +8,8 @@ use axum::Router; use tower_http::cors::CorsLayer; use crate::server::handlers::{ - backtests, chat as chat_handlers, forward_tests, profiles, runs, strategies, sweeps, tasks, + backtests, chat as chat_handlers, forward_tests, pipeline, profiles, runs, strategies, sweeps, + tasks, }; use crate::server::state::AppState; @@ -119,6 +120,10 @@ pub fn build_api_router(state: AppState) -> Router { "/walk-forward", axum::routing::post(crate::server::handlers::walk_forward::run_walk_forward), ) + .route( + "/runs/pipeline", + axum::routing::post(pipeline::create_pipeline), + ) .with_state(state.clone()); let task_routes = Router::new() @@ -132,6 +137,10 @@ pub fn build_api_router(state: AppState) -> Router { "/tasks/walk-forward", axum::routing::post(tasks::submit_walk_forward), ) + .route( + "/tasks/pipeline", + axum::routing::post(tasks::submit_pipeline), + ) .route( "/tasks/{id}", axum::routing::get(tasks::get_task).delete(tasks::cancel_task), diff --git a/src/server/task_manager.rs b/src/server/task_manager.rs index fae9f5d..eb57cb3 100644 --- a/src/server/task_manager.rs +++ b/src/server/task_manager.rs @@ -19,6 +19,7 @@ pub enum TaskKind { Single, Sweep, WalkForward, + Pipeline, } #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)] diff --git a/src/tools/backtest.rs b/src/tools/backtest.rs index a77ec6e..1ce6e80 100644 --- a/src/tools/backtest.rs +++ b/src/tools/backtest.rs @@ -17,6 +17,7 @@ use crate::server::handlers::sweeps::{ SweepParamDef, }; use crate::server::OptopsyServer; +use crate::tools::response_types::pipeline::PipelineResponse; use crate::tools::response_types::sweep::SweepResponse; use crate::tools::run_script::RunScriptResponse; @@ -36,6 +37,10 @@ fn default_max_evaluations() -> usize { 50 } +fn 
default_pipeline() -> bool { + true +} + // ────────────────────────────────────────────────────────────────────────────── // Params // ────────────────────────────────────────────────────────────────────────────── @@ -83,6 +88,14 @@ pub struct BacktestToolParams { #[serde(default)] #[garde(skip)] pub thread_id: Option, + + /// When true, sweeps automatically run the full pipeline: + /// sweep -> significance gate -> walk-forward -> OOS gate -> monte carlo. + /// Default true for sweeps; set `pipeline=false` to return sweep-only results. + /// Has no effect on single backtests (only applies when `sweep_params` is non-empty). + #[serde(default = "default_pipeline")] + #[garde(skip)] + pub pipeline: bool, } // ────────────────────────────────────────────────────────────────────────────── @@ -115,16 +128,19 @@ pub struct SweepBacktestResponse { pub suggested_next_steps: Vec, } -/// Response from the `backtest` tool — either a single run or a sweep. +/// Response from the `backtest` tool — single run, sweep, or full pipeline. #[derive(Serialize, Deserialize, JsonSchema)] #[serde(tag = "type")] pub enum BacktestToolResponse { /// Single backtest result with full equity curve, trade log, and metrics. #[serde(rename = "single")] Single(Box), - /// Parameter sweep result with ranked combinations. + /// Parameter sweep result with ranked combinations (pipeline=false). #[serde(rename = "sweep")] Sweep(Box), + /// Full pipeline: sweep + walk-forward + monte carlo with gate statuses. 
+ #[serde(rename = "pipeline")] + Pipeline(Box), } // ────────────────────────────────────────────────────────────────────────────── @@ -138,8 +154,49 @@ pub async fn execute( ) -> Result { if params.sweep_params.is_empty() { execute_single(server, params).await + } else if params.pipeline { + // Full pipeline: sweep -> walk-forward -> monte carlo + let original_params = params.params.clone(); + let (sweep_id, run_ids, sweep_response, strategy, symbol, capital, objective) = + execute_sweep_raw(server, ¶ms).await?; + + let pipeline_response = crate::tools::pipeline::run_pipeline( + server, + &strategy, + &symbol, + capital, + &objective, + sweep_id, + run_ids, + sweep_response, + original_params, + ) + .await?; + + Ok(BacktestToolResponse::Pipeline(Box::new(pipeline_response))) } else { - execute_sweep(server, params).await + // Sweep-only (no pipeline) + let (sweep_id, run_ids, sweep_response, strategy, symbol, _capital, _objective) = + execute_sweep_raw(server, ¶ms).await?; + + let upper = symbol.to_uppercase(); + let suggested_next_steps = vec![ + format!( + "[NEXT] Re-run backtest(strategy=\"{strategy}\", symbol=\"{upper}\", pipeline=true) to run the full validation pipeline (walk-forward + monte carlo)", + ), + format!("[THEN] Call drawdown_analysis(symbol=\"{upper}\") to analyze drawdown episodes and risk profile"), + format!("[THEN] Call monte_carlo(symbol=\"{upper}\") to simulate forward-looking risk"), + format!("[TIP] Call factor_attribution(symbol=\"{upper}\") to check if alpha is genuine or factor exposure"), + ]; + + Ok(BacktestToolResponse::Sweep(Box::new( + SweepBacktestResponse { + sweep_id, + run_ids, + sweep: sweep_response, + suggested_next_steps, + }, + ))) } } @@ -211,12 +268,25 @@ async fn execute_single( ))) } -/// Run a parameter sweep and persist the results. -#[allow(clippy::too_many_lines)] -async fn execute_sweep( +/// Run a parameter sweep and persist the results, returning raw components. 
+/// +/// Returns `(sweep_id, run_ids, sweep_response, strategy_key, symbol, capital, objective)`. +#[allow(clippy::too_many_lines, clippy::type_complexity)] +async fn execute_sweep_raw( server: &OptopsyServer, - params: BacktestToolParams, -) -> Result { + params: &BacktestToolParams, +) -> Result< + ( + String, + Vec, + SweepResponse, + String, + String, + f64, + String, + ), + anyhow::Error, +> { let run_store = server .run_store .as_ref() @@ -257,6 +327,12 @@ async fn execute_sweep( }) .ok_or_else(|| anyhow::anyhow!("No symbol resolved — declare an `asset` in the script"))?; + let capital = params + .params + .get("CAPITAL") + .and_then(Value::as_f64) + .unwrap_or(100_000.0); + // 5. Build CreateSweepRequest for the shared helpers let req = CreateSweepRequest { strategy: params.strategy.clone(), @@ -346,25 +422,13 @@ async fn execute_sweep( .map(|detail| detail.runs.iter().map(|r| r.id.clone()).collect()) .unwrap_or_default(); - // 10. Build suggested next steps - let upper = symbol.to_uppercase(); - let suggested_next_steps = vec![ - format!( - "[NEXT] Call walk_forward(strategy=\"{}\", symbol=\"{upper}\", params_grid=) to validate parameter robustness on unseen data", - params.strategy, - ), - format!("[THEN] Call drawdown_analysis(symbol=\"{upper}\") to analyze drawdown episodes and risk profile"), - format!("[THEN] Call monte_carlo(symbol=\"{upper}\") to simulate forward-looking risk"), - format!("[TIP] Call factor_attribution(symbol=\"{upper}\") to check if alpha is genuine or factor exposure"), - ]; - - // 11. 
Return response - Ok(BacktestToolResponse::Sweep(Box::new( - SweepBacktestResponse { - sweep_id, - run_ids, - sweep: sweep_response, - suggested_next_steps, - }, - ))) + Ok(( + sweep_id, + run_ids, + sweep_response, + strategy_key, + symbol, + capital, + params.objective.clone(), + )) } diff --git a/src/tools/mod.rs b/src/tools/mod.rs index 43c573f..661caf7 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -14,6 +14,7 @@ pub mod forward_test; pub mod hypothesis; pub mod list_symbols; pub mod monte_carlo; +pub mod pipeline; pub mod portfolio_optimize; pub mod raw_prices; pub mod regime_detect; diff --git a/src/tools/monte_carlo.rs b/src/tools/monte_carlo.rs index b762747..d6debcb 100644 --- a/src/tools/monte_carlo.rs +++ b/src/tools/monte_carlo.rs @@ -9,6 +9,7 @@ use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use std::sync::Arc; +use crate::constants::MIN_RETURNS_FOR_BOOTSTRAP; use crate::data::cache::CachedStore; use crate::stats; use crate::tools::ai_helpers; @@ -19,7 +20,7 @@ use crate::tools::response_types::{ /// Block size for bootstrap resampling (trading days). const BLOCK_SIZE: usize = 21; -/// Execute Monte Carlo simulation. +/// Execute Monte Carlo simulation from a symbol's cached price data. #[allow(clippy::too_many_lines, clippy::similar_names)] pub async fn execute( cache: &Arc, @@ -33,7 +34,30 @@ pub async fn execute( let upper = symbol.to_uppercase(); let cutoff_str = ai_helpers::compute_years_cutoff(years); let returns = ai_helpers::load_returns(cache, &upper, &cutoff_str).await?; - if returns.len() < 30 { + execute_from_returns( + &returns, + &upper, + n_simulations, + horizon_days, + initial_capital, + seed, + ) +} + +/// Execute Monte Carlo simulation from pre-computed returns. +/// +/// Used by the pipeline to avoid re-loading data from parquet when returns +/// are already available (e.g., derived from a walk-forward equity curve). 
+#[allow(clippy::too_many_lines, clippy::similar_names)] +pub fn execute_from_returns( + returns: &[f64], + label: &str, + n_simulations: usize, + horizon_days: usize, + initial_capital: f64, + seed: Option, +) -> Result { + if returns.len() < MIN_RETURNS_FOR_BOOTSTRAP { anyhow::bail!("Insufficient return observations: {}", returns.len()); } @@ -46,7 +70,7 @@ pub async fn execute( let mut terminal_values = Vec::with_capacity(n_simulations); let mut max_drawdowns = Vec::with_capacity(n_simulations); for _ in 0..n_simulations { - let (terminal, max_dd) = simulate_path(&returns, horizon_days, initial_capital, &mut rng); + let (terminal, max_dd) = simulate_path(returns, horizon_days, initial_capital, &mut rng); terminal_values.push(terminal); max_drawdowns.push(max_dd); } @@ -61,12 +85,12 @@ pub async fn execute( let percentile_paths: Vec = percentiles .iter() .zip(labels.iter()) - .map(|(&pct, &label)| { + .map(|(&pct, &lbl)| { let idx = ((pct / 100.0 * terminal_values.len() as f64).floor() as usize) .min(terminal_values.len() - 1); let tv = terminal_values[idx]; MonteCarloPercentilePath { - label: label.to_string(), + label: lbl.to_string(), percentile: pct, terminal_value: tv, total_return_pct: (tv - initial_capital) / initial_capital * 100.0, @@ -139,7 +163,7 @@ pub async fn execute( .map_or(0.0, |p| p.total_return_pct); let summary = format!( - "Monte Carlo simulation for {upper}: {n_simulations} paths over {horizon_days} days. \ + "Monte Carlo simulation for {label}: {n_simulations} paths over {horizon_days} days. 
\ Median terminal return={median_return:.1}%, P(loss)={:.1}%, \ median max drawdown={:.1}%.", prob_negative * 100.0, @@ -169,10 +193,10 @@ pub async fn execute( let suggested_next_steps = vec![ format!( - "[NEXT] Call drawdown_analysis(symbol=\"{upper}\") to compare simulated vs historical drawdowns" + "[NEXT] Call drawdown_analysis(symbol=\"{label}\") to compare simulated vs historical drawdowns" ), format!( - "[THEN] Call regime_detect(symbol=\"{upper}\") to see if risk varies across market regimes" + "[THEN] Call regime_detect(symbol=\"{label}\") to see if risk varies across market regimes" ), "[TIP] Use the ruin probabilities to size positions — ensure P(ruin) is below your tolerance" .to_string(), @@ -180,7 +204,7 @@ pub async fn execute( Ok(MonteCarloResponse { summary, - symbol: upper, + symbol: label.to_string(), n_simulations, horizon_days, initial_capital, diff --git a/src/tools/pipeline.rs b/src/tools/pipeline.rs new file mode 100644 index 0000000..c71428c --- /dev/null +++ b/src/tools/pipeline.rs @@ -0,0 +1,637 @@ +//! Backtest pipeline orchestrator. +//! +//! When `pipeline=true` on a sweep, this module chains: +//! sweep -> significance gate -> walk-forward -> OOS data gate -> monte carlo. +//! Each stage is fail-tolerant and reports status for frontend rendering. + +use std::collections::HashMap; + +use anyhow::Result; +use serde_json::Value; + +use crate::constants::{MIN_RETURNS_FOR_BOOTSTRAP, P_VALUE_THRESHOLD}; +use crate::server::OptopsyServer; +use crate::tools::response_types::pipeline::{PipelineResponse, StageInfo, StageStatus}; +use crate::tools::response_types::sweep::{SweepResponse, SweepResult}; + +/// Number of top parameter combos to use when building the walk-forward grid. +const TOP_COMBOS_FOR_WF: usize = 3; + +/// Run the full analysis pipeline after a sweep completes. +/// +/// Receives the already-finished sweep result and chains walk-forward validation +/// and Monte Carlo risk analysis, gated by statistical thresholds. 
+#[allow( + clippy::too_many_arguments, + clippy::too_many_lines, + clippy::implicit_hasher +)] +pub async fn run_pipeline( + server: &OptopsyServer, + strategy: &str, + symbol: &str, + capital: f64, + objective: &str, + sweep_id: String, + run_ids: Vec, + sweep_response: SweepResponse, + base_params: HashMap, +) -> Result { + let pipeline_start = std::time::Instant::now(); + let mut stages: Vec = Vec::new(); + let mut key_findings: Vec = Vec::new(); + + // Stage 1: Sweep (already completed) + stages.push(StageInfo { + name: "sweep".to_string(), + status: StageStatus::Completed, + reason: None, + duration_ms: sweep_response.execution_time_ms, + }); + + // Collect top findings from sweep + if let Some(best) = &sweep_response.best_result { + let (headline_metric, secondary_metric) = + best_sweep_combo_metrics(best, sweep_response.objective.as_str()); + key_findings.push(format!( + "Best sweep combo: {headline_metric}, {secondary_metric}, max DD={:.1}% ({} trades)", + best.max_drawdown * 100.0, + best.trades, + )); + } + + // Gate 1: Significance — decide which combos to validate + let top_combos = select_top_combos(&sweep_response); + + if top_combos.is_empty() { + stages.push(StageInfo { + name: "significance_gate".to_string(), + status: StageStatus::Failed, + reason: Some(format!( + "No parameter combos passed significance gate (all p > {P_VALUE_THRESHOLD:.2} or insufficient trades)" + )), + duration_ms: 0, + }); + + // Skip remaining stages + stages.push(StageInfo { + name: "walk_forward".to_string(), + status: StageStatus::Skipped, + reason: Some("Skipped: significance gate failed".to_string()), + duration_ms: 0, + }); + stages.push(StageInfo { + name: "oos_data_gate".to_string(), + status: StageStatus::Skipped, + reason: Some("Skipped: significance gate failed".to_string()), + duration_ms: 0, + }); + stages.push(StageInfo { + name: "monte_carlo".to_string(), + status: StageStatus::Skipped, + reason: Some("Skipped: significance gate failed".to_string()), + 
duration_ms: 0, + }); + + key_findings + .push("Pipeline stopped: no statistically significant parameter combos".to_string()); + + return Ok(build_response( + stages, + sweep_id, + run_ids, + sweep_response, + None, + None, + key_findings, + pipeline_start, + symbol, + )); + } + + stages.push(StageInfo { + name: "significance_gate".to_string(), + status: StageStatus::Completed, + reason: None, + duration_ms: 0, + }); + + // Stage 2: Walk-forward validation + let params_grid = build_wf_params_grid(&top_combos); + let wf_start = std::time::Instant::now(); + + // Resolve script source from strategy store so WF doesn't need filesystem access + let raw_source = server.strategy_store.as_ref().and_then(|store| { + store.get_source(strategy).ok().flatten().or_else(|| { + store + .get_source_by_name(strategy) + .ok() + .flatten() + .map(|(_id, src)| src) + }) + }); + let script_source = match raw_source { + Some(raw) => Some(crate::tools::run_script::maybe_transpile(raw)?), + None => None, + }; + + // Ensure base_params has the symbol for walk-forward data loading + let mut wf_base_params = base_params; + if !wf_base_params.contains_key("symbol") { + wf_base_params.insert("symbol".to_string(), Value::String(symbol.to_string())); + } + let wf_base_params = Some(wf_base_params); + + let wf_result = crate::tools::walk_forward::execute( + &server.cache, + server.adjustment_store.clone(), + strategy, + symbol, + capital, + params_grid, + Some(objective.to_string()), + None, // n_windows (default 5) + None, // mode (default rolling) + None, // train_pct (default 0.70) + None, // start_date + None, // end_date + None, // profile + script_source, + wf_base_params, + ) + .await; + + let wf_duration = wf_start.elapsed().as_millis() as u64; + + let wf_response = match wf_result { + Ok(wf) => { + stages.push(StageInfo { + name: "walk_forward".to_string(), + status: StageStatus::Completed, + reason: None, + duration_ms: wf_duration, + }); + + key_findings.push(format!( + "Walk-forward 
efficiency ratio: {:.2} (OOS Sharpe={:.2}, max DD={:.1}%)", + wf.efficiency_ratio, + wf.stitched_metrics.sharpe, + wf.stitched_metrics.max_drawdown * 100.0, + )); + + Some(wf) + } + Err(e) => { + stages.push(StageInfo { + name: "walk_forward".to_string(), + status: StageStatus::Failed, + reason: Some(format!("Walk-forward failed: {e}")), + duration_ms: wf_duration, + }); + + // Skip remaining stages + stages.push(StageInfo { + name: "oos_data_gate".to_string(), + status: StageStatus::Skipped, + reason: Some("Skipped: walk-forward failed".to_string()), + duration_ms: 0, + }); + stages.push(StageInfo { + name: "monte_carlo".to_string(), + status: StageStatus::Skipped, + reason: Some("Skipped: walk-forward failed".to_string()), + duration_ms: 0, + }); + + key_findings.push(format!("Walk-forward validation failed: {e}")); + + return Ok(build_response( + stages, + sweep_id, + run_ids, + sweep_response, + None, + None, + key_findings, + pipeline_start, + symbol, + )); + } + }; + + // Gate 2: OOS data sufficiency — gate on actual returns count, not equity points, + // since Monte Carlo needs MIN_RETURNS_FOR_BOOTSTRAP returns (equity.len() - 1). 
+ let wf_ref = wf_response.as_ref().unwrap(); + let returns = equity_to_returns(&wf_ref.stitched_equity); + let oos_returns_len = returns.len(); + + if oos_returns_len < MIN_RETURNS_FOR_BOOTSTRAP { + stages.push(StageInfo { + name: "oos_data_gate".to_string(), + status: StageStatus::Failed, + reason: Some(format!( + "Insufficient OOS data: {oos_returns_len} returns < {MIN_RETURNS_FOR_BOOTSTRAP} minimum for bootstrap", + )), + duration_ms: 0, + }); + stages.push(StageInfo { + name: "monte_carlo".to_string(), + status: StageStatus::Skipped, + reason: Some("Skipped: OOS data gate failed".to_string()), + duration_ms: 0, + }); + + key_findings.push(format!( + "Monte Carlo skipped: only {oos_returns_len} OOS returns (need {MIN_RETURNS_FOR_BOOTSTRAP})", + )); + + return Ok(build_response( + stages, + sweep_id, + run_ids, + sweep_response, + wf_response, + None, + key_findings, + pipeline_start, + symbol, + )); + } + + stages.push(StageInfo { + name: "oos_data_gate".to_string(), + status: StageStatus::Completed, + reason: None, + duration_ms: 0, + }); + + // Stage 3: Monte Carlo on OOS equity returns (already computed above) + let initial_capital_oos = wf_ref + .stitched_equity + .first() + .map_or(capital, |ep| ep.equity); + let horizon = returns.len().min(252); // 1 year or available data + let mc_label = symbol.to_uppercase(); + + let mc_start = std::time::Instant::now(); + let mc_result = tokio::task::spawn_blocking(move || { + crate::tools::monte_carlo::execute_from_returns( + &returns, + &mc_label, + 10_000, + horizon, + initial_capital_oos, + Some(42), + ) + }) + .await; + let mc_duration = mc_start.elapsed().as_millis() as u64; + + let mc_response = match mc_result { + Ok(Ok(mc)) => { + stages.push(StageInfo { + name: "monte_carlo".to_string(), + status: StageStatus::Completed, + reason: None, + duration_ms: mc_duration, + }); + + key_findings.push(format!( + "Monte Carlo (OOS): P(loss)={:.1}%, median max DD={:.1}%", + mc.ruin_analysis.prob_negative_return * 100.0, 
+ mc.drawdown_distribution.median, + )); + + Some(mc) + } + Ok(Err(e)) => { + stages.push(StageInfo { + name: "monte_carlo".to_string(), + status: StageStatus::Failed, + reason: Some(format!("Monte Carlo failed: {e}")), + duration_ms: mc_duration, + }); + key_findings.push(format!("Monte Carlo simulation failed: {e}")); + None + } + Err(e) => { + let reason = if e.is_panic() { + format!("Monte Carlo task panicked: {e}") + } else { + format!("Monte Carlo task cancelled: {e}") + }; + stages.push(StageInfo { + name: "monte_carlo".to_string(), + status: StageStatus::Failed, + reason: Some(reason), + duration_ms: mc_duration, + }); + None + } + }; + + Ok(build_response( + stages, + sweep_id, + run_ids, + sweep_response, + wf_response, + mc_response, + key_findings, + pipeline_start, + symbol, + )) +} + +// ───────────────────────────────────────────────────────────────────────────── +// Helpers +// ───────────────────────────────────────────────────────────────────────────── + +/// Select top parameter combos for walk-forward validation. +/// +/// If a permutation test was run, returns combos where `significant == true`. +/// Otherwise, returns the top N combos by objective (already ranked). +fn select_top_combos(sweep: &SweepResponse) -> Vec<&HashMap> { + let has_permutation = sweep.multiple_comparisons.is_some() + || sweep.ranked_results.iter().any(|r| r.p_value.is_some()); + + if has_permutation { + // Return all significant combos (up to TOP_COMBOS_FOR_WF) + sweep + .ranked_results + .iter() + .filter(|r| { + r.significant == Some(true) && r.p_value.is_some_and(|p| p < P_VALUE_THRESHOLD) + }) + .take(TOP_COMBOS_FOR_WF) + .map(|r| &r.params) + .collect() + } else { + // No permutation test — take top combos by objective ranking + sweep + .ranked_results + .iter() + .take(TOP_COMBOS_FOR_WF) + .map(|r| &r.params) + .collect() + } +} + +/// Build a walk-forward `params_grid` from the top sweep combos. 
+/// +/// For each parameter name, collect the distinct values across the top combos. +/// This gives walk-forward a focused search space around the best-performing region. +fn build_wf_params_grid(combos: &[&HashMap]) -> HashMap> { + let mut grid: HashMap> = HashMap::new(); + + for params in combos { + for (key, value) in *params { + let entry = grid.entry(key.clone()).or_default(); + // Deduplicate values (JSON equality) + if !entry.iter().any(|v| v == value) { + entry.push(value.clone()); + } + } + } + + grid +} + +/// Convert an equity curve to period-over-period returns. +fn equity_to_returns(equity: &[crate::engine::types::EquityPoint]) -> Vec { + equity + .windows(2) + .filter_map(|pair| { + let prev = pair[0].equity; + let curr = pair[1].equity; + if prev.abs() > f64::EPSILON { + Some(curr / prev - 1.0) + } else { + None + } + }) + .collect() +} + +/// Assemble the final `PipelineResponse`. +#[allow(clippy::too_many_arguments)] +fn build_response( + stages: Vec, + sweep_id: String, + run_ids: Vec, + sweep: SweepResponse, + walk_forward: Option, + monte_carlo: Option, + key_findings: Vec, + pipeline_start: std::time::Instant, + symbol: &str, +) -> PipelineResponse { + let completed = stages + .iter() + .filter(|s| matches!(s.status, StageStatus::Completed)) + .count(); + let total = stages.len(); + + let objective = &sweep.objective; + let best_metric = sweep + .best_result + .as_ref() + .map_or(0.0, |r| best_objective_metric(r, objective.as_str()).1); + let summary = format!( + "Pipeline completed: {completed}/{total} stages passed. 
\ + {} combos tested, best {objective}={best_metric:.2}.", + sweep.combinations_run, + ); + + let upper = symbol.to_uppercase(); + let suggested_next_steps = build_suggested_next_steps(&stages, &upper); + + PipelineResponse { + summary, + stages, + sweep_id, + run_ids, + sweep, + walk_forward, + monte_carlo, + key_findings, + suggested_next_steps, + total_duration_ms: pipeline_start.elapsed().as_millis() as u64, + } +} + +fn best_objective_metric(result: &SweepResult, objective: &str) -> (&'static str, f64) { + match objective { + "sortino" => ("Sortino", result.sortino), + "profit_factor" => ("Profit factor", result.profit_factor), + "cagr" => ("CAGR", result.cagr), + _ => ("Sharpe", result.sharpe), + } +} + +fn best_sweep_combo_metrics(result: &SweepResult, objective: &str) -> (String, String) { + match objective { + "sortino" => ( + format!("Sortino={:.2}", result.sortino), + format!("CAGR={:.1}%", result.cagr * 100.0), + ), + "profit_factor" => ( + format!("Profit factor={:.2}", result.profit_factor), + format!("CAGR={:.1}%", result.cagr * 100.0), + ), + "cagr" => ( + format!("CAGR={:.1}%", result.cagr * 100.0), + format!("Sharpe={:.2}", result.sharpe), + ), + _ => ( + format!("Sharpe={:.2}", result.sharpe), + format!("CAGR={:.1}%", result.cagr * 100.0), + ), + } +} + +/// Build suggested next steps based on which stages completed. 
+fn build_suggested_next_steps(stages: &[StageInfo], symbol: &str) -> Vec { + let sig_failed = stages + .iter() + .any(|s| s.name == "significance_gate" && matches!(s.status, StageStatus::Failed)); + let wf_failed = stages + .iter() + .any(|s| s.name == "walk_forward" && matches!(s.status, StageStatus::Failed)); + let mc_completed = stages + .iter() + .any(|s| s.name == "monte_carlo" && matches!(s.status, StageStatus::Completed)); + + if sig_failed { + vec![ + "[NEXT] Re-run the sweep with wider parameter ranges or a different strategy".to_string(), + "[TIP] Consider running with num_permutations=0 to skip significance testing and force walk-forward validation".to_string(), + ] + } else if wf_failed { + vec![ + "[NEXT] Check that the strategy and symbol have sufficient data for walk-forward windows".to_string(), + format!("[THEN] Call drawdown_analysis(symbol=\"{symbol}\") to evaluate risk profile"), + ] + } else if mc_completed { + vec![ + format!("[NEXT] Call factor_attribution(symbol=\"{symbol}\") to check if alpha is genuine or factor exposure"), + format!("[THEN] Call benchmark_analysis(symbol=\"{symbol}\") for relative performance metrics"), + format!("[TIP] Call regime_detect(symbol=\"{symbol}\") to see if performance varies across market regimes"), + ] + } else { + // OOS gate failed but WF succeeded + vec![ + format!("[NEXT] Call drawdown_analysis(symbol=\"{symbol}\") to analyze drawdown episodes"), + format!("[THEN] Call factor_attribution(symbol=\"{symbol}\") to decompose returns into factor exposures"), + "[TIP] Monte Carlo was skipped due to insufficient OOS data — consider longer backtest period".to_string(), + ] + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn equity_to_returns_basic() { + use crate::engine::types::EquityPoint; + use chrono::NaiveDateTime; + + let dt = NaiveDateTime::default(); + let equity = vec![ + EquityPoint { + datetime: dt, + equity: 100.0, + unrealized: None, + }, + EquityPoint { + datetime: dt, + equity: 
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a `SweepResult` whose metrics all carry distinct values, so a
    /// wrong field selection shows up as a wrong number.
    fn sample_result() -> SweepResult {
        SweepResult {
            rank: 1,
            params: HashMap::new(),
            sharpe: 1.2,
            sortino: 2.4,
            pnl: 1000.0,
            trades: 10,
            win_rate: 0.6,
            max_drawdown: 0.1,
            profit_factor: 1.8,
            cagr: 0.15,
            calmar: 1.5,
            p_value: None,
            significant: None,
        }
    }

    #[test]
    fn equity_to_returns_basic() {
        use crate::engine::types::EquityPoint;
        use chrono::NaiveDateTime;

        let dt = NaiveDateTime::default();
        let curve: Vec<EquityPoint> = [100.0, 110.0, 105.0]
            .iter()
            .map(|&equity| EquityPoint {
                datetime: dt,
                equity,
                unrealized: None,
            })
            .collect();

        let returns = equity_to_returns(&curve);

        assert_eq!(returns.len(), 2);
        assert!((returns[0] - 0.1).abs() < 1e-10);
        assert!((returns[1] - (-5.0 / 110.0)).abs() < 1e-10);
    }

    #[test]
    fn equity_to_returns_empty() {
        assert!(equity_to_returns(&[]).is_empty());
    }

    #[test]
    fn build_wf_params_grid_deduplicates() {
        let mut first = HashMap::new();
        first.insert("delta".to_string(), Value::from(0.3));
        first.insert("dte".to_string(), Value::from(45));

        // Same delta as `first`, different dte.
        let mut second = HashMap::new();
        second.insert("delta".to_string(), Value::from(0.3));
        second.insert("dte".to_string(), Value::from(30));

        let grid = build_wf_params_grid(&[&first, &second]);

        assert_eq!(grid["delta"].len(), 1); // deduplicated
        assert_eq!(grid["dte"].len(), 2);
    }

    #[test]
    fn best_objective_metric_uses_requested_objective() {
        let result = sample_result();

        assert_eq!(best_objective_metric(&result, "sortino"), ("Sortino", 2.4));
        assert_eq!(
            best_objective_metric(&result, "profit_factor"),
            ("Profit factor", 1.8)
        );
        assert_eq!(best_objective_metric(&result, "cagr"), ("CAGR", 0.15));
        assert_eq!(best_objective_metric(&result, "sharpe"), ("Sharpe", 1.2));
    }

    #[test]
    fn best_sweep_combo_metrics_formats_cagr_without_duplication() {
        let result = sample_result();

        assert_eq!(
            best_sweep_combo_metrics(&result, "cagr"),
            ("CAGR=15.0%".to_string(), "Sharpe=1.20".to_string())
        );
    }
}
("CAGR=15.0%".to_string(), "Sharpe=1.20".to_string()) + ); + } +} diff --git a/src/tools/response_types/mod.rs b/src/tools/response_types/mod.rs index 149a3c1..67546e5 100644 --- a/src/tools/response_types/mod.rs +++ b/src/tools/response_types/mod.rs @@ -5,6 +5,7 @@ pub mod data; pub mod forward_test; pub mod hypothesis; pub mod inputs; +pub mod pipeline; pub mod risk; pub mod stats; pub mod sweep; @@ -15,6 +16,7 @@ pub use data::*; pub use forward_test::*; pub use hypothesis::*; pub use inputs::*; +pub use pipeline::*; pub use risk::*; pub use stats::*; pub use walk_forward::*; diff --git a/src/tools/response_types/pipeline.rs b/src/tools/response_types/pipeline.rs new file mode 100644 index 0000000..2d38511 --- /dev/null +++ b/src/tools/response_types/pipeline.rs @@ -0,0 +1,73 @@ +//! Response types for the backtest pipeline. + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use super::risk::MonteCarloResponse; +use super::sweep::SweepResponse; +use super::walk_forward::WalkForwardResponse; + +/// Status of a pipeline execution stage or decision gate, used by the frontend +/// for rendering. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum StageStatus { + /// Stage ran successfully, or a gate condition was evaluated and passed. + Completed, + /// Stage was not attempted because an earlier gate condition was not met. + Skipped, + /// Stage execution failed, or a gate was evaluated and did not pass. + Failed, +} + +/// A single pipeline stage or gate result. +/// +/// The frontend renders each stage as a card/row colored by status: +/// green (completed/passed), yellow (skipped), red (failed/error or gate not passed). +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct StageInfo { + /// Stage identifier: `"sweep"`, `"significance_gate"`, `"walk_forward"`, + /// `"oos_data_gate"`, `"monte_carlo"`. + pub name: String, + /// Current status of this stage or gate. 
+ pub status: StageStatus, + /// Human-readable explanation when skipped or failed, including execution + /// errors and gate-not-passed outcomes (null when completed). + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option, + /// Wall-clock time in milliseconds (0 when skipped). + pub duration_ms: u64, +} + +/// Full pipeline response. Always contains sweep results; downstream stages +/// are present only if their preceding gates passed. +#[derive(Serialize, Deserialize, JsonSchema)] +pub struct PipelineResponse { + /// Executive summary synthesized from all completed stages. + pub summary: String, + /// Ordered list of stages and gates with statuses for frontend rendering. + pub stages: Vec, + + // -- Sweep (always present) -- + /// Unique sweep ID for referencing the persisted results. + pub sweep_id: String, + /// Run IDs for each individual backtest within the sweep. + pub run_ids: Vec, + /// Sweep results (ranked combos, sensitivity, timing). + pub sweep: SweepResponse, + + // -- Pipeline stages (present only if gates passed) -- + /// Walk-forward validation result. Present when the significance gate passes. + #[serde(skip_serializing_if = "Option::is_none")] + pub walk_forward: Option, + /// Monte Carlo risk simulation on OOS equity. Present when the OOS data gate passes. + #[serde(skip_serializing_if = "Option::is_none")] + pub monte_carlo: Option, + + /// Key findings aggregated across all completed stages. + pub key_findings: Vec, + /// Suggested next analysis steps for the agent. + pub suggested_next_steps: Vec, + /// Total wall-clock time for the entire pipeline in milliseconds. 
+ pub total_duration_ms: u64, +} diff --git a/src/tools/walk_forward.rs b/src/tools/walk_forward.rs index 66ad1b1..a8503e3 100644 --- a/src/tools/walk_forward.rs +++ b/src/tools/walk_forward.rs @@ -36,6 +36,8 @@ pub async fn execute( start_date: Option, end_date: Option, profile: Option, + script_source: Option, + base_params: Option>, ) -> Result { let wf_objective = match objective.as_deref() { Some("sortino") => WfObjective::Sortino, @@ -66,8 +68,8 @@ pub async fn execute( start_date, end_date, profile, - script_source: None, - base_params: None, + script_source, + base_params, }; let obj_str = engine_params.objective.clone(); diff --git a/tests/mcp_server.rs b/tests/mcp_server.rs index acd53c4..c91b0af 100644 --- a/tests/mcp_server.rs +++ b/tests/mcp_server.rs @@ -65,7 +65,7 @@ async fn tool_router_lists_all_tools() { let tools = client.list_all_tools().await.unwrap(); let tool_names: Vec = tools.iter().map(|t| t.name.to_string()).collect(); - assert_eq!(tools.len(), 15, "Expected 15 tools, got: {tool_names:?}"); + assert_eq!(tools.len(), 14, "Expected 14 tools, got: {tool_names:?}"); for expected in [ "backtest", "scripting_guide", @@ -81,7 +81,6 @@ async fn tool_router_lists_all_tools() { "factor_attribution", "portfolio_optimize", "benchmark_analysis", - "walk_forward", ] { assert!( tool_names.contains(&expected.to_string()), diff --git a/tests/pipeline.rs b/tests/pipeline.rs new file mode 100644 index 0000000..a30051f --- /dev/null +++ b/tests/pipeline.rs @@ -0,0 +1,310 @@ +//! Integration tests for the backtest pipeline gate logic and end-to-end flow. +//! +//! Tests cover: +//! - Significance gate failing (all combos non-significant) → downstream skipped +//! - Significance gate passing (no permutation test) → top combos forwarded +//! - OOS data gate failing (too few equity points) → monte carlo skipped +//! 
- Full pipeline end-to-end with NVDA fixture data + +mod common; + +use std::collections::HashMap; + +use optopsy_mcp::tools::response_types::pipeline::{PipelineResponse, StageStatus}; +use optopsy_mcp::tools::response_types::sweep::{SweepResponse, SweepResult}; + +// ────────────────────────────────────────────────────────────────────────────── +// Helpers +// ────────────────────────────────────────────────────────────────────────────── + +/// Build a minimal `SweepResponse` with given ranked results. +fn make_sweep(ranked: Vec) -> SweepResponse { + let best = ranked.first().cloned(); + let n = ranked.len(); + SweepResponse { + mode: "grid".to_string(), + objective: "sharpe".to_string(), + combinations_total: n, + combinations_run: n, + combinations_failed: 0, + best_result: best, + ranked_results: ranked, + dimension_sensitivity: HashMap::new(), + convergence_trace: None, + execution_time_ms: 100, + multiple_comparisons: None, + full_results: vec![], + } +} + +/// Build a `SweepResult` with given params and significance fields. 
+fn make_result( + rank: usize, + params: HashMap, + sharpe: f64, + significant: Option, + p_value: Option, +) -> SweepResult { + SweepResult { + rank, + params, + sharpe, + sortino: sharpe * 0.8, + pnl: 5000.0, + trades: 50, + win_rate: 0.55, + max_drawdown: 0.10, + profit_factor: 1.5, + cagr: 0.12, + calmar: 1.2, + p_value, + significant, + } +} + +fn assert_stage(response: &PipelineResponse, name: &str, expected: &StageStatus) { + let stage = response + .stages + .iter() + .find(|s| s.name == name) + .unwrap_or_else(|| panic!("Stage '{name}' not found in pipeline response")); + assert_eq!( + std::mem::discriminant(&stage.status), + std::mem::discriminant(expected), + "Stage '{}': expected {:?}, got {:?} (reason: {:?})", + name, + expected, + stage.status, + stage.reason, + ); +} + +// ────────────────────────────────────────────────────────────────────────────── +// Test: significance gate fails → all downstream stages skipped +// ────────────────────────────────────────────────────────────────────────────── + +#[tokio::test(flavor = "multi_thread")] +async fn significance_gate_fails_skips_downstream() { + let (state, _tmp) = common::test_app_state(); + + // All combos have significant=false and high p-values + let mut params1 = HashMap::new(); + params1.insert("DELTA".to_string(), serde_json::json!(0.3)); + + let mut params2 = HashMap::new(); + params2.insert("DELTA".to_string(), serde_json::json!(0.4)); + + let sweep = make_sweep(vec![ + make_result(1, params1, 0.8, Some(false), Some(0.42)), + make_result(2, params2, 0.5, Some(false), Some(0.78)), + ]); + + let base_params = HashMap::new(); // base params irrelevant — gate fails before WF + + let result = optopsy_mcp::tools::pipeline::run_pipeline( + &state.server, + "test_strategy", + "SPY", + 100_000.0, + "sharpe", + "sweep-001".to_string(), + vec!["run-1".to_string(), "run-2".to_string()], + sweep, + base_params, + ) + .await; + + let response = result.expect("Pipeline should not error even when gate 
fails"); + + // Verify stages + assert_eq!(response.stages.len(), 5, "Should have 5 stages total"); + assert_stage(&response, "sweep", &StageStatus::Completed); + assert_stage(&response, "significance_gate", &StageStatus::Failed); + assert_stage(&response, "walk_forward", &StageStatus::Skipped); + assert_stage(&response, "oos_data_gate", &StageStatus::Skipped); + assert_stage(&response, "monte_carlo", &StageStatus::Skipped); + + // No walk-forward or monte carlo results + assert!(response.walk_forward.is_none()); + assert!(response.monte_carlo.is_none()); + + // Key findings should mention the failure + assert!( + response + .key_findings + .iter() + .any(|f| f.contains("stopped") || f.contains("significance")), + "Key findings should mention significance gate failure: {:?}", + response.key_findings, + ); +} + +// ────────────────────────────────────────────────────────────────────────────── +// Test: no permutation test → top combos pass significance gate +// ────────────────────────────────────────────────────────────────────────────── + +#[tokio::test(flavor = "multi_thread")] +async fn no_permutation_passes_significance_gate() { + let (state, _tmp) = common::test_app_state(); + + // No permutation test (significant=None, p_value=None) — gate should pass + let mut params1 = HashMap::new(); + params1.insert("DELTA".to_string(), serde_json::json!(0.3)); + + let sweep = make_sweep(vec![make_result(1, params1, 1.2, None, None)]); + + let base_params = HashMap::new(); // WF will fail anyway (nonexistent strategy) + + let result = optopsy_mcp::tools::pipeline::run_pipeline( + &state.server, + "nonexistent_strategy", // walk-forward will fail because strategy doesn't exist + "SPY", + 100_000.0, + "sharpe", + "sweep-002".to_string(), + vec!["run-1".to_string()], + sweep, + base_params, + ) + .await; + + let response = result.expect("Pipeline should not error"); + + // Significance gate should pass (no permutation test → top combos accepted) + assert_stage(&response, 
"significance_gate", &StageStatus::Completed); + + // Walk-forward will fail because "nonexistent_strategy" doesn't exist in cache + // That's fine — the point is the significance gate passed + assert_stage(&response, "walk_forward", &StageStatus::Failed); + + // Downstream should be skipped after walk-forward failure + assert_stage(&response, "oos_data_gate", &StageStatus::Skipped); + assert_stage(&response, "monte_carlo", &StageStatus::Skipped); +} + +// ────────────────────────────────────────────────────────────────────────────── +// Test: full pipeline with NVDA fixture (significance gate pass → WF → MC) +// ────────────────────────────────────────────────────────────────────────────── + +#[tokio::test(flavor = "multi_thread")] +async fn full_pipeline_with_nvda_fixture() { + let (state, _tmp, strategy_id) = common::test_app_state_with_ohlcv(); + + // SweepResult.params only contains swept combo keys (not base params) + let swept_combo = HashMap::new(); // no swept params — WF uses base_params grid + + let sweep = make_sweep(vec![make_result(1, swept_combo, 1.5, None, None)]); + + // base_params carries the original sweep request params (symbol, CAPITAL, etc.) 
+ let mut base_params = HashMap::new(); + base_params.insert("symbol".to_string(), serde_json::json!("NVDA")); + base_params.insert("CAPITAL".to_string(), serde_json::json!(100_000)); + + let result = optopsy_mcp::tools::pipeline::run_pipeline( + &state.server, + &strategy_id, + "NVDA", + 100_000.0, + "sharpe", + "sweep-003".to_string(), + vec!["run-1".to_string()], + sweep, + base_params, + ) + .await; + + let response = result.expect("Pipeline should complete"); + + // Sweep and significance gate always pass + assert_stage(&response, "sweep", &StageStatus::Completed); + assert_stage(&response, "significance_gate", &StageStatus::Completed); + + // Walk-forward should complete (NVDA data exists in cache) + assert_stage(&response, "walk_forward", &StageStatus::Completed); + assert!( + response.walk_forward.is_some(), + "Walk-forward result should be present" + ); + + // OOS data gate — depends on how many equity points WF produces + let oos_gate = response + .stages + .iter() + .find(|s| s.name == "oos_data_gate") + .unwrap(); + + match oos_gate.status { + StageStatus::Completed => { + // Monte Carlo should have run + assert_stage(&response, "monte_carlo", &StageStatus::Completed); + assert!( + response.monte_carlo.is_some(), + "Monte Carlo result should be present when OOS gate passes" + ); + } + StageStatus::Failed => { + // Not enough OOS data — monte carlo should be skipped + assert_stage(&response, "monte_carlo", &StageStatus::Skipped); + assert!( + response.monte_carlo.is_none(), + "Monte Carlo should be None when OOS gate fails" + ); + } + StageStatus::Skipped => { + panic!("OOS gate should not be skipped when walk-forward completed"); + } + } + + // Structural assertions + assert_eq!(response.sweep_id, "sweep-003"); + assert!(!response.key_findings.is_empty()); + assert!(response.total_duration_ms > 0); +} + +// ────────────────────────────────────────────────────────────────────────────── +// Test: pipeline response structure (sweep_id, run_ids preserved) 
+// ────────────────────────────────────────────────────────────────────────────── + +#[tokio::test(flavor = "multi_thread")] +async fn pipeline_preserves_sweep_metadata() { + let (state, _tmp) = common::test_app_state(); + + let sweep = make_sweep(vec![make_result( + 1, + HashMap::new(), + 0.5, + Some(false), + Some(0.9), + )]); + + let run_ids = vec![ + "run-aaa".to_string(), + "run-bbb".to_string(), + "run-ccc".to_string(), + ]; + + let base_params = HashMap::new(); // base params irrelevant — gate fails before WF + + let result = optopsy_mcp::tools::pipeline::run_pipeline( + &state.server, + "test", + "SPY", + 50_000.0, + "sharpe", + "sweep-meta-test".to_string(), + run_ids.clone(), + sweep, + base_params, + ) + .await + .expect("Pipeline should not error"); + + // Sweep metadata should be passed through + assert_eq!(result.sweep_id, "sweep-meta-test"); + assert_eq!(result.run_ids, run_ids); + assert_eq!(result.sweep.mode, "grid"); + assert_eq!(result.sweep.objective, "sharpe"); + + // Summary should be non-empty + assert!(!result.summary.is_empty()); +}