Skip to content

Commit 2e199ef

Browse files
michaelchu, claude, Copilot
authored
feat: add orchestrated backtest pipeline (sweep -> walk-forward -> monte carlo) (#177)
Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 6784190 commit 2e199ef

17 files changed

Lines changed: 1358 additions & 105 deletions

File tree

src/constants.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,7 @@ pub const DEFAULT_ANALYSIS_YEARS: u32 = 5;
1515
/// Maximum finite value for profit factor when there are no losing trades.
1616
/// Avoids `f64::INFINITY` which is not valid JSON.
1717
pub const MAX_PROFIT_FACTOR: f64 = 999.99;
18+
19+
/// Smallest number of return observations for which a block-bootstrap
/// Monte Carlo simulation is considered meaningful.
pub const MIN_RETURNS_FOR_BOOTSTRAP: usize = 30;

src/server/handlers/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
pub mod backtests;
44
pub mod chat;
55
pub mod forward_tests;
6+
pub mod pipeline;
67
pub mod profiles;
78
pub mod run_script;
89
pub mod runs;

src/server/handlers/pipeline.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
//! REST API handler for the backtest pipeline.
2+
//!
3+
//! Runs the full analysis pipeline
4+
//! (`sweep` -> `significance_gate` -> `walk-forward` -> `oos_data_gate` -> `monte carlo`)
5+
//! and returns a `PipelineResponse` with stage statuses.
6+
//! Monte Carlo may be skipped when earlier gates do not pass.
7+
8+
use axum::extract::State;
9+
use axum::http::StatusCode;
10+
use axum::Json;
11+
use garde::Validate;
12+
use serde::Deserialize;
13+
use serde_json::Value;
14+
use std::collections::HashMap;
15+
16+
use crate::server::handlers::sweeps::SweepParamDef;
17+
use crate::server::state::AppState;
18+
use crate::tools::backtest::BacktestToolParams;
19+
use crate::tools::response_types::pipeline::PipelineResponse;
20+
21+
/// Serde fallback for `CreatePipelineRequest::mode` when the field is omitted.
fn default_mode() -> String {
    String::from("grid")
}
24+
25+
/// Serde fallback for `CreatePipelineRequest::objective` when the field is omitted.
fn default_objective() -> String {
    String::from("sharpe")
}
28+
29+
/// Serde fallback for `CreatePipelineRequest::max_evaluations` when the field is omitted.
fn default_max_evaluations() -> usize {
    const FALLBACK_MAX_EVALUATIONS: usize = 50;
    FALLBACK_MAX_EVALUATIONS
}
32+
33+
/// Request body for `POST /runs/pipeline`.
34+
#[derive(Debug, Deserialize)]
35+
pub struct CreatePipelineRequest {
36+
pub strategy: String,
37+
#[serde(default = "default_mode")]
38+
pub mode: String,
39+
#[serde(default = "default_objective")]
40+
pub objective: String,
41+
pub params: HashMap<String, Value>,
42+
pub sweep_params: Vec<SweepParamDef>,
43+
#[serde(default = "default_max_evaluations")]
44+
pub max_evaluations: usize,
45+
#[serde(default)]
46+
pub num_permutations: usize,
47+
#[serde(default)]
48+
pub thread_id: Option<String>,
49+
}
50+
51+
pub(super) fn build_pipeline_params(
52+
req: CreatePipelineRequest,
53+
) -> Result<BacktestToolParams, (StatusCode, String)> {
54+
if req.sweep_params.is_empty() {
55+
return Err((
56+
StatusCode::BAD_REQUEST,
57+
"sweep_params must be non-empty for pipeline execution".to_string(),
58+
));
59+
}
60+
61+
let params = BacktestToolParams {
62+
strategy: req.strategy,
63+
mode: req.mode,
64+
objective: req.objective,
65+
params: req.params,
66+
sweep_params: req.sweep_params,
67+
max_evaluations: req.max_evaluations,
68+
num_permutations: req.num_permutations,
69+
thread_id: req.thread_id,
70+
pipeline: true,
71+
};
72+
73+
params
74+
.validate()
75+
.map_err(|e| (StatusCode::BAD_REQUEST, format!("Validation error: {e}")))?;
76+
77+
Ok(params)
78+
}
79+
80+
/// `POST /runs/pipeline` — run the full pipeline synchronously and return the result.
81+
pub async fn create_pipeline(
82+
State(state): State<AppState>,
83+
Json(req): Json<CreatePipelineRequest>,
84+
) -> Result<Json<PipelineResponse>, (StatusCode, String)> {
85+
let params = build_pipeline_params(req)?;
86+
87+
let result = crate::tools::backtest::execute(&state.server, params)
88+
.await
89+
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
90+
91+
// The pipeline path always returns BacktestToolResponse::Pipeline
92+
match result {
93+
crate::tools::backtest::BacktestToolResponse::Pipeline(response) => Ok(Json(*response)),
94+
_ => Err((
95+
StatusCode::INTERNAL_SERVER_ERROR,
96+
"Pipeline mode did not return a pipeline response".to_string(),
97+
)),
98+
}
99+
}

src/server/handlers/tasks.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -909,3 +909,82 @@ pub async fn stream_task(
909909
.keep_alive(KeepAlive::default()),
910910
)
911911
}
912+
913+
// ──────────────────────────────────────────────────────────────────────────────
914+
// Pipeline task
915+
// ──────────────────────────────────────────────────────────────────────────────
916+
917+
/// `POST /tasks/pipeline` — submit a full pipeline (sweep + walk-forward + monte carlo) as a background task.
918+
pub async fn submit_pipeline(
919+
State(state): State<AppState>,
920+
Json(req): Json<super::pipeline::CreatePipelineRequest>,
921+
) -> Result<Json<SubmitResponse>, (StatusCode, String)> {
922+
let params = super::pipeline::build_pipeline_params(req)?;
923+
924+
let symbol = params
925+
.params
926+
.get("symbol")
927+
.and_then(Value::as_str)
928+
.unwrap_or("pending")
929+
.to_owned();
930+
931+
let params_json =
932+
serde_json::to_value(&params.params).unwrap_or(Value::Object(serde_json::Map::default()));
933+
934+
let task = state.task_manager.register(
935+
TaskKind::Pipeline,
936+
&params.strategy,
937+
&symbol,
938+
params.thread_id.clone(),
939+
params_json,
940+
);
941+
let task_id = task.id.clone();
942+
943+
let tm = Arc::clone(&state.task_manager);
944+
let server = state.server.clone();
945+
tokio::spawn(async move {
946+
// Wait for permit (or cancellation)
947+
let permit = tokio::select! {
948+
p = tm.acquire_permit() => p,
949+
() = task.cancellation_token.cancelled() => {
950+
tm.mark_cancelled(&task.id);
951+
return;
952+
}
953+
};
954+
955+
if task.cancellation_token.is_cancelled() {
956+
tm.mark_cancelled(&task.id);
957+
drop(permit);
958+
return;
959+
}
960+
961+
tm.mark_running(&task.id);
962+
963+
let result = crate::tools::backtest::execute(&server, params).await;
964+
965+
drop(permit);
966+
967+
if task.cancellation_token.is_cancelled() {
968+
tm.mark_cancelled(&task.id);
969+
return;
970+
}
971+
972+
match result {
973+
Ok(crate::tools::backtest::BacktestToolResponse::Pipeline(response)) => {
974+
let result_json = serde_json::to_value(&*response).unwrap_or(Value::Null);
975+
tm.mark_completed(&task.id, result_json, response.sweep_id.clone());
976+
}
977+
Ok(_) => {
978+
tm.mark_failed(
979+
&task.id,
980+
"Pipeline mode did not return a pipeline response".to_string(),
981+
);
982+
}
983+
Err(e) => {
984+
tm.mark_failed(&task.id, e.to_string());
985+
}
986+
}
987+
});
988+
989+
Ok(Json(SubmitResponse { task_id }))
990+
}

src/server/handlers/walk_forward.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ pub async fn run_walk_forward(
3636
params.start_date,
3737
params.end_date,
3838
params.profile,
39+
None,
40+
None,
3941
)
4042
.await
4143
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

src/server/mod.rs

Lines changed: 8 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,12 @@ use crate::tools::response_types::{
3030
AggregatePricesResponse, BenchmarkAnalysisResponse, CointegrationResponse, CorrelateResponse,
3131
DistributionResponse, DrawdownAnalysisResponse, FactorAttributionResponse, HypothesisParams,
3232
HypothesisResponse, MonteCarloResponse, PortfolioOptimizeResponse, RegimeDetectResponse,
33-
RollingMetricResponse, WalkForwardResponse,
33+
RollingMetricResponse,
3434
};
3535
use params::{
3636
tool_err, validation_err, AggregatePricesParams, BenchmarkAnalysisParams, CointegrationParams,
3737
CorrelateParams, DistributionParams, DrawdownAnalysisParams, FactorAttributionParams,
3838
MonteCarloParams, PortfolioOptimizeParams, RegimeDetectParams, RollingMetricParams,
39-
WalkForwardToolParams,
4039
};
4140
use sanitize::SanitizedResult;
4241

@@ -589,66 +588,12 @@ impl OptopsyServer {
589588
.map_err(|e| format!("Failed to read scripting reference: {e}"))
590589
}
591590

592-
/// Run walk-forward optimization: split data into train/test windows, optimize parameters
593-
/// on each training window, and validate on the out-of-sample test window.
594-
///
595-
/// **When to use**: After finding a profitable strategy via backtest + sweep, to verify
596-
/// that optimized parameters generalize to unseen data. The efficiency ratio (OOS/IS metric)
597-
/// measures how well the strategy avoids overfitting.
598-
///
599-
/// **Output**: Per-window IS/OOS metrics, best params per window, stitched OOS equity curve,
600-
/// and efficiency ratio.
601-
///
602-
/// **Example**:
603-
/// ```json
604-
/// {
605-
/// "strategy": "short_put",
606-
/// "symbol": "SPY",
607-
/// "capital": 100000,
608-
/// "params_grid": {
609-
/// "DELTA_TARGET": [0.20, 0.30, 0.40],
610-
/// "DTE_TARGET": [30, 45, 60]
611-
/// },
612-
/// "n_windows": 5,
613-
/// "objective": "sharpe"
614-
/// }
615-
/// ```
616-
#[tool(name = "walk_forward", annotations(read_only_hint = true))]
617-
async fn walk_forward(
618-
&self,
619-
Parameters(params): Parameters<WalkForwardToolParams>,
620-
) -> SanitizedResult<WalkForwardResponse, String> {
621-
SanitizedResult(
622-
async {
623-
params
624-
.validate()
625-
.map_err(|e| validation_err("walk_forward", e))?;
626-
tools::walk_forward::execute(
627-
&self.cache,
628-
self.adjustment_store.clone(),
629-
&params.strategy,
630-
&params.symbol,
631-
params.capital,
632-
params.params_grid,
633-
params.objective,
634-
Some(params.n_windows),
635-
params.mode,
636-
Some(params.train_pct),
637-
params.start_date,
638-
params.end_date,
639-
params.profile,
640-
)
641-
.await
642-
.map_err(tool_err)
643-
}
644-
.await,
645-
)
646-
}
647-
648591
/// Run a backtest or parameter sweep. Pass a saved strategy by display name.
649592
///
650593
/// Omit `sweep_params` for a single backtest (returns full equity curve, trade log, metrics).
651-
/// Provide `sweep_params` for a grid/bayesian sweep (returns ranked results).
594+
/// Provide `sweep_params` for a grid/bayesian sweep. By default, sweeps run the
595+
/// full analysis pipeline: sweep -> significance gate -> walk-forward ->
596+
/// `oos_data_gate` -> monte carlo. Set `pipeline=false` to return sweep-only results.
652597
/// Results are persisted to the runs database.
653598
///
654599
/// **Example (single backtest)**:
@@ -659,15 +604,16 @@ impl OptopsyServer {
659604
/// }
660605
/// ```
661606
///
662-
/// **Example (parameter sweep)**:
607+
/// **Example (parameter sweep with full pipeline)**:
663608
/// ```json
664609
/// {
665610
/// "strategy": "short_put",
666611
/// "params": { "symbol": "SPY", "CAPITAL": 50000 },
667612
/// "sweep_params": [
668613
/// { "name": "DELTA_TARGET", "start": 0.10, "stop": 0.40, "step": 0.05 },
669614
/// { "name": "DTE_TARGET", "param_type": "int", "start": 30, "stop": 60, "step": 5 }
670-
/// ]
615+
/// ],
616+
/// "pipeline": true
671617
/// }
672618
/// ```
673619
#[tool(name = "backtest", annotations(read_only_hint = false))]
@@ -726,7 +672,7 @@ impl ServerHandler for OptopsyServer {
726672
\n - factor_attribution — decompose returns into factor exposures\
727673
\n - benchmark_analysis — compare vs. benchmark (alpha, beta, capture ratios)\
728674
\n - distribution — P&L or return distribution + normality tests\
729-
\n - walk_forward — walk-forward optimization to validate parameter robustness\
675+
\n - Walk-forward validation runs automatically as part of the backtest pipeline\
730676
\n\
731677
\n### 4. Market Analysis Tools\
732678
\n - aggregate_prices — seasonal/time-bucket return patterns\

src/server/router.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ use axum::Router;
88
use tower_http::cors::CorsLayer;
99

1010
use crate::server::handlers::{
11-
backtests, chat as chat_handlers, forward_tests, profiles, runs, strategies, sweeps, tasks,
11+
backtests, chat as chat_handlers, forward_tests, pipeline, profiles, runs, strategies, sweeps,
12+
tasks,
1213
};
1314
use crate::server::state::AppState;
1415

@@ -119,6 +120,10 @@ pub fn build_api_router(state: AppState) -> Router {
119120
"/walk-forward",
120121
axum::routing::post(crate::server::handlers::walk_forward::run_walk_forward),
121122
)
123+
.route(
124+
"/runs/pipeline",
125+
axum::routing::post(pipeline::create_pipeline),
126+
)
122127
.with_state(state.clone());
123128

124129
let task_routes = Router::new()
@@ -132,6 +137,10 @@ pub fn build_api_router(state: AppState) -> Router {
132137
"/tasks/walk-forward",
133138
axum::routing::post(tasks::submit_walk_forward),
134139
)
140+
.route(
141+
"/tasks/pipeline",
142+
axum::routing::post(tasks::submit_pipeline),
143+
)
135144
.route(
136145
"/tasks/{id}",
137146
axum::routing::get(tasks::get_task).delete(tasks::cancel_task),

src/server/task_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub enum TaskKind {
1919
Single,
2020
Sweep,
2121
WalkForward,
22+
Pipeline,
2223
}
2324

2425
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]

0 commit comments

Comments
 (0)