diff --git a/pyomo/contrib/parmest/examples/reactor_design/multistart_example.py b/pyomo/contrib/parmest/examples/reactor_design/multistart_example.py new file mode 100644 index 00000000000..cfb9eea9798 --- /dev/null +++ b/pyomo/contrib/parmest/examples/reactor_design/multistart_example.py @@ -0,0 +1,56 @@ +# ____________________________________________________________________________________ +# +# Pyomo: Python Optimization Modeling Objects +# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC +# Under the terms of Contract DE-NA0003525 with National Technology and Engineering +# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this +# software. This software is distributed under the 3-clause BSD License. +# ____________________________________________________________________________________ + +from pyomo.common.dependencies import numpy as np, pandas as pd +from itertools import product +from os.path import join, abspath, dirname +import pyomo.contrib.parmest.parmest as parmest +from pyomo.contrib.parmest.examples.reactor_design.reactor_design import ( + ReactorDesignExperiment, +) + + +def main(): + + # Read in data + file_dirname = dirname(abspath(str(__file__))) + file_name = abspath(join(file_dirname, "reactor_data.csv")) + data = pd.read_csv(file_name) + + # Create an experiment list + exp_list = [ReactorDesignExperiment(data, i) for i in range(data.shape[0])] + + # Solver options belong here (Ipopt options shown as example) + solver_options = {"max_iter": 1000, "tol": 1e-6} + + pest = parmest.Estimator( + exp_list, obj_function="SSE", solver_options=solver_options + ) + + # Single-start estimation + obj, theta = pest.theta_est() + print("Single-start objective:", obj) + print("Single-start theta:\n", theta) + + # Multistart estimation + results_df, best_theta, best_obj = pest.theta_est_multistart( + n_restarts=10, + multistart_sampling_method="uniform_random", + seed=42, + save_results=False, # True if you want CSV via file_name= + ) + + print("\nMultistart best objective:", best_obj) + print("Multistart best theta:", best_theta) + print("\nAll multistart results:") + print(results_df) + + +if __name__ == "__main__": + main() diff --git a/pyomo/contrib/parmest/examples/reactor_design/reactor_design.py b/pyomo/contrib/parmest/examples/reactor_design/reactor_design.py index 6172096e1ad..6a065d20a63 100644 --- a/pyomo/contrib/parmest/examples/reactor_design/reactor_design.py +++ b/pyomo/contrib/parmest/examples/reactor_design/reactor_design.py @@ -23,10 +23,14 @@ def reactor_design_model(): model = pyo.ConcreteModel() # Rate constants, make unknown parameters variables - model.k1 = pyo.Var(initialize=5.0 / 6.0, within=pyo.PositiveReals) # min^-1 - model.k2 = pyo.Var(initialize=5.0 / 3.0, within=pyo.PositiveReals) # min^-1 + model.k1 = pyo.Var( + initialize=5.0 / 6.0, within=pyo.PositiveReals, bounds=(0.1, 10.0) + ) # min^-1 + model.k2 = pyo.Var( + initialize=5.0 / 3.0, within=pyo.PositiveReals, bounds=(0.1, 10.0) + ) # min^-1 model.k3 = pyo.Var( - initialize=1.0 / 6000.0, within=pyo.PositiveReals + initialize=1.0 / 6000.0, within=pyo.PositiveReals, bounds=(1e-5, 1e-3) ) # m^3/(gmol min) # Inlet concentration of A, gmol/m^3 diff --git a/pyomo/contrib/parmest/examples/rooney_biegler/multistart_example.py b/pyomo/contrib/parmest/examples/rooney_biegler/multistart_example.py new file mode 100644 index 00000000000..7ff8f55ad4b --- /dev/null +++ b/pyomo/contrib/parmest/examples/rooney_biegler/multistart_example.py @@ -0,0 +1,62 @@ +# ____________________________________________________________________________________ +# +# Pyomo: Python Optimization Modeling Objects +# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC +# Under the terms of Contract DE-NA0003525 with National Technology and Engineering +# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this +# software. This software is distributed under the 3-clause BSD License. +# ____________________________________________________________________________________ + +from pyomo.common.dependencies import numpy as np, pandas as pd +from itertools import product +import pyomo.contrib.parmest.parmest as parmest +from pyomo.contrib.parmest.examples.rooney_biegler.rooney_biegler import ( + RooneyBieglerExperiment, +) + + +def main(): + + # Data + data = pd.DataFrame( + data=[[1, 8.3], [2, 10.3], [3, 19.0], [4, 16.0], [5, 15.6], [7, 19.8]], + columns=['hour', 'y'], + ) + + # Create an experiment list + exp_list = [] + for i in range(data.shape[0]): + exp_list.append(RooneyBieglerExperiment(data.loc[i, :])) + + # View one model + # exp0_model = exp_list[0].get_labeled_model() + # exp0_model.pprint() + + # Solver options belong here (Ipopt options shown as example) + solver_options = {"max_iter": 1000, "tol": 1e-6} + + pest = parmest.Estimator( + exp_list, obj_function="SSE", solver_options=solver_options + ) + + # Single-start estimation + obj, theta = pest.theta_est() + print("Single-start objective:", obj) + print("Single-start theta:\n", theta) + + # Multistart estimation + results_df, best_theta, best_obj = pest.theta_est_multistart( + n_restarts=10, + multistart_sampling_method="uniform_random", + seed=42, + save_results=False, # True if you want CSV via file_name= + ) + + print("\nMultistart best objective:", best_obj) + print("Multistart best theta:", best_theta) + print("\nAll multistart results:") + print(results_df) + + +if __name__ == "__main__": + main() diff --git a/pyomo/contrib/parmest/examples/rooney_biegler/rooney_biegler.py b/pyomo/contrib/parmest/examples/rooney_biegler/rooney_biegler.py index bdb494bca03..4ed4e8fc947 100644 --- a/pyomo/contrib/parmest/examples/rooney_biegler/rooney_biegler.py +++ b/pyomo/contrib/parmest/examples/rooney_biegler/rooney_biegler.py @@ -43,8 +43,8 @@ def rooney_biegler_model(data, theta=None): if theta is None: theta = {'asymptote': 15, 'rate_constant': 0.5} - model.asymptote = pyo.Var(initialize=theta['asymptote']) - model.rate_constant = pyo.Var(initialize=theta['rate_constant']) + model.asymptote = pyo.Var(initialize=theta['asymptote'], bounds=(0.1, 100)) + model.rate_constant = pyo.Var(initialize=theta['rate_constant'], bounds=(0, 10)) # Fix the unknown parameters model.asymptote.fix() diff --git a/pyomo/contrib/parmest/parmest.py b/pyomo/contrib/parmest/parmest.py index 58c2b6d0a2f..df5a459e974 100644 --- a/pyomo/contrib/parmest/parmest.py +++ b/pyomo/contrib/parmest/parmest.py @@ -904,7 +904,32 @@ def _expanded_theta_info(self, model): def _create_parmest_model(self, experiment_number): """ - Modify the Pyomo model for parameter estimation + Build a parmest-ready model for a single experiment. + + This helper retrieves the labeled experiment model, prepares objective + components needed by parmest, and converts unknown parameters to + decision variables. The returned model is the one used to populate EF + scenario blocks. + + Parameters + ---------- + experiment_number : int + Index into ``self.exp_list`` selecting which experiment model to + load. + + Returns + ------- + ConcreteModel + A model configured for parmest optimization, including: + 1. a ``Total_Cost_Objective`` (if ``self.obj_function`` is set) + 2. converted unknown-parameter variables (unfixed) + + Notes + ----- + - Existing user objectives are deactivated before parmest objective + components are attached. + - Reserved component names are checked to avoid overriding user model + components. """ model = _get_labeled_model(self.exp_list[experiment_number]) @@ -962,21 +987,40 @@ def _instance_creation_callback(self, experiment_number=None, cb_data=None): model = self._create_parmest_model(experiment_number) return model - def _create_scenario_blocks(self, bootlist=None, theta_vals=None, fix_theta=False): + def _create_scenario_blocks( + self, bootlist=None, theta_vals=None, fix_theta=False, multistart=False + ): """ + Build the block-based extensive form (EF) model for estimation. + + The EF includes: + 1. a master theta variable container (``model.parmest_theta``), + 2. one child block per selected experiment (``model.exp_scenarios``), + 3. optional theta-linking constraints between master and child blocks, + 4. a single aggregate objective over all child blocks. + + In multistart mode, this method also refreshes experiment-level cached + model state before rebuilding each scenario so per-start initializations + are applied to the model that is actually solved. + Create scenario blocks for parameter estimation. Parameters ---------- bootlist : list, optional - List of bootstrap experiment numbers to use. If None, use all experiments in exp_list. - Default is None. + Experiment indices to include. If ``None``, all experiments in + ``self.exp_list`` are used. theta_vals : dict, optional - Dictionary of theta values to set in the model. If None, use default values from experiment class. - Default is None. + Theta values to apply as initial values to parent and child theta + variables. When ``multistart=True``, these values are also pushed to + experiment ``theta_initial`` (if present) before rebuilding. fix_theta : bool, optional - If True, fix the theta values in the model. If False, leave them free. - Default is False. + If ``True``, theta variables are fixed in each scenario and no + linking constraints are created. + multistart : bool, optional + If ``True``, force experiment model refresh between starts to avoid + stale cached model reuse. + Returns ------- model : ConcreteModel @@ -984,6 +1028,12 @@ def _create_scenario_blocks(self, bootlist=None, theta_vals=None, fix_theta=Fals each experiment in exp_list or bootlist. """ model = pyo.ConcreteModel() + # if multistart: + # template_experiment = self.exp_list[0] + # if theta_vals is not None and hasattr(template_experiment, "theta_initial"): + # template_experiment.theta_initial = dict(theta_vals) + # if hasattr(template_experiment, "model"): + # template_experiment.model = None template_model = self._create_parmest_model(0) expanded_theta_names, expanded_theta_cuids = self._expanded_theta_info( @@ -1023,6 +1073,15 @@ def _create_scenario_blocks(self, bootlist=None, theta_vals=None, fix_theta=Fals bootlist if bootlist is not None else list(range(len(self.exp_list))) ) + # # Create indexed block for holding scenario models + # model.exp_scenarios = pyo.Block(range(self.obj_probability_constant)) + # for i, experiment_number in enumerate(scenario_numbers): + # if multistart: + # experiment = self.exp_list[experiment_number] + # if theta_vals is not None and hasattr(experiment, "theta_initial"): + # experiment.theta_initial = dict(theta_vals) + # if hasattr(experiment, "model"): + # experiment.model = None # get the probability constant that is applied to the objective function # parmest solves the estimation problem by applying equal probabilities to # the objective function of all the scenarios from the experiment list @@ -1095,6 +1154,188 @@ def total_obj(m): self.ef_instance = model return model + def _generate_initial_theta( + self, + seed=None, + n_restarts=None, + multistart_sampling_method=None, + user_provided_df=None, + experiment_number=0, + ): + """ + Create the canonical multistart initialization/results DataFrame. + + Output schema is: + 1. theta columns (canonical order, quote-normalized names), + 2. ``converged_`` columns, + 3. ``final objective``, ``solver termination``, ``solve_time``. + + Initial theta rows are either sampled from bounds or taken from a + user-provided DataFrame. + + Parameters + ---------- + seed : int, optional + Random seed used by stochastic samplers. + n_restarts : int, optional + Number of starts to generate for sampled methods. Ignored when + ``user_provided_df`` is provided. + multistart_sampling_method : str, optional + Sampling method. Supported values: + ``uniform_random``, ``latin_hypercube``, ``sobol_sampling``. + user_provided_df : DataFrame, optional + Explicit initialization table. Must contain exactly the theta + columns (order may vary). Values must be finite and within bounds. + experiment_number : int, optional + Experiment index used to discover canonical theta names and bounds. + + Returns + ------- + DataFrame + Canonical initialization/results table ready for multistart solve + bookkeeping. + + Raises + ------ + ValueError + For missing/invalid bounds, invalid sampling method, malformed + user-provided starts, non-finite values, or out-of-bound starts. + TypeError + For invalid input types (for example, non-DataFrame + ``user_provided_df`` or non-integer ``n_restarts`` when required). + RuntimeError + If expected theta components cannot be located on the model. + """ + parmest_model = self._create_parmest_model(experiment_number) + + raw_theta_names = self._expand_indexed_unknowns(parmest_model) + theta_names = [n.replace("'", "") for n in raw_theta_names] + if len(theta_names) != len(set(theta_names)): + raise ValueError(f"Duplicate theta names are not allowed: {theta_names}") + + theta_vars = [parmest_model.find_component(name) for name in raw_theta_names] + if any(v is None for v in theta_vars): + raise RuntimeError( + "Failed to locate one or more theta components on model." + ) + + lower_bound = np.array([v.lb for v in theta_vars], dtype=float) + upper_bound = np.array([v.ub for v in theta_vars], dtype=float) + + if np.any(np.isnan(lower_bound)) or np.any(np.isnan(upper_bound)): + raise ValueError( + "The lower and upper bounds for the theta values must be defined." + ) + if np.any(lower_bound > upper_bound): + raise ValueError( + "Each lower bound must be less than or equal to the corresponding upper bound." + ) + + if user_provided_df is not None: + if not isinstance(user_provided_df, pd.DataFrame): + raise TypeError("user_provided_df must be a pandas DataFrame.") + if user_provided_df.shape[1] != len(theta_names): + raise ValueError( + "user_provided_df must have exactly one column per theta name." + ) + clean_cols = [str(c).replace("'", "") for c in user_provided_df.columns] + if len(clean_cols) != len(set(clean_cols)): + raise ValueError("Duplicate theta columns are not allowed.") + if set(clean_cols) != set(theta_names): + raise ValueError( + f"Provided columns {clean_cols} do not match expected theta names {theta_names}." + ) + df_multistart = user_provided_df.copy() + df_multistart.columns = clean_cols + df_multistart = df_multistart.reindex(columns=theta_names) + if df_multistart.shape[0] == 0: + raise ValueError("user_provided_df must contain at least one row.") + if n_restarts is not None and n_restarts != df_multistart.shape[0]: + raise ValueError( + "n_restarts must match the number of rows in user_provided_df." + ) + theta_vals_multistart = df_multistart.to_numpy(dtype=float) + n_restarts = df_multistart.shape[0] + elif multistart_sampling_method == "uniform_random": + if not isinstance(n_restarts, int): + raise TypeError("n_restarts must be an integer.") + if n_restarts <= 0: + raise ValueError("n_restarts must be greater than zero.") + # Use a local RNG to avoid mutating global random state. + rng = np.random.default_rng(seed) + theta_vals_multistart = rng.uniform( + low=lower_bound, high=upper_bound, size=(n_restarts, len(theta_names)) + ) + + elif multistart_sampling_method == "latin_hypercube": + if not isinstance(n_restarts, int): + raise TypeError("n_restarts must be an integer.") + if n_restarts <= 0: + raise ValueError("n_restarts must be greater than zero.") + # Generate theta values using Latin hypercube sampling or Sobol sampling + # Generate theta values using Latin hypercube sampling + # Create a Latin Hypercube sampler that uses the dimensions of the theta names + sampler = scipy.stats.qmc.LatinHypercube(d=len(theta_names), seed=seed) + # Generate random samples in the range of [0, 1] for number of restarts + samples = sampler.random(n=n_restarts) + # Resulting samples should be size (n_restarts, len(theta_names)) + + elif multistart_sampling_method == "sobol_sampling": + if not isinstance(n_restarts, int): + raise TypeError("n_restarts must be an integer.") + if n_restarts <= 0: + raise ValueError("n_restarts must be greater than zero.") + sampler = scipy.stats.qmc.Sobol(d=len(theta_names), seed=seed) + # Generate theta values using Sobol sampling + # The first value of the Sobol sequence is 0, so we skip it + samples = sampler.random(n=n_restarts + 1)[1:] + + elif multistart_sampling_method == "user_provided_values": + raise ValueError( + "multistart_sampling_method='user_provided_values' requires user_provided_df." + ) + + else: + raise ValueError( + "Invalid sampling method. Choose 'uniform_random', 'latin_hypercube', 'sobol_sampling' or 'user_provided_values'." + ) + + if ( + multistart_sampling_method == "sobol_sampling" + or multistart_sampling_method == "latin_hypercube" + ): + # Scale the samples to the range of the lower and upper bounds for each theta in theta_names + # The samples are in the range [0, 1], so we scale them to the range of the lower and upper bounds + theta_vals_multistart = np.array( + [lower_bound + (upper_bound - lower_bound) * theta for theta in samples] + ) + + # Create a DataFrame where each row is an initial theta vector for a restart + if user_provided_df is None: + # Ensure theta_vals_multistart is 2D (n_restarts, len(theta_names)) + arr = np.atleast_2d(theta_vals_multistart) + if arr.shape[0] == 1 and n_restarts > 1: + arr = np.tile(arr, (n_restarts, 1)) + df_multistart = pd.DataFrame(arr, columns=theta_names) + + theta_arr = df_multistart[theta_names].to_numpy(dtype=float) + if not np.isfinite(theta_arr).all(): + raise ValueError("Initial theta values must be finite.") + if np.any(theta_arr < lower_bound) or np.any(theta_arr > upper_bound): + raise ValueError("Initial theta values must be within model bounds.") + + # Add columns for output info, initialized as nan + for name in theta_names: + df_multistart[f'converged_{name}'] = np.nan + df_multistart["final objective"] = np.nan + df_multistart["solver termination"] = "" + df_multistart["solve_time"] = np.nan + + # Debugging output + # print(df_multistart) + + return df_multistart + def _Q_opt( self, return_values=None, @@ -1102,6 +1343,7 @@ def _Q_opt( solver="ef_ipopt", theta_vals=None, fix_theta=False, + multistart=False, ): """ _Q_opt method for parameter estimation using an extensive form @@ -1136,6 +1378,10 @@ def _Q_opt( fix_theta : bool, optional If True, fix the theta values in the model. If False, leave them free. Default is False. + multistart : bool, optional + If True, run in multistart mode. Non-optimal termination is + returned instead of raising assertion failure. + Returns ------- obj_value : float or None @@ -1166,7 +1412,10 @@ def _Q_opt( """ # Create extended form model with scenario blocks model = self._create_scenario_blocks( - bootlist=bootlist, theta_vals=theta_vals, fix_theta=fix_theta + bootlist=bootlist, + theta_vals=theta_vals, + fix_theta=fix_theta, + multistart=multistart, ) expanded_theta_names = list(model._parmest_theta_names) @@ -1206,7 +1455,10 @@ def _Q_opt( obj_value = None return obj_value, theta_estimates, termination_condition - if not fix_theta: + # Separate handling of termination conditions for _Q_at_theta vs _Q_opt + # If not fixing theta, ensure optimal termination of the solve to return result + if not fix_theta and not multistart: + # Ensure optimal termination assert_optimal_termination(solve_result) model.solutions.load_from(solve_result) @@ -1221,6 +1473,12 @@ def _Q_opt( self.obj_value = obj_value self.estimated_theta = theta_estimates + # # If fixing theta, return objective value, theta estimates, and worst status + # if fix_theta or multistart: + # return obj_value, theta_estimates, worst_status + + # # Return theta estimates as a pandas Series + # theta_estimates = pd.Series(theta_estimates) # If fixing theta, return objective value, theta estimates, and solver status if fix_theta: return obj_value, theta_estimates, termination_condition @@ -1855,6 +2113,159 @@ def leaveNout_bootstrap_test( return results + def theta_est_multistart( + self, + n_restarts=20, + multistart_sampling_method="uniform_random", + user_provided_df=None, + seed=None, + save_results=False, + file_name="multistart_results.csv", + ): + """ + Run multistart parameter estimation and aggregate per-start results. + + A canonical starts/results table is created first, then each start is + solved (potentially in parallel with ``ParallelTaskManager``), and the + output table is populated with objective values, solver terminations, + solve times, and converged theta values. + + Parameters + ---------- + n_restarts : int, optional + Number of starts for sampled methods. Ignored when + ``user_provided_df`` is provided. + multistart_sampling_method : str, optional + Sampling method for generated starts. + user_provided_df : DataFrame, optional + User-provided starts. If provided, these rows define the restart + set directly. + seed : int, optional + Seed used by sampling methods. + save_results : bool, optional + If True, write the full results DataFrame to ``file_name``. + file_name : str, optional + Output CSV path used when ``save_results`` is True. + + Returns + ------- + tuple + ``(results_df, best_theta, best_obj)``, where: + - ``results_df`` contains one row per start plus converged metadata + - ``best_theta`` is the selected best feasible theta dictionary or + ``None`` if no finite objective exists + - ``best_obj`` is the selected objective value or ``np.nan`` + + Notes + ----- + Best-run selection prioritizes acceptable solver terminations + (optimal/locallyOptimal/globallyOptimal) and then minimizes objective. + If no acceptable statuses exist, finite-objective rows are considered. + """ + if self.pest_deprecated is not None: + raise RuntimeError( + "Multistart is not supported in the deprecated parmest interface." + ) + if user_provided_df is None: + if not isinstance(n_restarts, int): + raise TypeError("n_restarts must be an integer.") + if n_restarts <= 0: + raise ValueError("n_restarts must be greater than zero.") + + n_restarts_for_generation = None if user_provided_df is not None else n_restarts + results_df = self._generate_initial_theta( + seed=seed, + n_restarts=n_restarts_for_generation, + multistart_sampling_method=multistart_sampling_method, + user_provided_df=user_provided_df, + experiment_number=0, + ) + theta_names = [ + c + for c in results_df.columns + if not c.startswith("converged_") + and c not in {"final objective", "solver termination", "solve_time"} + ] + n_restarts = results_df.shape[0] + + # Convert each row to (row_index, theta_dict) + tasks = [] + for i in range(results_df.shape[0]): + Theta = {name: float(results_df.iloc[i][name]) for name in theta_names} + tasks.append((i, Theta)) + + task_mgr = utils.ParallelTaskManager(len(tasks)) + local_tasks = task_mgr.global_to_local_data(tasks) + + # Solve in parallel + local_results = [] + for i, Theta in local_tasks: + import time + + t0 = time.time() + try: + final_obj, theta_hat, worst = self._Q_opt( + theta_vals=Theta, multistart=True + ) + solve_time = time.time() - t0 + local_results.append((i, final_obj, str(worst), solve_time, theta_hat)) + except Exception as exc: + solve_time = time.time() - t0 + local_results.append( + ( + i, + np.nan, + f"exception(start={i}, sampler={multistart_sampling_method}): {exc}", + solve_time, + None, + ) + ) + + global_results = task_mgr.allgather_global_data(local_results) + + # Fill results_df + for i, final_obj, term, solve_time, theta_hat in global_results: + results_df.at[i, "final objective"] = final_obj + results_df.at[i, "solver termination"] = term + results_df.at[i, "solve_time"] = solve_time + + if theta_hat is not None: + for name in theta_names: + if name in theta_hat: + results_df.at[i, f"converged_{name}"] = float(theta_hat[name]) + + # Best solution: + # prioritize starts with acceptable solver terminations, then minimum objective. + acceptable_terms = { + str(pyo.TerminationCondition.optimal), + str(pyo.TerminationCondition.locallyOptimal), + str(pyo.TerminationCondition.globallyOptimal), + } + finite_obj_mask = np.isfinite( + results_df["final objective"].to_numpy(dtype=float) + ) + acceptable_mask = results_df["solver termination"].isin(acceptable_terms) + ranked = results_df[finite_obj_mask & acceptable_mask] + if ranked.empty: + ranked = results_df[finite_obj_mask] + + if ranked.empty: + best_theta = None + best_obj = np.nan + else: + best_idx = ranked["final objective"].astype(float).idxmin() + best_obj = float(results_df.loc[best_idx, "final objective"]) + best_theta = { + name: float(results_df.loc[best_idx, f"converged_{name}"]) + for name in theta_names + } + + if save_results: + results_df.to_csv(file_name, index=False) + + return results_df, best_theta, best_obj + + # Updated version that uses _Q_opt def _normalize_theta_dataframe(self, theta_values): """ Validate and normalize user-provided theta_values columns. diff --git a/pyomo/contrib/parmest/tests/test_parmest_multistart.py b/pyomo/contrib/parmest/tests/test_parmest_multistart.py new file mode 100644 index 00000000000..e8770e07221 --- /dev/null +++ b/pyomo/contrib/parmest/tests/test_parmest_multistart.py @@ -0,0 +1,433 @@ +# ___________________________________________________________________________ +# +# Pyomo: Python Optimization Modeling Objects +# Copyright (c) 2008-2026 National Technology and Engineering Solutions of +# Sandia, LLC Under the terms of Contract DE-NA0003525 with National +# Technology and Engineering Solutions of Sandia, LLC, the U.S. Government +# retains certain rights in this software. +# This software is distributed under the 3-clause BSD License. +# ___________________________________________________________________________ + +import math + +import pyomo.common.unittest as unittest +import pyomo.environ as pyo +from pyomo.common.dependencies import numpy as np, pandas as pd +from unittest.mock import patch + +import pyomo.contrib.parmest.parmest as parmest +from pyomo.contrib.parmest.experiment import Experiment + +ipopt_available = pyo.SolverFactory("ipopt").available() + + +class LinearThetaExperiment(Experiment): + def __init__(self, x, y): + self.x_data = x + self.y_data = y + self.model = None + + def create_model(self): + m = pyo.ConcreteModel() + m.theta = pyo.Var(initialize=0.0, bounds=(-10.0, 10.0)) + m.x = pyo.Param(initialize=float(self.x_data), mutable=False) + m.y = pyo.Var(initialize=float(self.y_data)) + m.eq = pyo.Constraint(expr=m.y == m.theta + m.x) + self.model = m + + def label_model(self): + m = self.model + m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.experiment_outputs.update([(m.y, float(self.y_data))]) + m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.unknown_parameters.update([(m.theta, pyo.ComponentUID(m.theta))]) + m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.measurement_error.update([(m.y, None)]) + + def get_labeled_model(self): + self.create_model() + self.label_model() + return self.model + + +class IndexedThetaExperiment(Experiment): + def __init__(self): + self.model = None + + def create_model(self): + m = pyo.ConcreteModel() + m.I = pyo.Set(initialize=["a", "b"]) + m.theta = pyo.Var(m.I, initialize={"a": 1.0, "b": 2.0}) + m.theta["a"].setlb(0.0) + m.theta["a"].setub(5.0) + m.theta["b"].setlb(0.0) + m.theta["b"].setub(5.0) + m.theta["a"].fix() + m.theta["b"].fix() + m.y = pyo.Var(initialize=3.0) + m.eq = pyo.Constraint(expr=m.y == m.theta["a"] + m.theta["b"]) + self.model = m + + def label_model(self): + m = self.model + m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.experiment_outputs.update([(m.y, 3.0)]) + m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.unknown_parameters.update([(m.theta, pyo.ComponentUID(m.theta))]) + m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.measurement_error.update([(m.y, None)]) + + def get_labeled_model(self): + self.create_model() + self.label_model() + return self.model + + +class NoBoundsExperiment(Experiment): + def __init__(self): + self.model = None + + def create_model(self): + m = pyo.ConcreteModel() + m.theta = pyo.Var(initialize=1.0) + m.y = pyo.Var(initialize=2.0) + m.eq = pyo.Constraint(expr=m.y == m.theta + 1.0) + self.model = m + + def label_model(self): + m = self.model + m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.experiment_outputs.update([(m.y, 2.0)]) + m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.unknown_parameters.update([(m.theta, pyo.ComponentUID(m.theta))]) + m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.measurement_error.update([(m.y, None)]) + + def get_labeled_model(self): + self.create_model() + self.label_model() + return self.model + + +class StartCoupledExperiment(Experiment): + """ + Model intentionally couples a fixed term ("bias") to theta_initial at + build time. This exposes stale-model bugs in multistart paths. + """ + + def __init__(self, theta_initial=None): + self.theta_initial = ( + theta_initial if theta_initial is not None else {"theta": 0.0} + ) + self.model = None + + def create_model(self): + m = pyo.ConcreteModel() + m.theta = pyo.Var( + initialize=float(self.theta_initial["theta"]), bounds=(-10.0, 10.0) + ) + m.bias = pyo.Param(initialize=float(self.theta_initial["theta"]), mutable=False) + m.y = pyo.Var(initialize=0.0) + m.eq = pyo.Constraint(expr=m.y == m.theta + m.bias) + self.model = m + + def label_model(self): + m = self.model + m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.experiment_outputs.update([(m.y, 0.0)]) + m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.unknown_parameters.update([(m.theta, pyo.ComponentUID(m.theta))]) + m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.measurement_error.update([(m.y, None)]) + + def get_labeled_model(self): + if self.model is None: + self.create_model() + self.label_model() + return self.model + + +def _build_linear_estimator(): + exp_list = [LinearThetaExperiment(1.0, 2.0), LinearThetaExperiment(2.0, 3.0)] + return parmest.Estimator(exp_list, obj_function="SSE") + + +@unittest.skipIf( + not parmest.parmest_available, + "Cannot test parmest: required dependencies are missing", +) +class TestParmestMultistart(unittest.TestCase): + @unittest.skipIf(not ipopt_available, "The 'ipopt' solver is not available") + def test_multistart_baseline_equivalence_n1(self): + pest = _build_linear_estimator() + obj1, theta1 = pest.theta_est() + _, best_theta, best_obj = pest.theta_est_multistart( + n_restarts=1, multistart_sampling_method="uniform_random", seed=7 + ) + self.assertAlmostEqual(obj1, best_obj, places=7) + self.assertAlmostEqual(theta1["theta"], best_theta["theta"], places=7) + + def test_uniform_sampling_is_deterministic_with_seed(self): + pest = _build_linear_estimator() + df1 = pest._generate_initial_theta( + seed=4, n_restarts=5, multistart_sampling_method="uniform_random" + ) + df2 = pest._generate_initial_theta( + seed=4, n_restarts=5, multistart_sampling_method="uniform_random" + ) + self.assertTrue(df1[["theta"]].equals(df2[["theta"]])) + + def test_uniform_sampling_changes_with_different_seed(self): + pest = _build_linear_estimator() + df1 = pest._generate_initial_theta( + seed=4, n_restarts=5, multistart_sampling_method="uniform_random" + ) + df2 = pest._generate_initial_theta( + seed=5, n_restarts=5, multistart_sampling_method="uniform_random" + ) + self.assertFalse(df1[["theta"]].equals(df2[["theta"]])) + + def test_latin_hypercube_sampling_is_deterministic(self): + pest = _build_linear_estimator() + df1 = pest._generate_initial_theta( + seed=11, n_restarts=4, multistart_sampling_method="latin_hypercube" + ) + df2 = pest._generate_initial_theta( + seed=11, n_restarts=4, multistart_sampling_method="latin_hypercube" + ) + self.assertTrue(df1[["theta"]].equals(df2[["theta"]])) + + def test_sobol_sampling_is_deterministic(self): + pest = _build_linear_estimator() + df1 = pest._generate_initial_theta( + seed=12, n_restarts=4, multistart_sampling_method="sobol_sampling" + ) + df2 = pest._generate_initial_theta( + seed=12, n_restarts=4, multistart_sampling_method="sobol_sampling" + ) + self.assertTrue(df1[["theta"]].equals(df2[["theta"]])) + + def test_generated_starts_are_within_bounds(self): + pest = _build_linear_estimator() + for method in ("uniform_random", "latin_hypercube", "sobol_sampling"): + df = pest._generate_initial_theta( + seed=1, n_restarts=8, multistart_sampling_method=method + ) + self.assertTrue(((df["theta"] >= -10.0) & (df["theta"] <= 10.0)).all()) + + def test_missing_bounds_raise_error(self): + pest = parmest.Estimator([NoBoundsExperiment()], obj_function="SSE") + with self.assertRaisesRegex( + ValueError, "lower and upper bounds for the theta values must be defined" + ): + pest._generate_initial_theta( + seed=1, n_restarts=2, multistart_sampling_method="uniform_random" + ) + + def test_invalid_bounds_raise_error(self): + class InvalidBoundsExperiment(Experiment): + def __init__(self): + self.model = None + + def create_model(self): + m = pyo.ConcreteModel() + m.theta = pyo.Var(initialize=1.0) + m.theta.setlb(2.0) + m.theta.setub(1.0) + m.y = pyo.Var(initialize=2.0) + m.eq = pyo.Constraint(expr=m.y == m.theta + 1.0) + self.model = m + + def label_model(self): + m = self.model + m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.experiment_outputs.update([(m.y, 2.0)]) + m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.unknown_parameters.update([(m.theta, pyo.ComponentUID(m.theta))]) + m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.measurement_error.update([(m.y, None)]) + + def get_labeled_model(self): + self.create_model() + self.label_model() + return self.model + + pest = parmest.Estimator([InvalidBoundsExperiment()], obj_function="SSE") + with self.assertRaisesRegex(ValueError, "lower bound must be less than"): + pest._generate_initial_theta( + seed=1, n_restarts=2, multistart_sampling_method="uniform_random" + ) + + def test_user_provided_values_dimension_mismatch_raises(self): + pest = _build_linear_estimator() + user_df = pd.DataFrame([[1.0, 2.0]], columns=["theta", "extra"]) + with self.assertRaisesRegex(ValueError, "exactly one column per theta name"): + pest.theta_est_multistart( + n_restarts=1, + multistart_sampling_method="user_provided_values", + user_provided_df=user_df, + ) + + def test_user_provided_values_column_order_maps_by_name(self): + pest = parmest.Estimator([IndexedThetaExperiment()], obj_function="SSE") + user_df = pd.DataFrame( + [[0.3, 4.2], [0.4, 4.1]], columns=["theta[b]", "theta[a]"] + ) + results_df, _, _ = pest.theta_est_multistart( + n_restarts=2, + multistart_sampling_method="user_provided_values", + user_provided_df=user_df, + ) + self.assertAlmostEqual(results_df.loc[0, "theta[a]"], 4.2, places=12) + self.assertAlmostEqual(results_df.loc[0, "theta[b]"], 0.3, places=12) + + @unittest.skipIf(not ipopt_available, "The 'ipopt' solver is not available") + def test_state_isolation_between_starts(self): + pest = _build_linear_estimator() + init = pd.DataFrame([[-9.0], [9.0]], columns=["theta"]) + results_df, _, _ = pest.theta_est_multistart( + user_provided_df=init, save_results=False + ) + # Initial starts should remain exactly as supplied. + self.assertAlmostEqual(results_df.loc[0, "theta"], -9.0, places=12) + self.assertAlmostEqual(results_df.loc[1, "theta"], 9.0, places=12) + # Both runs converge to the same optimum, showing no cross-start contamination. + self.assertAlmostEqual( + results_df.loc[0, "converged_theta"], + results_df.loc[1, "converged_theta"], + places=8, + ) + + def test_one_start_failure_returns_best_feasible(self): + pest = _build_linear_estimator() + theta_values = pd.DataFrame([[-1.0], [2.0]], columns=["theta"]) + + def fake_q_opt(*args, **kwargs): + theta = kwargs["theta_vals"]["theta"] + if theta < 0: + raise RuntimeError("boom") + return 1.25, {"theta": 1.0}, pyo.TerminationCondition.optimal + + with patch.object(pest, "_Q_opt", side_effect=fake_q_opt): + results_df, best_theta, best_obj = pest.theta_est_multistart( + user_provided_df=theta_values, save_results=False + ) + + self.assertTrue( + str(results_df.loc[0, "solver termination"]).startswith("exception(start=0") + ) + self.assertAlmostEqual(best_obj, 1.25, places=12) + self.assertAlmostEqual(best_theta["theta"], 1.0, places=12) + + def test_all_starts_fail_returns_diagnostics(self): + pest = _build_linear_estimator() + theta_values = pd.DataFrame([[1.0], [2.0]], columns=["theta"]) + + def fake_q_opt(*args, **kwargs): + raise RuntimeError("all failed") + + with patch.object(pest, "_Q_opt", side_effect=fake_q_opt): + results_df, best_theta, best_obj = pest.theta_est_multistart( + user_provided_df=theta_values, save_results=False + ) + + self.assertIsNone(best_theta) + self.assertTrue(math.isnan(best_obj)) + self.assertTrue( + results_df["solver termination"] + .astype(str) + .str.contains("exception\\(start=", regex=True) + .all() + ) + + def test_best_selection_filters_nonoptimal_status(self): + pest = _build_linear_estimator() + theta_values = pd.DataFrame([[1.0], [2.0]], columns=["theta"]) + + def fake_q_opt(*args, **kwargs): + theta = kwargs["theta_vals"]["theta"] + if theta < 1.5: + return 0.1, {"theta": 0.1}, pyo.TerminationCondition.maxIterations + return 0.2, {"theta": 0.2}, pyo.TerminationCondition.optimal + + with patch.object(pest, "_Q_opt", side_effect=fake_q_opt): + _, best_theta, best_obj = pest.theta_est_multistart( + user_provided_df=theta_values, save_results=False + ) + + self.assertAlmostEqual(best_obj, 0.2, places=12) + self.assertAlmostEqual(best_theta["theta"], 0.2, places=12) + + def test_tie_breaking_is_deterministic_first_index(self): + pest = _build_linear_estimator() + theta_values = pd.DataFrame([[5.0], [6.0], [7.0]], columns=["theta"]) + + def fake_q_opt(*args, **kwargs): + theta = kwargs["theta_vals"]["theta"] + return 1.0, {"theta": theta}, pyo.TerminationCondition.optimal + + with patch.object(pest, "_Q_opt", side_effect=fake_q_opt): + _, best_theta, best_obj = pest.theta_est_multistart( + user_provided_df=theta_values, save_results=False + ) + + self.assertAlmostEqual(best_obj, 1.0, places=12) + self.assertAlmostEqual(best_theta["theta"], 5.0, places=12) + + def test_indexed_unknown_parameters_supported_in_sampling(self): + pest = parmest.Estimator([IndexedThetaExperiment()], obj_function="SSE") + df = pest._generate_initial_theta( + seed=10, n_restarts=3, multistart_sampling_method="uniform_random" + ) + self.assertTrue({"theta[a]", "theta[b]"}.issubset(set(df.columns))) + + def test_count_total_experiments_uses_one_output_family(self): + class MultiOutputExperiment(Experiment): + def create_model(self): + m = pyo.ConcreteModel() + m.theta = pyo.Var(initialize=0.0, bounds=(-10, 10)) + m.y = pyo.Var(initialize=1.0) + m.z = pyo.Var(initialize=2.0) + m.c1 = pyo.Constraint(expr=m.y == m.theta + 1.0) + m.c2 = pyo.Constraint(expr=m.z == 2.0 * m.theta + 2.0) + self.model = m + + def label_model(self): + m = self.model + m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.experiment_outputs.update([(m.y, 1.0), (m.z, 2.0)]) + m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.unknown_parameters.update([(m.theta, pyo.ComponentUID(m.theta))]) + m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.measurement_error.update([(m.y, None), (m.z, None)]) + + def get_labeled_model(self): + self.create_model() + self.label_model() + return self.model + + total_points = parmest._count_total_experiments( + [MultiOutputExperiment(), MultiOutputExperiment()] + ) + self.assertEqual(total_points, 2) + + @unittest.skipIf(not ipopt_available, "The 'ipopt' solver is not available") + def test_multistart_results_reproducible_when_rerun_from_recorded_init(self): + pest = parmest.Estimator([StartCoupledExperiment()], obj_function="SSE") + init_df = pd.DataFrame([[-2.0], [1.5], [3.0]], columns=["theta"]) + results_df, _, _ = pest.theta_est_multistart( + user_provided_df=init_df, save_results=False + ) + + for _, row in results_df.iterrows(): + theta_init = {"theta": float(row["theta"])} + exp = StartCoupledExperiment(theta_initial=theta_init) + rerun = parmest.Estimator([exp], obj_function="SSE") + obj, theta = rerun.theta_est() + + self.assertTrue( + np.isclose(obj, row["final objective"], rtol=1e-6, atol=1e-8) + ) + self.assertTrue( + np.isclose(theta["theta"], row["converged_theta"], rtol=1e-6, atol=1e-8) + )