diff --git a/mpisppy/cylinders/fwph_cylinder.py b/mpisppy/cylinders/fwph_cylinder.py new file mode 100644 index 000000000..c155ff9dc --- /dev/null +++ b/mpisppy/cylinders/fwph_cylinder.py @@ -0,0 +1,69 @@ +############################################################################### +# mpi-sppy: MPI-based Stochastic Programming in PYthon +# +# Copyright (c) 2024, Lawrence Livermore National Security, LLC, Alliance for +# Sustainable Energy, LLC, The Regents of the University of California, et al. +# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for +# full copyright and license information. +############################################################################### +from mpisppy.cylinders.spwindow import Field +from mpisppy.cylinders.spcommunicator import SPCommunicator, RecvCircularBuffer + +class FWPH_Cylinder(SPCommunicator): + + send_fields = () + receive_fields = (Field.BEST_XHAT, Field.RECENT_XHATS, ) + + def register_receive_fields(self): + super().register_receive_fields() + self._recent_xhat_recv_circular_buffers = [ + (idx, cls, RecvCircularBuffer( + recv_buf, + self._field_lengths[Field.BEST_XHAT], + self._field_lengths[Field.RECENT_XHATS] // self._field_lengths[Field.BEST_XHAT], + ) + ) + for (idx, cls, recv_buf) in self.receive_field_spcomms[Field.RECENT_XHATS] + ] + + def add_cylinder_columns(self): + self.add_best_xhat_column() + self.add_recent_xhat_columns() + # self.add_fwph_sdm_column() + + def add_best_xhat_column(self): + for idx, cls, recv_buf in self.receive_field_spcomms[Field.BEST_XHAT]: + is_new = self.get_receive_buffer(recv_buf, Field.BEST_XHAT, idx) + if is_new: + self._add_QP_columns_from_buf(recv_buf.value_array()) + + def add_recent_xhat_columns(self): + for idx, cls, recv_buf_circular in self._recent_xhat_recv_circular_buffers: + is_new = self.get_receive_buffer(recv_buf_circular.data, Field.RECENT_XHATS, idx) + if is_new: + for value_array in recv_buf_circular.most_recent_value_arrays(): + self._add_QP_columns_from_buf(value_array) + + def _add_QP_columns_from_buf(self, xhat_array): + self.opt._save_nonants() + inner_bound_cache = self._cache_inner_bounds() + # NOTE: this does not work with "loose" bundles + # print(f"{self.cylinder_rank=} got {xhat_array=}") + ci = 0 + for k,s in self.opt.local_scenarios.items(): + for ndn_var in s._mpisppy_data.nonant_indices.values(): + ndn_var._value = xhat_array[ci] + ci += 1 + s._mpisppy_data.inner_bound = xhat_array[ci] + ci += 1 + self.opt._add_QP_column(k, disable_W=True) + + self._restore_inner_bounds(inner_bound_cache) + self.opt._restore_nonants(update_persistent=False) + + def _cache_inner_bounds(self): + return {s : s._mpisppy_data.inner_bound for s in self.opt.local_scenarios.values()} + + def _restore_inner_bounds(self, inner_bound_cache): + for s in self.opt.local_scenarios.values(): + s._mpisppy_data.inner_bound = inner_bound_cache[s] diff --git a/mpisppy/cylinders/fwph_spoke.py b/mpisppy/cylinders/fwph_spoke.py index e81529b8b..630612646 100644 --- a/mpisppy/cylinders/fwph_spoke.py +++ b/mpisppy/cylinders/fwph_spoke.py @@ -6,9 +6,13 @@ # All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for # full copyright and license information. ############################################################################### -import mpisppy.cylinders.spoke +from mpisppy.cylinders.spoke import OuterBoundSpoke +from mpisppy.cylinders.fwph_cylinder import FWPH_Cylinder -class FrankWolfeOuterBound(mpisppy.cylinders.spoke.OuterBoundSpoke): +class FrankWolfeOuterBound(OuterBoundSpoke, FWPH_Cylinder): + + send_fields = (*OuterBoundSpoke.send_fields, *FWPH_Cylinder.send_fields) + receive_fields = (*OuterBoundSpoke.receive_fields, *FWPH_Cylinder.receive_fields) converger_spoke_char = 'F' diff --git a/mpisppy/cylinders/hub.py b/mpisppy/cylinders/hub.py index f2713e48d..449cced5c 100644 --- a/mpisppy/cylinders/hub.py +++ b/mpisppy/cylinders/hub.py @@ -16,6 +16,7 @@ from mpisppy import global_toc from mpisppy.cylinders.spwindow import Field +from mpisppy.cylinders.fwph_cylinder import FWPH_Cylinder # Could also pass, e.g., sys.stdout instead of a filename mpisppy.log.setup_logger(__name__, @@ -255,7 +256,7 @@ def sync_nonants(self): def sync_Ws(self): self.send_ws() - def is_converged(self): + def is_converged(self, screen_trace=True): if self.opt.best_bound_obj_val is not None: self.BestOuterBound = self.OuterBoundUpdate(self.opt.best_bound_obj_val) if self.opt.best_solution_obj_val is not None: @@ -269,7 +270,7 @@ def is_converged(self): ) ## you still want to output status, even without inner bounders configured - if self.global_rank == 0: + if self.global_rank == 0 and screen_trace: self.screen_trace() return False @@ -281,7 +282,7 @@ def is_converged(self): "will be made on the Best Bound") ## log some output - if self.global_rank == 0: + if self.global_rank == 0 and screen_trace: self.screen_trace() return self.determine_termination() @@ -440,7 +441,10 @@ def finalize(self): Eobj = self.opt.post_loops() return Eobj -class FWPHHub(PHHub): +class FWPHHub(PHHub, FWPH_Cylinder): + + send_fields = (*PHHub.send_fields, *FWPH_Cylinder.send_fields) + receive_fields = (*PHHub.receive_fields, *FWPH_Cylinder.receive_fields) _hub_algo_best_bound_provider = True diff --git a/mpisppy/cylinders/spoke.py b/mpisppy/cylinders/spoke.py index 5a83b0b0f..98d68d2d7 100644 --- a/mpisppy/cylinders/spoke.py +++ b/mpisppy/cylinders/spoke.py @@ -29,7 +29,7 @@ def got_kill_signal(self): self.get_receive_buffer(shutdown_buf, Field.SHUTDOWN, 0, synchronize=False) return self.allreduce_or(shutdown_buf[0] == 1.0) - def is_converged(self): + def is_converged(self, screen_trace=False): """ Alias for got_kill_signal; useful for algorithms working as both hub and spoke """ diff --git a/mpisppy/opt/fwph.py b/mpisppy/opt/fwph.py index 4a4372927..2468977e2 100644 --- a/mpisppy/opt/fwph.py +++ b/mpisppy/opt/fwph.py @@ -23,6 +23,7 @@ import time import re # For manipulating scenario names import random +import math from mpisppy import MPI from mpisppy import global_toc @@ -32,6 +33,7 @@ from pyomo.core.expr.numeric_expr import LinearExpression from mpisppy.cylinders.xhatshufflelooper_bounder import ScenarioCycler +from mpisppy.cylinders.spwindow import Field from mpisppy.extensions.xhatbase import XhatBase class FWPH(mpisppy.phbase.PHBase): @@ -75,16 +77,16 @@ def _init(self, FW_options): if ('FW_verbose' in self.FW_options): self.vb = self.FW_options['FW_verbose'] - def fw_prep(self): + def fwph_main(self, finalize=True): self.PH_Prep(attach_duals=True, attach_prox=False) self._output_header() self._attach_MIP_vars() self._cache_nonant_var_swap_mip() trivial_bound = self.Iter0() - secs = time.time() - self.t0 + secs = time.perf_counter() - self.start_time self._output(trivial_bound, trivial_bound, np.nan, secs) - best_bound = trivial_bound + self._fwph_best_bound = trivial_bound # Lines 2 and 3 of Algorithm 3 in Boland # Now done a the beginning of the first iteration @@ -104,152 +106,275 @@ def fw_prep(self): self._cache_nonant_var_swap_qp() self._setup_shared_column_generation() - number_initial_column_tries = self.options.get("FW_initialization_attempts", 10) - if self.FW_options["FW_iter_limit"] == 1 and number_initial_column_tries < 1: - global_toc(f"{self.__class__.__name__}: Warning: FWPH needs an initial shared column if FW_iter_limit == 1. Increasing FW_iter_limit to 2 to ensure convergence", self.cylinder_rank == 0) - self.FW_options["FW_iter_limit"] = 2 - if self.FW_options["FW_iter_limit"] == 1 or number_initial_column_tries > 0: - number_points = self._generate_shared_column(number_initial_column_tries) - if number_points == 0 and self.FW_options["FW_iter_limit"] == 1: - global_toc(f"{self.__class__.__name__}: Warning: FWPH failed to find an initial feasible solution. Increasing FW_iter_limit to 2 to ensure convergence", self.cylinder_rank == 0) - self.FW_options["FW_iter_limit"] = 2 + self._generate_initial_columns_if_needed() self._reenable_W() if (self.ph_converger): - self.convobject = self.ph_converger(self, self.cylinder_rank, self.n_proc) + self.convobject = self.ph_converger(self) + + if self.options.get("FW_LP_start_iterations", 0) > 0: + global_toc("Starting LP PH...") + lp_iterations = self.options["FW_LP_start_iterations"] + total_iterations = self.options["PHIterLimit"] + self.options["PHIterLimit"] = lp_iterations + integer_relaxer = pyo.TransformationFactory('core.relax_integer_vars') + for s in self.local_subproblems.values(): + integer_relaxer.apply_to(s) + if sputils.is_persistent(s._solver_plugin): + for v,_ in s._relaxed_integer_vars[None].values(): + s._solver_plugin.update_var(v) + self.attach_PH_to_objective(add_duals=False, add_prox=True) + self._reenable_prox() + super().iterk_loop() + self._disable_prox() + for s in self.local_subproblems.values(): + for v, d in s._relaxed_integer_vars[None].values(): + v.domain = d + if sputils.is_persistent(s._solver_plugin): + s._solver_plugin.update_var(v) + # s._solver_plugin.update_var(v) + s.del_component("_relaxed_integer_vars") + self.options["PHIterLimit"] = total_iterations + self._PHIter -= 1 + + global_toc("Finished LP PH; Starting FW PH crossover") + teeme = ( + self.options.get("tee-rank0-solves", False) + and self.cylinder_rank == 0 + ) + # teeme = True + self.fwph_solve_loop( + mip_solver_options=self.current_solver_options, + dtiming=self.options["display_timing"], + tee=teeme, + verbose=self.options["verbose"], + # sdm_iter_limit=20, + # FW_conv_thresh=1e-8, + ) + global_toc("Starting FW PH") + + else: + # FWPH can take some time to initialize + # If run as a spoke, check for convergence here + if self.spcomm and self.spcomm.is_converged(): + if finalize: + return 0, None, None + return 0 - return best_bound + self.iterk_loop() - def fwph_main(self, finalize=True): - self.t0 = time.time() - best_bound = self.fw_prep() + if finalize: + weight_dict = self._gather_weight_dict() # None if rank != 0 + xbars_dict = self._get_xbars() # None if rank != 0 + return self._PHIter, weight_dict, xbars_dict + return self._PHIter - # FWPH takes some time to initialize - # If run as a spoke, check for convergence here - if self.spcomm and self.spcomm.is_converged(): - return None, None, None + def iterk_loop(self): + + verbose = self.options["verbose"] + dprogress = self.options["display_progress"] + dtiming = self.options["display_timing"] + dconvergence_detail = self.options["display_convergence_detail"] + teeme = ( + "tee-rank0-solves" in self.options + and self.options["tee-rank0-solves"] + and self.cylinder_rank == 0 + ) + # teeme = True + + self.conv = None + + max_iterations = int(self.options["PHIterLimit"]) # The body of the algorithm - for self._PHIter in range(1, self.options['PHIterLimit']+1): + while (self._PHIter < max_iterations): + iteration_start_time = time.perf_counter() + if dprogress: + global_toc(f"Initiating FWPH Major Iteration {self._PHIter+1}\n", self.cylinder_rank == 0) - # tbphloop = time.time() + # tbphloop = time.perf_counter() # TODO: should implement our own Xbar / W computation # which just considers the QP subproblems self._swap_nonant_vars() - self.Compute_Xbar(self.options['verbose']) - self.Update_W(self.options['verbose']) + self.Compute_Xbar(verbose) + self.Update_W(verbose) self._swap_nonant_vars_back() - if (self.extensions): - self.extobject.miditer() - if hasattr(self.spcomm, "sync_Ws"): self.spcomm.sync_Ws() - if (self._is_timed_out()): - global_toc("FWPH Timed Out", self.cylinder_rank == 0) - break + + self.conv = self.fwph_convergence_diff() + + if (self.extensions): + self.extobject.miditer() if (self.ph_converger): - diff = self.convobject.convergence_value() + self._swap_nonant_vars() if (self.convobject.is_converged()): - secs = time.time() - self.t0 - self._output(self._local_bound, best_bound, diff, secs) + secs = time.perf_counter() - self.start_time + self._output(self._local_bound, self._fwph_best_bound, self.conv, secs) global_toc('FWPH converged to user-specified criteria', self.cylinder_rank == 0) + self._swap_nonant_vars_back() break - else: # Convergence check from Boland - diff = self._conv_diff() - if (diff < self.options['convthresh']): - secs = time.time() - self.t0 - self._output(self._local_bound, best_bound, diff, secs) - global_toc(f'FWPH converged based on standard criteria, convergence diff: {diff}', - self.cylinder_rank == 0, + self._swap_nonant_vars_back() + if self.conv is not None: # Convergence check from Boland + if (self.conv < self.options['convthresh']): + secs = time.perf_counter() - self.start_time + self._output(self._local_bound, self._fwph_best_bound, self.conv, secs) + global_toc( + "FWPH convergence metric=%f dropped below user-supplied threshold=%f" % (self.conv, self.options["convthresh"]), + self.cylinder_rank == 0, ) break - self._swap_nonant_vars() - self._local_bound = 0 - # tbsdm = time.time() - for name in self.local_subproblems: - dual_bound = self.SDM(name) - if dual_bound is None: - dual_bound = np.nan - self._local_bound += self.local_subproblems[name]._mpisppy_probability * \ - dual_bound - # tsdm = time.time() - tbsdm - # print(f"PH iter {self._PHIter}, total SDM time: {tsdm}") - self._compute_dual_bound() - if (self.is_minimizing): - best_bound = np.fmax(best_bound, self._local_bound) - else: - best_bound = np.fmin(best_bound, self._local_bound) - if self._can_update_best_bound(): - self.best_bound_obj_val = best_bound - self._swap_nonant_vars_back() + if (self._is_timed_out()): + global_toc(f"Time limit {self.options['time_limit']} seconds reached.", self.cylinder_rank == 0) + break + + self.fwph_solve_loop( + mip_solver_options=self.current_solver_options, + dtiming=dtiming, + tee=teeme, + verbose=verbose + ) if (self.extensions): self.extobject.enditer() - secs = time.time() - self.t0 - self._output(self._local_bound, best_bound, diff, secs) - - - # add a shared column - shared_columns = self.options.get("FWPH_shared_columns_per_iteration", 0) - if shared_columns > 0: - self.mpicomm.Barrier() - self._disable_W() - for s in self.local_subproblems.values(): - if sputils.is_persistent(s._solver_plugin): - active_objective_datas = list(s.component_data_objects( - pyo.Objective, active=True, descend_into=True)) - s._solver_plugin.set_objective(active_objective_datas[0]) - self._generate_shared_column(shared_columns) - self._reenable_W() + secs = time.perf_counter() - self.start_time + self._output(self._local_bound, self._fwph_best_bound, self.conv, secs) ## Hubs/spokes take precedence over convergers - if hasattr(self.spcomm, "sync_nonants"): - self.spcomm.sync_nonants() - self.spcomm.sync_bounds() - self.spcomm.sync_extensions() - elif hasattr(self.spcomm, "sync"): - self.spcomm.sync() - if self.spcomm and self.spcomm.is_converged(): - secs = time.time() - self.t0 - self._output(self._local_bound, best_bound, np.nan, secs) - global_toc('FWPH stopped due to cylinder convergence') + if self.spcomm and self.spcomm.is_converged(screen_trace=False): + secs = time.perf_counter() - self.start_time + self._output(self._local_bound, self._fwph_best_bound, np.nan, secs) + global_toc("Cylinder convergence", self.cylinder_rank == 0) break - if (self.extensions): + if (self.extensions): self.extobject.enditer_after_sync() - # tphloop = time.time() - tbphloop + + if dprogress and self.cylinder_rank == 0: + print("") + print("After FWPH Iteration",self._PHIter) + print("FWPH Convergence Metric=",self.conv) + print("Iteration time: %6.2f" % (time.perf_counter() - iteration_start_time)) + print("Elapsed time: %6.2f" % (time.perf_counter() - self.start_time)) + + if dconvergence_detail: + self.report_var_values_at_rank0(header="Convergence detail:", fixed_vars=False) + + # tphloop = time.perf_counter() - tbphloop # print(f"PH iter {self._PHIter}, total time: {tphloop}") + else: # no break, (self._PHIter == max_iterations) + # NOTE: If we return for any other reason things are reasonably in-sync. + # due to the convergence check. However, here we return we'll be + # out-of-sync because of the solve_loop could take vasty different + # times on different threads. This can especially mess up finalization. + # As a guard, we'll put a barrier here. + self.mpicomm.Barrier() + global_toc("Reached user-specified limit=%d on number of FWPH iterations" % max_iterations, self.cylinder_rank == 0) + + def fwph_solve_loop( + self, + mip_solver_options=None, + dtiming=False, + tee=False, + verbose=False, + sdm_iter_limit=None, + FW_conv_thresh=None, + ): + if sdm_iter_limit is None: + sdm_iter_limit = self.FW_options["FW_iter_limit"] + if FW_conv_thresh is None: + FW_conv_thresh = self.FW_options["FW_conv_thresh"] + max_iterations = int(self.options["PHIterLimit"]) + # print(f"{sdm_iter_limit=}") + self._swap_nonant_vars() + self._local_bound = 0 + # tbsdm = time.perf_counter() + _sdm_generators = {} + stop = False + for name in self.local_subproblems: + _sdm_generators[name] = self.SDM(name, mip_solver_options, dtiming, tee, verbose, sdm_iter_limit, FW_conv_thresh) + try: + dual_bound = next(_sdm_generators[name]) + except StopIteration as e: + dual_bound = e.value + stop = True + self._local_bound += self.local_subproblems[name]._mpisppy_probability * \ + dual_bound + self._update_dual_bounds() + self._PHIter += 1 + if self._PHIter == max_iterations: + stop = True + if self._sync_after_mip_solve(): + stop = True + stop = self.allreduce_or(stop) + while not stop: + stop = False + for col_generator in _sdm_generators.values(): + try: + next(col_generator) + except StopIteration: + stop = True + self._PHIter += 1 + if self._PHIter == max_iterations: + stop = True + if self._sync_after_mip_solve(): + stop = True + stop = self.allreduce_or(stop) + # tsdm = time.perf_counter() - tbsdm + # print(f"PH iter {self._PHIter}, total SDM time: {tsdm}") - if finalize: - weight_dict = self._gather_weight_dict() # None if rank != 0 - xbars_dict = self._get_xbars() # None if rank != 0 - return self._PHIter, weight_dict, xbars_dict - return self._PHIter + # Re-set the mip._mpisppy_model.W so that the QP objective + # is correct in the next major iteration + for model_name, mip in self.local_subproblems.items(): + qp = self.local_QP_subproblems[model_name] + mip_source = mip.scen_list if self.bundling else [model_name] + for scenario_name in mip_source: + scen_mip = self.local_scenarios[scenario_name] + for (node_name, ix) in scen_mip._mpisppy_data.nonant_indices: + scen_mip._mpisppy_model.W[node_name, ix]._value = \ + qp._mpisppy_model.W[node_name, ix]._value + + self._swap_nonant_vars_back() + + def _sync_after_mip_solve(self): + # add columns from cylinder(s) + self._swap_nonant_vars_back() + # spoke or hub (FWPH_Cylinder) + if hasattr(self.spcomm, "add_cylinder_columns"): + self.spcomm.add_cylinder_columns() + # hub + if hasattr(self.spcomm, "sync_bounds"): + self.spcomm.sync_nonants() + self.spcomm.sync_bounds() + self.spcomm.sync_extensions() + # spoke + elif hasattr(self.spcomm, "sync"): + self.spcomm.sync() + self._swap_nonant_vars() + if self.spcomm and self.spcomm.is_converged(): + return True + return False - def SDM(self, model_name): + def SDM(self, model_name, mip_solver_options, dtiming, tee, verbose, sdm_iter_limit, FW_conv_thresh): ''' Algorithm 2 in Boland et al. (with small tweaks) ''' mip = self.local_subproblems[model_name] qp = self.local_QP_subproblems[model_name] - verbose = self.options["verbose"] - dtiming = True # self.options["display_timing"] - teeme = ("tee-rank0-solves" in self.options - and self.options['tee-rank0-solves'] - and self.cylinder_rank == 0 - ) # Set the QP dual weights to the correct values If we are bundling, we # initialize the QP dual weights to be the dual weights associated with # the first scenario in the bundle (this should be okay, because each # scenario in the bundle has the same dual weights, analytically--maybe # a numerical problem). - arb_scen_mip = self.local_scenarios[mip.scen_list[0]] \ - if self.bundling else mip + mip_source = mip.scen_list if self.bundling else [model_name] + + arb_scen_mip = self.local_scenarios[mip_source[0]] + for (node_name, ix) in arb_scen_mip._mpisppy_data.nonant_indices: qp._mpisppy_model.W[node_name, ix]._value = \ arb_scen_mip._mpisppy_model.W[node_name, ix].value @@ -262,10 +387,8 @@ def SDM(self, model_name): for ndn_i, xvar in arb_scen_mip._mpisppy_data.nonant_indices.items() } - mip_source = mip.scen_list if self.bundling else [model_name] - - for itr in range(self.FW_options['FW_iter_limit']): - # loop_start = time.time() + for itr in range(sdm_iter_limit): + # loop_start = time.perf_counter() # Algorithm 2 line 4 for scenario_name in mip_source: scen_mip = self.local_scenarios[scenario_name] @@ -276,66 +399,50 @@ def SDM(self, model_name): * (xt[ndn_i] - scen_mip._mpisppy_model.xbars[ndn_i]._value)) - cutoff = pyo.value(qp._mpisppy_model.mip_obj_in_qp) + pyo.value(qp.recourse_cost) - # tbmipsolve = time.time() - active_objective = sputils.find_active_objective(mip) - if self.is_minimizing: - # obj <= cutoff - obj_cutoff_constraint = (None, active_objective, cutoff) - else: - # obj >= cutoff - obj_cutoff_constraint = (cutoff, active_objective, None) - mip._mpisppy_model.obj_cutoff_constraint = pyo.Constraint(expr=obj_cutoff_constraint) - if sputils.is_persistent(mip._solver_plugin): - mip._solver_plugin.add_constraint(mip._mpisppy_model.obj_cutoff_constraint) + self._fix_fixings(model_name, mip, qp) + cutoff = self._add_objective_cutoff(mip, qp) + # print(f"{model_name=}, {cutoff=}") # Algorithm 2 line 5 self.solve_one( - self.options["iterk_solver_options"], + mip_solver_options, model_name, mip, dtiming=dtiming, - tee=teeme, + tee=tee, verbose=verbose, + # if the problem isn't feasible, + # becuase of the cutoff, we should + # probably keep going ?? + need_solution=False, ) - if sputils.is_persistent(mip._solver_plugin): - mip._solver_plugin.remove_constraint(mip._mpisppy_model.obj_cutoff_constraint) - mip.del_component(mip._mpisppy_model.obj_cutoff_constraint) - # tmipsolve = time.time() - tbmipsolve - if mip._mpisppy_data.scenario_feasible: + self._remove_objective_cutoff(mip) + # tmipsolve = time.perf_counter() - tbmipsolve - # Algorithm 2 lines 6--8 - if (itr == 0): - dual_bound = mip._mpisppy_data.outer_bound + if mip._mpisppy_data.scenario_feasible: # Algorithm 2 line 9 (compute \Gamma^t) inner_bound = mip._mpisppy_data.inner_bound - if abs(inner_bound) > 1e-9: - stop_check = (cutoff - inner_bound) / abs(inner_bound) # \Gamma^t in Boland, but normalized - else: - stop_check = cutoff - inner_bound # \Gamma^t in Boland - # print(f"{model_name}, Gamma^t = {stop_check}") - stop_check_tol = self.FW_options["stop_check_tol"]\ - if "stop_check_tol" in self.FW_options else 1e-4 - if (self.is_minimizing and stop_check < -stop_check_tol): - print('Warning (fwph): convergence quantity Gamma^t = ' - '{sc:.2e} (should be non-negative)'.format(sc=stop_check)) - print('Try decreasing the MIP gap tolerance and re-solving') - elif (not self.is_minimizing and stop_check > stop_check_tol): - print('Warning (fwph): convergence quantity Gamma^t = ' - '{sc:.2e} (should be non-positive)'.format(sc=stop_check)) - print('Try decreasing the MIP gap tolerance and re-solving') - - # tbcol = time.time() + # print(f"{model_name=}, {inner_bound=}") + gamma_t = self._compute_gamma_t(cutoff, inner_bound) + # print(f"{itr=}, {model_name=}, {gamma_t=}") + + # tbcol = time.perf_counter() self._add_QP_column(model_name) - # tcol = time.time() - tbcol + # tcol = time.perf_counter() - tbcol # print(f"{model_name} QP add_column time: {tcol}") else: - dual_bound = None global_toc(f"{self.__class__.__name__}: Could not find an improving column for {model_name}!", True) # couldn't find an improving direction, the column would not become active - # tbqpsol = time.time() + # add a shared column(s) + shared_columns = self.options.get("FWPH_shared_columns_per_iteration", 0) + if shared_columns > 0: + self._swap_nonant_vars_back() + self._add_shared_columns(shared_columns) + self._swap_nonant_vars() + + # tbqpsol = time.perf_counter() # QPs are weird if bundled _bundling = self.bundling self.solve_one( @@ -343,33 +450,69 @@ def SDM(self, model_name): model_name, qp, dtiming=dtiming, - tee=teeme, + tee=tee, verbose=verbose, ) self.bundling = _bundling - # tqpsol = time.time() - tbqpsol + # tqpsol = time.perf_counter() - tbqpsol # print(f"{model_name}, solve + add_col time: {tmipsolve + tcol + tqpsol}") - # fwloop = time.time() - loop_start + # fwloop = time.perf_counter() - loop_start # print(f"{model_name}, total loop time: {fwloop}") + # Algorithm 2 lines 6--8 + # Stopping after the MIP solve will give a point + # to synchronize with spokes + dual_bound = None + if (itr == 0): + if mip._mpisppy_data.scenario_feasible: + dual_bound = mip._mpisppy_data.outer_bound + else: + dual_bound = np.nan - if dual_bound is None or (stop_check < self.FW_options['FW_conv_thresh']): - break + if itr + 1 == sdm_iter_limit or not mip._mpisppy_data.scenario_feasible or gamma_t < FW_conv_thresh: + return dual_bound + else: + yield dual_bound # reset for next loop - xt = {ndn_i : xvar._value for ndn_i, xvar in arb_scen_mip._mpisppy_data.nonant_indices.items()} + for ndn_i, xvar in arb_scen_mip._mpisppy_data.nonant_indices.items(): + xt[ndn_i] = xvar._value - # Re-set the mip._mpisppy_model.W so that the QP objective - # is correct in the next major iteration - for scenario_name in mip_source: - scen_mip = self.local_scenarios[scenario_name] - for (node_name, ix) in scen_mip._mpisppy_data.nonant_indices: - scen_mip._mpisppy_model.W[node_name, ix]._value = \ - qp._mpisppy_model.W[node_name, ix]._value - return dual_bound + def _add_shared_columns(self, shared_columns): + self.mpicomm.Barrier() + self._disable_W() + for s in self.local_subproblems.values(): + if sputils.is_persistent(s._solver_plugin): + active_objective_datas = list(s.component_data_objects( + pyo.Objective, active=True, descend_into=True)) + s._solver_plugin.set_objective(active_objective_datas[0]) + self._generate_shared_column(shared_columns) + self._reenable_W() - def _add_QP_column(self, model_name): + def _fix_fixings(self, model_name, mip, qp): + """ If some variable is fixed in the mip, but its value in the QP does + not agree with that fixed value, we will have a bad time. This method + removes such fixings. + """ + for var in qp.x.values(): + if var.fixed: + raise RuntimeError(f"var {var.name} is fixed in QP!!") + solver = mip._solver_plugin + unfixed = 0 + target = mip.ref_vars if self.bundling else mip._mpisppy_data.nonant_vars + mip_to_qp = mip._mpisppy_data.mip_to_qp + for ndn_i, var in target.items(): + if var.fixed: + if not math.isclose(mip_to_qp[id(var)].value, var.value, abs_tol=1e-5): + var.unfix() + if sputils.is_persistent(solver): + solver.update_var(var) + unfixed += 1 + if unfixed > 0: + global_toc(f"{self.__class__.__name__}: unfixed {unfixed} nonant variables in {model_name}", True) + + def _add_QP_column(self, model_name, disable_W=False): ''' Add a column to the QP, with values taken from the most recent MIP solve. Assumes the inner_bound is up-to-date in the MIP model. ''' @@ -378,7 +521,11 @@ def _add_QP_column(self, model_name): solver = qp._solver_plugin persistent = sputils.is_persistent(solver) + if disable_W: + self._disable_W() total_recourse_cost = mip._mpisppy_data.inner_bound - pyo.value(mip._mpisppy_model.nonant_obj_part) + if disable_W: + self._reenable_W() if hasattr(solver, 'add_column'): new_var = qp.a.add() @@ -418,6 +565,58 @@ def _add_QP_column(self, model_name): solver.remove_constraint(qp.eq_recourse_cost) solver.add_constraint(qp.eq_recourse_cost) + def _compute_gamma_t(self, cutoff, inner_bound): + if abs(inner_bound) > 1e-9: + stop_check = (cutoff - inner_bound) / abs(inner_bound) # \Gamma^t in Boland, but normalized + else: + stop_check = cutoff - inner_bound # \Gamma^t in Boland + # print(f"{model_name}, Gamma^t = {stop_check}") + stop_check_tol = self.FW_options["stop_check_tol"]\ + if "stop_check_tol" in self.FW_options else 1e-4 + if (self.is_minimizing and stop_check < -stop_check_tol): + print('Warning (fwph): convergence quantity Gamma^t = ' + '{sc:.2e} (should be non-negative)'.format(sc=stop_check)) + print('Try decreasing the MIP gap tolerance and re-solving') + elif (not self.is_minimizing and stop_check > stop_check_tol): + print('Warning (fwph): convergence quantity Gamma^t = ' + '{sc:.2e} (should be non-positive)'.format(sc=stop_check)) + print('Try decreasing the MIP gap tolerance and re-solving') + return stop_check + + def _add_objective_cutoff(self, mip, qp): + """ Add a constraint to the MIP objective ensuring + an improving direction in the QP subproblem is generated + """ + assert not hasattr(mip._mpisppy_model, "obj_cutoff_constraint") + # print(f"\tnonants part: {pyo.value(qp._mpisppy_model.mip_obj_in_qp)}") + # print(f"\trecoursepart: {pyo.value(qp.recourse_cost)}") + cutoff = pyo.value(qp._mpisppy_model.mip_obj_in_qp) + pyo.value(qp.recourse_cost) + epsilon = 0 #1e-4 + rel_epsilon = abs(cutoff)*epsilon + epsilon = max(epsilon, rel_epsilon) + # tbmipsolve = time.perf_counter() + active_objective = sputils.find_active_objective(mip) + if self.is_minimizing: + # obj <= cutoff + obj_cutoff_constraint = (None, active_objective.expr, cutoff+epsilon) + else: + # obj >= cutoff + obj_cutoff_constraint = (cutoff-epsilon, active_objective.expr, None) + mip._mpisppy_model.obj_cutoff_constraint = pyo.Constraint(expr=obj_cutoff_constraint) + if sputils.is_persistent(mip._solver_plugin): + mip._solver_plugin.add_constraint(mip._mpisppy_model.obj_cutoff_constraint) + return cutoff + + def _remove_objective_cutoff(self, mip): + """ Remove the constraint to the MIP objective added by + _add_objective_cutoff + """ + assert hasattr(mip._mpisppy_model, "obj_cutoff_constraint") + if sputils.is_persistent(mip._solver_plugin): + mip._solver_plugin.remove_constraint(mip._mpisppy_model.obj_cutoff_constraint) + mip.del_component(mip._mpisppy_model.obj_cutoff_constraint) + delattr(mip._mpisppy_model, "obj_cutoff_constraint") + def _attach_indices(self): ''' Attach the fields x_indices to the model objects in self.local_subproblems (not self.local_scenarios, nor @@ -477,7 +676,7 @@ def _attach_MIP_vars(self): mip._mpisppy_data.nonant_vars = mip._mpisppy_data.nonant_indices self._attach_nonant_objective(mip) - def _compute_dual_bound(self): + def _update_dual_bounds(self): ''' Compute the FWPH dual bound using self._local_bound from each rank ''' send = np.array(self._local_bound) @@ -486,7 +685,16 @@ def _compute_dual_bound(self): [send, MPI.DOUBLE], [recv, MPI.DOUBLE], op=MPI.SUM) self._local_bound = recv - def _conv_diff(self): + if (self.is_minimizing): + self._fwph_best_bound = np.fmax(self._fwph_best_bound, self._local_bound) + else: + self._fwph_best_bound = np.fmin(self._fwph_best_bound, self._local_bound) + if self._can_update_best_bound(): + self.best_bound_obj_val = self._fwph_best_bound + # if self.cylinder_rank == 0: + # print(f"{self._local_bound=}") + + def fwph_convergence_diff(self): ''' Perform the convergence check of Algorithm 3 in Boland et al. ''' diff = 0. for name in self.local_subproblems.keys(): @@ -735,6 +943,21 @@ def _setup_shared_column_generation(self): self._xhatter = XhatBase(self) self._xhatter.post_iter0() + def _generate_initial_columns_if_needed(self): + if self.spcomm is not None: + if self.spcomm.receive_field_spcomms[Field.BEST_XHAT]: + # we'll get the initial columns from the incumbent finder spoke + return + number_initial_column_tries = self.options.get("FW_initialization_attempts", 10) + if self.FW_options["FW_iter_limit"] == 1 and number_initial_column_tries < 1: + global_toc(f"{self.__class__.__name__}: Warning: FWPH needs an initial shared column if FW_iter_limit == 1. Increasing FW_iter_limit to 2 to ensure convergence", self.cylinder_rank == 0) + self.FW_options["FW_iter_limit"] = 2 + if self.FW_options["FW_iter_limit"] == 1 or number_initial_column_tries > 0: + number_points = self._generate_shared_column(number_initial_column_tries) + if number_points == 0 and self.FW_options["FW_iter_limit"] == 1: + global_toc(f"{self.__class__.__name__}: Warning: FWPH failed to find an initial feasible solution. Increasing FW_iter_limit to 2 to ensure convergence", self.cylinder_rank == 0) + self.FW_options["FW_iter_limit"] = 2 + def _generate_shared_column(self, tries=1): """ Called after iter 0 to satisfy the condition of equation (17) in Boland et al., if t_max / FW_iter_limit == 1 @@ -765,14 +988,7 @@ def _generate_shared_column(self, tries=1): return number_points def _is_timed_out(self): - if (self.cylinder_rank == 0): - time_elapsed = time.time() - self.t0 - status = 1 if (time_elapsed > self.FW_options['time_limit']) \ - else 0 - else: - status = None - status = self.comms['ROOT'].bcast(status, root=0) - return status != 0 + return self.allreduce_or( (time.perf_counter() - self.start_time) >= self.options["time_limit"] ) def _options_checks_fw(self): ''' Name Boland notation (Algorithm 2) diff --git a/mpisppy/phbase.py b/mpisppy/phbase.py index 79483d1fc..bdc8b031d 100644 --- a/mpisppy/phbase.py +++ b/mpisppy/phbase.py @@ -1017,6 +1017,7 @@ def iterk_loop(self): # print a screen trace for iteration 0 if self.spcomm.is_converged(): global_toc("Cylinder convergence", self.cylinder_rank == 0) + return for self._PHIter in range(1, max_iterations+1): iteration_start_time = time.time() diff --git a/mpisppy/utils/cfg_vanilla.py b/mpisppy/utils/cfg_vanilla.py index 76894dd14..42fbb7347 100644 --- a/mpisppy/utils/cfg_vanilla.py +++ b/mpisppy/utils/cfg_vanilla.py @@ -517,6 +517,7 @@ def _fwph_options(cfg): "FW_verbose": cfg.verbose, "mip_solver_options": mip_solver_options, "qp_solver_options": qp_solver_options, + "FW_LP_start_iterations": cfg.fwph_lp_start_iterations, } return fw_options diff --git a/mpisppy/utils/config.py b/mpisppy/utils/config.py index f9e56d893..d889658e1 100644 --- a/mpisppy/utils/config.py +++ b/mpisppy/utils/config.py @@ -609,6 +609,11 @@ def fwph_args(self): domain=float, default=1e-4) + self.add_to_config("fwph_lp_start_iterations", + description="Number of iterations to operate on LP relaxation to warmstart fwph duals", + domain=int, + default=0) + def lagrangian_args(self):