Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ pub const DEFAULT_ANALYSIS_YEARS: u32 = 5;
/// Maximum finite value for profit factor when there are no losing trades.
/// Avoids `f64::INFINITY` which is not valid JSON.
pub const MAX_PROFIT_FACTOR: f64 = 999.99;

/// Minimum number of return observations required for meaningful
/// block-bootstrap Monte Carlo simulation.
pub const MIN_RETURNS_FOR_BOOTSTRAP: usize = 30;
1 change: 1 addition & 0 deletions src/server/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod backtests;
pub mod chat;
pub mod pipeline;
pub mod profiles;
pub mod run_script;
pub mod runs;
Expand Down
84 changes: 84 additions & 0 deletions src/server/handlers/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//! REST API handler for the backtest pipeline.
//!
//! Runs the full analysis pipeline (sweep -> walk-forward -> monte carlo)
//! and returns a `PipelineResponse` with stage statuses.
Comment thread
michaelchu marked this conversation as resolved.
Outdated

use axum::extract::State;
use axum::http::StatusCode;
use axum::Json;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;

use crate::server::handlers::sweeps::SweepParamDef;
use crate::server::state::AppState;
use crate::tools::backtest::BacktestToolParams;
use crate::tools::response_types::pipeline::PipelineResponse;

fn default_mode() -> String {
"grid".to_string()
}

fn default_objective() -> String {
"sharpe".to_string()
}

fn default_max_evaluations() -> usize {
50
}

/// Request body for `POST /runs/pipeline`.
#[derive(Debug, Deserialize)]
pub struct CreatePipelineRequest {
pub strategy: String,
#[serde(default = "default_mode")]
pub mode: String,
#[serde(default = "default_objective")]
pub objective: String,
pub params: HashMap<String, Value>,
pub sweep_params: Vec<SweepParamDef>,
#[serde(default = "default_max_evaluations")]
pub max_evaluations: usize,
#[serde(default)]
pub num_permutations: usize,
#[serde(default)]
pub thread_id: Option<String>,
}

/// `POST /runs/pipeline` — run the full pipeline synchronously and return the result.
pub async fn create_pipeline(
State(state): State<AppState>,
Json(req): Json<CreatePipelineRequest>,
) -> Result<Json<PipelineResponse>, (StatusCode, String)> {
if req.sweep_params.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"sweep_params must be non-empty for pipeline execution".to_string(),
));
}

let params = BacktestToolParams {
strategy: req.strategy,
mode: req.mode,
objective: req.objective,
params: req.params,
sweep_params: req.sweep_params,
max_evaluations: req.max_evaluations,
num_permutations: req.num_permutations,
thread_id: req.thread_id,
pipeline: true,
};
Comment thread
michaelchu marked this conversation as resolved.

let result = crate::tools::backtest::execute(&state.server, params)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Comment thread
michaelchu marked this conversation as resolved.
Comment thread
michaelchu marked this conversation as resolved.

// The pipeline path always returns BacktestToolResponse::Pipeline
match result {
crate::tools::backtest::BacktestToolResponse::Pipeline(response) => Ok(Json(*response)),
_ => Err((
StatusCode::INTERNAL_SERVER_ERROR,
"Pipeline mode did not return a pipeline response".to_string(),
)),
}
}
96 changes: 96 additions & 0 deletions src/server/handlers/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -909,3 +909,99 @@ pub async fn stream_task(
.keep_alive(KeepAlive::default()),
)
}

// ──────────────────────────────────────────────────────────────────────────────
// Pipeline task
// ──────────────────────────────────────────────────────────────────────────────

/// `POST /tasks/pipeline` — submit a full pipeline (sweep + walk-forward + monte carlo) as a background task.
pub async fn submit_pipeline(
State(state): State<AppState>,
Json(req): Json<super::pipeline::CreatePipelineRequest>,
) -> Result<Json<SubmitResponse>, (StatusCode, String)> {
if req.sweep_params.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"sweep_params must be non-empty for pipeline execution".to_string(),
));
}

let symbol = req
.params
.get("symbol")
.and_then(Value::as_str)
.unwrap_or("pending")
.to_owned();

let params_json =
serde_json::to_value(&req.params).unwrap_or(Value::Object(serde_json::Map::default()));

let task = state.task_manager.register(
TaskKind::Pipeline,
&req.strategy,
&symbol,
req.thread_id.clone(),
params_json,
);
Comment thread
michaelchu marked this conversation as resolved.
Comment thread
michaelchu marked this conversation as resolved.
let task_id = task.id.clone();

let tm = Arc::clone(&state.task_manager);
let server = state.server.clone();
tokio::spawn(async move {
// Wait for permit (or cancellation)
let permit = tokio::select! {
p = tm.acquire_permit() => p,
() = task.cancellation_token.cancelled() => {
tm.mark_cancelled(&task.id);
return;
}
};

if task.cancellation_token.is_cancelled() {
tm.mark_cancelled(&task.id);
drop(permit);
return;
}

tm.mark_running(&task.id);

let params = crate::tools::backtest::BacktestToolParams {
strategy: req.strategy,
mode: req.mode,
objective: req.objective,
params: req.params,
sweep_params: req.sweep_params,
max_evaluations: req.max_evaluations,
num_permutations: req.num_permutations,
thread_id: req.thread_id,
pipeline: true,
};
Comment thread
michaelchu marked this conversation as resolved.
Outdated

let result = crate::tools::backtest::execute(&server, params).await;

drop(permit);

if task.cancellation_token.is_cancelled() {
tm.mark_cancelled(&task.id);
return;
}

match result {
Ok(crate::tools::backtest::BacktestToolResponse::Pipeline(response)) => {
let result_json = serde_json::to_value(&*response).unwrap_or(Value::Null);
tm.mark_completed(&task.id, result_json, response.sweep_id.clone());
}
Ok(_) => {
tm.mark_failed(
&task.id,
"Pipeline mode did not return a pipeline response".to_string(),
);
}
Err(e) => {
tm.mark_failed(&task.id, e.to_string());
}
}
});

Ok(Json(SubmitResponse { task_id }))
}
2 changes: 2 additions & 0 deletions src/server/handlers/walk_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub async fn run_walk_forward(
params.start_date,
params.end_date,
params.profile,
None,
None,
)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Expand Down
70 changes: 8 additions & 62 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ use crate::tools::response_types::{
AggregatePricesResponse, BenchmarkAnalysisResponse, CointegrationResponse, CorrelateResponse,
DistributionResponse, DrawdownAnalysisResponse, FactorAttributionResponse, HypothesisParams,
HypothesisResponse, MonteCarloResponse, PortfolioOptimizeResponse, RegimeDetectResponse,
RollingMetricResponse, WalkForwardResponse,
RollingMetricResponse,
};
use params::{
tool_err, validation_err, AggregatePricesParams, BenchmarkAnalysisParams, CointegrationParams,
CorrelateParams, DistributionParams, DrawdownAnalysisParams, FactorAttributionParams,
MonteCarloParams, PortfolioOptimizeParams, RegimeDetectParams, RollingMetricParams,
WalkForwardToolParams,
};
use sanitize::SanitizedResult;

Expand Down Expand Up @@ -573,66 +572,12 @@ impl OptopsyServer {
.map_err(|e| format!("Failed to read scripting reference: {e}"))
}

/// Run walk-forward optimization: split data into train/test windows, optimize parameters
/// on each training window, and validate on the out-of-sample test window.
///
/// **When to use**: After finding a profitable strategy via backtest + sweep, to verify
/// that optimized parameters generalize to unseen data. The efficiency ratio (OOS/IS metric)
/// measures how well the strategy avoids overfitting.
///
/// **Output**: Per-window IS/OOS metrics, best params per window, stitched OOS equity curve,
/// and efficiency ratio.
///
/// **Example**:
/// ```json
/// {
/// "strategy": "short_put",
/// "symbol": "SPY",
/// "capital": 100000,
/// "params_grid": {
/// "DELTA_TARGET": [0.20, 0.30, 0.40],
/// "DTE_TARGET": [30, 45, 60]
/// },
/// "n_windows": 5,
/// "objective": "sharpe"
/// }
/// ```
#[tool(name = "walk_forward", annotations(read_only_hint = true))]
async fn walk_forward(
&self,
Parameters(params): Parameters<WalkForwardToolParams>,
) -> SanitizedResult<WalkForwardResponse, String> {
SanitizedResult(
async {
params
.validate()
.map_err(|e| validation_err("walk_forward", e))?;
tools::walk_forward::execute(
&self.cache,
self.adjustment_store.clone(),
&params.strategy,
&params.symbol,
params.capital,
params.params_grid,
params.objective,
Some(params.n_windows),
params.mode,
Some(params.train_pct),
params.start_date,
params.end_date,
params.profile,
)
.await
.map_err(tool_err)
}
.await,
)
}

/// Run a backtest or parameter sweep. Pass a saved strategy by display name.
///
/// Omit `sweep_params` for a single backtest (returns full equity curve, trade log, metrics).
/// Provide `sweep_params` for a grid/bayesian sweep (returns ranked results).
/// Provide `sweep_params` for a grid/bayesian sweep. Set `pipeline=true` to run the
/// full analysis pipeline: sweep -> significance gate -> walk-forward ->
/// `oos_data_gate` -> monte carlo. Default is sweep only.
/// Results are persisted to the runs database.
///
/// **Example (single backtest)**:
Expand All @@ -643,15 +588,16 @@ impl OptopsyServer {
/// }
/// ```
///
/// **Example (parameter sweep)**:
/// **Example (parameter sweep with full pipeline)**:
/// ```json
/// {
/// "strategy": "short_put",
/// "params": { "symbol": "SPY", "CAPITAL": 50000 },
/// "sweep_params": [
/// { "name": "DELTA_TARGET", "start": 0.10, "stop": 0.40, "step": 0.05 },
/// { "name": "DTE_TARGET", "param_type": "int", "start": 30, "stop": 60, "step": 5 }
/// ]
/// ],
/// "pipeline": true
/// }
/// ```
#[tool(name = "backtest", annotations(read_only_hint = false))]
Expand Down Expand Up @@ -710,7 +656,7 @@ impl ServerHandler for OptopsyServer {
\n - factor_attribution — decompose returns into factor exposures\
\n - benchmark_analysis — compare vs. benchmark (alpha, beta, capture ratios)\
\n - distribution — P&L or return distribution + normality tests\
\n - walk_forward — walk-forward optimization to validate parameter robustness\
\n - Walk-forward validation runs automatically as part of the backtest pipeline\
\n\
\n### 4. Market Analysis Tools\
\n - aggregate_prices — seasonal/time-bucket return patterns\
Expand Down
10 changes: 9 additions & 1 deletion src/server/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::Router;
use tower_http::cors::CorsLayer;

use crate::server::handlers::{
backtests, chat as chat_handlers, profiles, runs, strategies, sweeps, tasks,
backtests, chat as chat_handlers, pipeline, profiles, runs, strategies, sweeps, tasks,
};
use crate::server::state::AppState;

Expand Down Expand Up @@ -119,6 +119,10 @@ pub fn build_api_router(state: AppState) -> Router {
"/walk-forward",
axum::routing::post(crate::server::handlers::walk_forward::run_walk_forward),
)
.route(
"/runs/pipeline",
axum::routing::post(pipeline::create_pipeline),
)
.with_state(state.clone());

let task_routes = Router::new()
Expand All @@ -132,6 +136,10 @@ pub fn build_api_router(state: AppState) -> Router {
"/tasks/walk-forward",
axum::routing::post(tasks::submit_walk_forward),
)
.route(
"/tasks/pipeline",
axum::routing::post(tasks::submit_pipeline),
)
.route(
"/tasks/{id}",
axum::routing::get(tasks::get_task).delete(tasks::cancel_task),
Expand Down
1 change: 1 addition & 0 deletions src/server/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub enum TaskKind {
Single,
Sweep,
WalkForward,
Pipeline,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
Expand Down
Loading