Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ pub const DEFAULT_ANALYSIS_YEARS: u32 = 5;
/// Maximum finite value for profit factor when there are no losing trades.
/// Avoids `f64::INFINITY` which is not valid JSON.
pub const MAX_PROFIT_FACTOR: f64 = 999.99;

/// Minimum number of return observations required for meaningful
/// block-bootstrap Monte Carlo simulation.
/// NOTE(review): presumably consulted by the pipeline's Monte Carlo stage to
/// skip simulation on short return series — confirm at call sites.
pub const MIN_RETURNS_FOR_BOOTSTRAP: usize = 30;
1 change: 1 addition & 0 deletions src/server/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
pub mod backtests;
pub mod chat;
pub mod forward_tests;
pub mod pipeline;
pub mod profiles;
pub mod run_script;
pub mod runs;
Expand Down
99 changes: 99 additions & 0 deletions src/server/handlers/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! REST API handler for the backtest pipeline.
//!
//! Runs the full analysis pipeline
//! (`sweep` -> `significance_gate` -> `walk-forward` -> `oos_data_gate` -> `monte carlo`)
//! and returns a `PipelineResponse` with stage statuses.
//! Monte Carlo may be skipped when earlier gates do not pass.

use axum::extract::State;
use axum::http::StatusCode;
use axum::Json;
use garde::Validate;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;

use crate::server::handlers::sweeps::SweepParamDef;
use crate::server::state::AppState;
use crate::tools::backtest::BacktestToolParams;
use crate::tools::response_types::pipeline::PipelineResponse;

/// Serde default for `CreatePipelineRequest::mode`: grid sweep.
fn default_mode() -> String {
    String::from("grid")
}

/// Serde default for `CreatePipelineRequest::objective`: Sharpe ratio.
fn default_objective() -> String {
    String::from("sharpe")
}

/// Serde default for `CreatePipelineRequest::max_evaluations`.
fn default_max_evaluations() -> usize {
    50
}

/// Request body for `POST /runs/pipeline`.
#[derive(Debug, Deserialize)]
pub struct CreatePipelineRequest {
    /// Display name of the saved strategy to run.
    pub strategy: String,
    /// Sweep mode; defaults to "grid".
    /// NOTE(review): presumably "grid" or "bayesian" — confirm against
    /// `BacktestToolParams` validation rules.
    #[serde(default = "default_mode")]
    pub mode: String,
    /// Optimization objective; defaults to "sharpe".
    #[serde(default = "default_objective")]
    pub objective: String,
    /// Strategy parameters forwarded to the backtest (e.g. a "symbol" entry,
    /// which the task handler reads for bookkeeping).
    pub params: HashMap<String, Value>,
    /// Sweep dimensions; must be non-empty for pipeline execution
    /// (enforced by `build_pipeline_params`).
    pub sweep_params: Vec<SweepParamDef>,
    /// Cap on the number of sweep evaluations; defaults to 50.
    #[serde(default = "default_max_evaluations")]
    pub max_evaluations: usize,
    /// Permutation count — presumably for the significance gate stage;
    /// defaults to 0. TODO confirm semantics at the pipeline implementation.
    #[serde(default)]
    pub num_permutations: usize,
    /// Optional chat thread to associate the run with.
    #[serde(default)]
    pub thread_id: Option<String>,
}

pub(super) fn build_pipeline_params(
req: CreatePipelineRequest,
) -> Result<BacktestToolParams, (StatusCode, String)> {
if req.sweep_params.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"sweep_params must be non-empty for pipeline execution".to_string(),
));
}

let params = BacktestToolParams {
strategy: req.strategy,
mode: req.mode,
objective: req.objective,
params: req.params,
sweep_params: req.sweep_params,
max_evaluations: req.max_evaluations,
num_permutations: req.num_permutations,
thread_id: req.thread_id,
pipeline: true,
};
Comment thread
michaelchu marked this conversation as resolved.

params
.validate()
.map_err(|e| (StatusCode::BAD_REQUEST, format!("Validation error: {e}")))?;

Ok(params)
}

/// `POST /runs/pipeline` — run the full pipeline synchronously and return the result.
pub async fn create_pipeline(
State(state): State<AppState>,
Json(req): Json<CreatePipelineRequest>,
) -> Result<Json<PipelineResponse>, (StatusCode, String)> {
let params = build_pipeline_params(req)?;

let result = crate::tools::backtest::execute(&state.server, params)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Comment thread
michaelchu marked this conversation as resolved.
Comment thread
michaelchu marked this conversation as resolved.

// The pipeline path always returns BacktestToolResponse::Pipeline
match result {
crate::tools::backtest::BacktestToolResponse::Pipeline(response) => Ok(Json(*response)),
_ => Err((
StatusCode::INTERNAL_SERVER_ERROR,
"Pipeline mode did not return a pipeline response".to_string(),
)),
}
}
79 changes: 79 additions & 0 deletions src/server/handlers/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -909,3 +909,82 @@ pub async fn stream_task(
.keep_alive(KeepAlive::default()),
)
}

// ──────────────────────────────────────────────────────────────────────────────
// Pipeline task
// ──────────────────────────────────────────────────────────────────────────────

/// `POST /tasks/pipeline` — submit a full pipeline (sweep + walk-forward + monte carlo) as a background task.
pub async fn submit_pipeline(
State(state): State<AppState>,
Json(req): Json<super::pipeline::CreatePipelineRequest>,
) -> Result<Json<SubmitResponse>, (StatusCode, String)> {
let params = super::pipeline::build_pipeline_params(req)?;

let symbol = params
.params
.get("symbol")
.and_then(Value::as_str)
.unwrap_or("pending")
.to_owned();

let params_json =
serde_json::to_value(&params.params).unwrap_or(Value::Object(serde_json::Map::default()));

let task = state.task_manager.register(
TaskKind::Pipeline,
&params.strategy,
&symbol,
params.thread_id.clone(),
params_json,
);
Comment thread
michaelchu marked this conversation as resolved.
Comment thread
michaelchu marked this conversation as resolved.
let task_id = task.id.clone();

let tm = Arc::clone(&state.task_manager);
let server = state.server.clone();
tokio::spawn(async move {
// Wait for permit (or cancellation)
let permit = tokio::select! {
p = tm.acquire_permit() => p,
() = task.cancellation_token.cancelled() => {
tm.mark_cancelled(&task.id);
return;
}
};

if task.cancellation_token.is_cancelled() {
tm.mark_cancelled(&task.id);
drop(permit);
return;
}

tm.mark_running(&task.id);

let result = crate::tools::backtest::execute(&server, params).await;

drop(permit);

if task.cancellation_token.is_cancelled() {
tm.mark_cancelled(&task.id);
return;
}

match result {
Ok(crate::tools::backtest::BacktestToolResponse::Pipeline(response)) => {
let result_json = serde_json::to_value(&*response).unwrap_or(Value::Null);
tm.mark_completed(&task.id, result_json, response.sweep_id.clone());
}
Ok(_) => {
tm.mark_failed(
&task.id,
"Pipeline mode did not return a pipeline response".to_string(),
);
}
Err(e) => {
tm.mark_failed(&task.id, e.to_string());
}
}
});

Ok(Json(SubmitResponse { task_id }))
}
2 changes: 2 additions & 0 deletions src/server/handlers/walk_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub async fn run_walk_forward(
params.start_date,
params.end_date,
params.profile,
None,
None,
)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Expand Down
70 changes: 8 additions & 62 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ use crate::tools::response_types::{
AggregatePricesResponse, BenchmarkAnalysisResponse, CointegrationResponse, CorrelateResponse,
DistributionResponse, DrawdownAnalysisResponse, FactorAttributionResponse, HypothesisParams,
HypothesisResponse, MonteCarloResponse, PortfolioOptimizeResponse, RegimeDetectResponse,
RollingMetricResponse, WalkForwardResponse,
RollingMetricResponse,
};
use params::{
tool_err, validation_err, AggregatePricesParams, BenchmarkAnalysisParams, CointegrationParams,
CorrelateParams, DistributionParams, DrawdownAnalysisParams, FactorAttributionParams,
MonteCarloParams, PortfolioOptimizeParams, RegimeDetectParams, RollingMetricParams,
WalkForwardToolParams,
};
use sanitize::SanitizedResult;

Expand Down Expand Up @@ -589,66 +588,12 @@ impl OptopsyServer {
.map_err(|e| format!("Failed to read scripting reference: {e}"))
}

/// Run walk-forward optimization: split data into train/test windows, optimize parameters
/// on each training window, and validate on the out-of-sample test window.
///
/// **When to use**: After finding a profitable strategy via backtest + sweep, to verify
/// that optimized parameters generalize to unseen data. The efficiency ratio (OOS/IS metric)
/// measures how well the strategy avoids overfitting.
///
/// **Output**: Per-window IS/OOS metrics, best params per window, stitched OOS equity curve,
/// and efficiency ratio.
///
/// **Example**:
/// ```json
/// {
/// "strategy": "short_put",
/// "symbol": "SPY",
/// "capital": 100000,
/// "params_grid": {
/// "DELTA_TARGET": [0.20, 0.30, 0.40],
/// "DTE_TARGET": [30, 45, 60]
/// },
/// "n_windows": 5,
/// "objective": "sharpe"
/// }
/// ```
#[tool(name = "walk_forward", annotations(read_only_hint = true))]
async fn walk_forward(
&self,
Parameters(params): Parameters<WalkForwardToolParams>,
) -> SanitizedResult<WalkForwardResponse, String> {
SanitizedResult(
async {
params
.validate()
.map_err(|e| validation_err("walk_forward", e))?;
tools::walk_forward::execute(
&self.cache,
self.adjustment_store.clone(),
&params.strategy,
&params.symbol,
params.capital,
params.params_grid,
params.objective,
Some(params.n_windows),
params.mode,
Some(params.train_pct),
params.start_date,
params.end_date,
params.profile,
)
.await
.map_err(tool_err)
}
.await,
)
}

/// Run a backtest or parameter sweep. Pass a saved strategy by display name.
///
/// Omit `sweep_params` for a single backtest (returns full equity curve, trade log, metrics).
/// Provide `sweep_params` for a grid/bayesian sweep (returns ranked results).
/// Provide `sweep_params` for a grid/bayesian sweep. By default, sweeps run the
/// full analysis pipeline: sweep -> significance gate -> walk-forward ->
/// `oos_data_gate` -> monte carlo. Set `pipeline=false` to return sweep-only results.
/// Results are persisted to the runs database.
///
/// **Example (single backtest)**:
Expand All @@ -659,15 +604,16 @@ impl OptopsyServer {
/// }
/// ```
///
/// **Example (parameter sweep)**:
/// **Example (parameter sweep with full pipeline)**:
/// ```json
/// {
/// "strategy": "short_put",
/// "params": { "symbol": "SPY", "CAPITAL": 50000 },
/// "sweep_params": [
/// { "name": "DELTA_TARGET", "start": 0.10, "stop": 0.40, "step": 0.05 },
/// { "name": "DTE_TARGET", "param_type": "int", "start": 30, "stop": 60, "step": 5 }
/// ]
/// ],
/// "pipeline": true
/// }
/// ```
#[tool(name = "backtest", annotations(read_only_hint = false))]
Expand Down Expand Up @@ -726,7 +672,7 @@ impl ServerHandler for OptopsyServer {
\n - factor_attribution — decompose returns into factor exposures\
\n - benchmark_analysis — compare vs. benchmark (alpha, beta, capture ratios)\
\n - distribution — P&L or return distribution + normality tests\
\n - walk_forward — walk-forward optimization to validate parameter robustness\
\n - Walk-forward validation runs automatically as part of the backtest pipeline\
\n\
\n### 4. Market Analysis Tools\
\n - aggregate_prices — seasonal/time-bucket return patterns\
Expand Down
11 changes: 10 additions & 1 deletion src/server/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use axum::Router;
use tower_http::cors::CorsLayer;

use crate::server::handlers::{
backtests, chat as chat_handlers, forward_tests, profiles, runs, strategies, sweeps, tasks,
backtests, chat as chat_handlers, forward_tests, pipeline, profiles, runs, strategies, sweeps,
tasks,
};
use crate::server::state::AppState;

Expand Down Expand Up @@ -119,6 +120,10 @@ pub fn build_api_router(state: AppState) -> Router {
"/walk-forward",
axum::routing::post(crate::server::handlers::walk_forward::run_walk_forward),
)
.route(
"/runs/pipeline",
axum::routing::post(pipeline::create_pipeline),
)
.with_state(state.clone());

let task_routes = Router::new()
Expand All @@ -132,6 +137,10 @@ pub fn build_api_router(state: AppState) -> Router {
"/tasks/walk-forward",
axum::routing::post(tasks::submit_walk_forward),
)
.route(
"/tasks/pipeline",
axum::routing::post(tasks::submit_pipeline),
)
.route(
"/tasks/{id}",
axum::routing::get(tasks::get_task).delete(tasks::cancel_task),
Expand Down
1 change: 1 addition & 0 deletions src/server/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub enum TaskKind {
Single,
Sweep,
WalkForward,
Pipeline,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
Expand Down
Loading
Loading