Source code for gpe.minimize

"""Robust GPE minimizer"""

from collections import namedtuple
from packaging.version import Version
import warnings

import numpy as np

import scipy.optimize.nonlin
import scipy as sp

from mmfutils.solve import broyden

[docs] _EPS = np.finfo(float).eps
[docs] _TINY = np.finfo(float).tiny
[docs] class Minimize: """General minimization class.""" def __init__(self, x=None, x_scale=1.0, f_scale=1.0, plot=False): """ x : real array Initial state. To Do ----- - Check memory usage and avoid unecessary copies. """ if x is not None or not hasattr(self, "x"): self.x = x
[docs] self.x_scale = x_scale
[docs] self.f_scale = f_scale
[docs] self.plot = plot
self.init()
[docs] def init(self): """Overload this to initialize your state if needed.""" pass
[docs] def f_df(self, x): """Return the objective function and it's derivative as a function of the unpacked state `x`.""" raise NotImplementedError
# return f, df
[docs] def check(self, x=None): """Check that the state.get_energy() and state.get_Hy() are correct (i.e. that Hy is derivative of E().""" def _f(x): return self.f_df(x)[0] def _df(x): return self.f_df(x)[1] if x is None: x = self.x x0 = x / self.x_scale return self.check_derivative(f=_f, df=_df, x=x0)
[docs] def minimize( self, plot=False, callback=None, method="L-BFGS-B", polish=False, broyden_alpha=None, broyden_opts=None, f_tol=1e-12, x_tol=1e-7, use_scipy=True, ignore_f=False, _test=False, _debug=False, _log=False, use_cache=True, bounds=None, **kw, ): """Return the state with minimized energy. If `x_tol < sqrt(eps)`, then we use the canned L-BFGS-B algorithm from scipy as the is very robust. If we need higher precision on `x`, then we use a custom L-BFGS algorithm with a modified stopping criterion. Arguments --------- f_tol : float Relative stopping tolerance for the energy. x_tol : float Relative stopping tolerance for the wavefunction. polish : bool If `True`, then build an approximation to the hessian during the minimization phase so that this can be used with Broyden's method to polish the solution to high accuracy after the energy is minimized. This allows us to ensure that the wavefunction is also converged to high accuracy. (Since the energy is quadratic about the minimum, it is common for the energy to be converged to machine precision eps even though the state is only converged to about sqrt(eps)). Currently does not actually polish the solution - only stores the data in `self.broyden_data`. See `use_scipy` below. broyden_alpha : None, float Initial inverse Jacobian approximation is `alpha` times the identity. If `None`, then this will be estimated by performing a line search in the descent direction. broyden_opts : dict() Arguments to pass through to `mmfutils.solve.broyden.DyadicSum()`. use_scipy : bool If `True`, then use the scipy solvers, otherwise, if `x_tol < sqrt(eps)`, then use our custom solver that performs Broyden iterations after. The issue here is that the minimizer might bail out prematurely because of the `f_tol` criterion before reaching the desired `x_tol` because the minimum is quadratic. Since we have information about the hessian, we can in principle continue to refine the solution with Broyden iterations to minimize the norm of the residual. This is the idea of the polish option, but one can "trick" scipy into doing this by setting `f_tol=np.nan`. This will always invalidate the criterion (even `f_tol=0` can fail sometimes when the energy is very flat) and force scipy to continue until `x_tol` is met or the method fails. Always check your answer if you do this! _debug : bool If `True` then return `(x, f, df, locals())`. Returns ------- x : array Solution f, df : functions Objective function and its gradient in the packed representation. Developer notes: 1. Do not mutate `state0`. Use `state` as a working copy if a true state is needed. 2. Be careful which version of `state` or `state0` you use as the argument to `get_Hy` and `get_energy` since this determines the non-linear portions. In general one should use `state`, but if one implements a minimization on the norm of `Hy` then for the derivative one must be careful. """ if _log: self._calls = [] _cache = [0, None, None] def _f(x): if ( not use_cache or _cache[1] is None or not np.allclose(x, _cache[0], atol=1e-32, rtol=_EPS) ): _cache[0] = x.copy() _cache[1:] = self.f_df(x) if _log: self._calls.append(tuple(_cache)) return _cache[1] def _df(x): if ( not use_cache or _cache[1] is None or not np.allclose(x, _cache[0], atol=1e-32, rtol=_EPS) ): _cache[0] = x.copy() _cache[1:] = self.f_df(x) if _log: self._calls.append(tuple(_cache)) return _cache[2] x0 = self.x.copy() if _test: # Check derivative assert self.check_derivative(f=_f, df=_df, x=x0) _x = [x0] options = dict(disp=0) options.update(kw) callbacks = [] if polish: if broyden_opts is None: broyden_opts = {} broyden_method = broyden_opts.pop("method", "good") F0 = _df(x0) if broyden_alpha is not None: B = broyden.DyadicSum(alpha=broyden_alpha, **broyden_opts) J = scipy.optimize.nonlin.BroydenFirst( alpha=-broyden_alpha, **broyden_opts ) J.setup(x0, F0, _df) else: # alpha needs to be good, so we go a line search in the descent # direction to determine the best alpha dx0 = -F0 ( broyden_alpha, fc, gc, new_fval, old_fval, new_slop, ) = sp.optimize.line_search(_f, _df, xk=x0, pk=dx0) print(broyden_alpha) x1 = x0 + broyden_alpha * dx0 F1 = _df(x1) B = broyden.DyadicSum(alpha=broyden_alpha, **broyden_opts) B.update_broyden(dx=x1 - x0, df=F1 - F0, method=broyden_method) x0, F0 = x1, F1 J = scipy.optimize.nonlin.BroydenFirst( alpha=-broyden_alpha, **broyden_opts ) J.setup(x0, F0, _df) J.update(x1, F1) broyden_data = namedtuple("BroydenData", ["x0", "F0", "B", "J", "df"])( x0, F0, B, J, _df ) def update_B(x, broyden_data=broyden_data): """Update the inverse Jacobian B.""" F = _df(x) broyden_data.J.update(x, F) dx = x - broyden_data.x0 dF = F - broyden_data.F0 broyden_data.B.update_broyden(dx=dx, df=dF, method=broyden_method) broyden_data.x0[...] = x broyden_data.F0[...] = F callbacks.append(update_B) if callback is not None: callbacks.append(callback) if _test: # Check derivative def callback_test(x): self.check_derivative(f=_f, df=_df, x=x) callbacks.append(callback_test) callback_ = None if callbacks: def callback_(x): return [c(x) for c in callbacks] self._alphas = [] if ignore_f: assert bounds is None if options["disp"] > 1: print("ignore_f==True, Using Brodyen to find root of df only.") res = self._minimize_norm( df=_df, x0=_x[0], callback=callback_, x_tol=x_tol, broyden_alpha=broyden_alpha, **options, ) elif x_tol < np.sqrt(np.finfo(float).eps) and not use_scipy: assert bounds is None if options["disp"] > 1: print("x_tol < sqrt(eps), so using our custom L_BFGS solver.") res = self._minimize_L_BFGS( f=_f, df=_df, x0=_x[0], callback=callback_, x_tol=x_tol, **options ) else: if options["disp"] > 1: print("Using scipy.optimize.mimimize.") options.setdefault("ftol", f_tol) options.setdefault("gtol", x_tol) if Version(sp.__version__) > Version("1.15.0"): options.pop("disp", None) res = self._minimize( f=_f, df=_df, x0=_x[0], method=method, callback=callback_, bounds=bounds, options=options, ) self.minimize_results = res if not res.success: msg = res.message if not isinstance(msg, str): msg = msg.decode() warnings.warn(msg) _x[0] = res.x if polish: self.broyden_data = broyden_data if _debug: return res.x, _f, _df, locals() else: return res.x
[docs] def _minimize(self, f, df, x0, method, callback, bounds, options): """Interface to the scipy minimizer.""" res = sp.optimize.minimize( fun=f, jac=df, x0=x0, method=method, callback=callback, bounds=bounds, options=options, ) res.f = f res.df = df return res
[docs] def _minimize_L_BFGS( self, f, df, x0, callback, x_tol=1e-12, safe=True, tries=5, disp=0 ): """Custom L-BFGS minimizer.""" x1 = x0 if callback: callback(x1) f0 = None f1 = f(x1) df1 = df(x1) # assert self.check_derivative(f, df, x1) df1_norm = np.linalg.norm(df1) B = broyden.JacobianBFGS() converged = False message = "" bad_steps = 0 ######### Fails if first step is bad... check this case.... # import pdb;pdb.set_trace() while not converged: B.update(x=x1, f=df1) x0, f0, f00, df0, df0_norm = x1, f1, f0, df1, df1_norm dx0 = -B.solve(df0) x_err = np.linalg.norm(dx0) / np.linalg.norm(x0) if x_err < x_tol: converged = True break elif disp >= 100: print("x_err={} > x_tol={}".format(x_err, x_tol)) alpha = None if safe: # Use safe line search alpha, fc, gc, f1, f0, new_slop = sp.optimize.line_search( f, df, xk=x0, pk=dx0, old_fval=f0, c2=0.9, old_old_fval=f00 ) if alpha is None: # No safe line-search step, switch to nonlin search if disp > 0: print("No safe step found.") safe = False self._locals = locals() if alpha is None: # This is from our original code, but it uses private methods. I am not # sure if we should be using line_search_wolfe1 instead... Needs # testing, but we include as a fallback. _nonlin_line_search = None try: from scipy.optimize.nonlin import _nonlin_line_search except ImportError: pass if _nonlin_line_search is None: try: from scipy.optimize._nonlin import _nonlin_line_search except ImportError: pass if _nonlin_line_search is not None: if disp > 0: print("Resorting to non-linear steps.") alpha, x1, df1, df1_norm = _nonlin_line_search( df, x0, df0, dx0, search_type="wolfe" ) else: # Could do wolfe1 here maybe? Currently, we just take the full step. alpha = 1.0 x1 = x0 + alpha * dx0 df1 = df(x1) df1_norm = np.linalg.norm(df1) x1 = x0 + alpha * dx0 df1 = df(x1) df1_norm = np.linalg.norm(df1) if alpha == 1.0 and df1_norm >= df0_norm: bad_steps += 1 if disp > 0: print( "Bad step {}: |df1| > |df0|: {} > {} ".format( bad_steps, df1_norm, df0_norm ) ) if bad_steps > tries: # Failed to converge if disp > 0: print("Too many bad steps: failing") message = ( "Could not find successful non-linear " " step after {} bad_steps".format(bad_steps) ) break elif disp >= 10: print("Taking good step") self._alphas.append(alpha) # assert self.check_derivative(f, df, x0) if callback: callback(x1) res = namedtuple("MinimizeResults", ["x", "f", "df", "success", "message"])( x1, f, df, converged, message ) return res
[docs] def _minimize_norm( self, df, x0, callback, broyden_alpha=None, x_tol=1e-12, safe=True, tries=5, disp=0, ): """Custom Broyden minimizer for df only. This looks for solutions to df=0 without regard to them being minima of the energy. This is useful for finding saddle points. """ from scipy.optimize.nonlin import _nonlin_line_search x1 = x0 if callback: callback(x1) df1 = df(x1) df1_norm = np.linalg.norm(df1) """ if broyden_alpha is None: # alpha needs to be good, so we go a line search in the descent # direction to determine the best alpha dx0 = -F0 broyden_alpha, x1, df1, df1_norm = _nonlin_line_search( df, x0, df0, dx0, search_type='wolfe') broyden_alpha, fc, gc, new_fval, old_fval, new_slop = \ sp.optimize.line_search(_f, _df, xk=x0, pk=dx0) print(broyden_alpha) x1 = x0 + broyden_alpha*dx0 F1 = _df(x1) B = broyden.DyadicSum(alpha=broyden_alpha, **broyden_opts) B.update_broyden(dx=x1-x0, df=F1-F0, method=broyden_method) x0, F0 = x1, F1 J = scipy.optimize.nonlin.BroydenFirst( alpha=-broyden_alpha, **broyden_opts) J.setup(x0, F0, _df) J.update(x1, F1) """ B = broyden.Jacobian(alpha=broyden_alpha) converged = False message = "" bad_steps = 0 while not converged: B.update(x=x1, f=df1) x0, df0, df0_norm = x1, df1, df1_norm dx0 = -B.solve(df0) x_err = np.linalg.norm(dx0) / np.linalg.norm(x0) if x_err < x_tol: converged = True break elif disp >= 100: print("x_err={} > x_tol={}".format(x_err, x_tol)) alpha, x1, df1, df1_norm = _nonlin_line_search( df, x0, df0, dx0, search_type="wolfe" ) if alpha == 1.0 and df1_norm >= df0_norm: bad_steps += 1 if disp > 0: print( "Bad step {}: |df1| > |df0|: {} > {} ".format( bad_steps, df1_norm, df0_norm ) ) if bad_steps > tries: # Failed to converge if disp > 0: print("Too many bad steps: failing") message = ( "Could not find successful non-linear " " step after {} bad_steps".format(bad_steps) ) break else: if disp >= 10: print( "Taking good nonlinear step:" + f"alpha={alpha:.2f}, |df|={df1_norm:.2g}" ) self._alphas.append(alpha) # assert self.check_derivative(f, df, x0) if callback: callback(x1) res = namedtuple("MinimizeDfResults", ["x", "df", "success", "message"])( x1, df, converged, message ) return res
[docs] def check_derivative(self, f, df, x, rtol=1e-4): from mmfutils.math.differentiate import differentiate np.random.seed(11) dx = np.random.random(x.shape) - 0.1 dx /= np.linalg.norm(dx) def _f(h): return f(x + dx * h) d_f = differentiate(_f, h0=0.2) _f_x = f(x) # Do this to populate f_df cache # noqa: F841 _df_x = df(x) _df_x_ = np.dot(_df_x, dx) res = np.allclose(d_f, _df_x_, rtol=rtol) if not res: warnings.warn(f"minimize.check_derivative() failed with {d_f / _df_x_} != 1") return res
[docs] class MinimizeState(Minimize): """Minimizer for states with the pytimeode.interfaces.IStateForABMEvolver interface.""" def __init__(self, state, real=False, fix_N=True, **kw): """ state : IStateMinimizeGPE Initial state providing the IStateMinimizeGPE interface. `state.plot()` : Plot the state (for interactive used and debugging.) real : bool If `True`, then the solver will only consider real ansatz, assuming that the ground state is real, or that any phases are exactly provided by `P`. Note: if the actual state should also be real (i.e. if `P` is also real) then this should be indicated in `state.dtype`. fix_N : bool If `True`, then keep `N` fixed while minimizing, otherwise, we expect the state to include any chemical-potential subtraction in `get_Vext()` so that it is consistently applied in both `get_Hy()` and `get_energy()` used here. Note: if any `real` or component of `N = state.normalize()[1]` is zero, then only elements of the data will be active -- all zero elements will be held fixed. """
[docs] self.state = state.copy()
[docs] self.fix_N = fix_N
[docs] self.real = real
[docs] self._active_inds = slice(None)
super().__init__(**kw) @property
[docs] def x(self): """Get current state. Must allow setting and unsetting.""" return self.pack(self.state)
@x.setter def x(self, x): """Set current state. Must allow setting and unsetting.""" return self.unpack(x, self.state) @property
[docs] def _x_dtype(self): return float if self.real else complex
[docs] def init(self): self.state.pre_minimize_hook() state = self._state = self.state.copy() self._active_inds = slice(None) # All components self._x_full = state.asnumpy(state.ravel(dtype=float).copy()) if self.fix_N: # Determine if we need to mask any inputs. This also # applies for real states. N = state.normalize()[1] if self.real or np.any(N == 0): # Some of the constraints are zero. The corresponding # components should not be optimized, but fixed to # zero const = 1.0 if self.real else 1 + 1j state.fill(const) state *= N self._active_inds = "need to compute" elif self.real: const = 1.0 if self.real else 1 + 1j state.fill(const) self._active_inds = "need to compute" if self._active_inds == "need to compute": self._active_inds = np.where(state.asnumpy(state.ravel(dtype=float) != 0))
[docs] def unpack(self, x, state=None): """Unpack `x` into `state` including factors of `x_scale` Note: Do not scale or otherwise mutate `x`. """ if state is None: if not hasattr(self, "_state"): self._state = self.state.copy() state = self._state if not hasattr(self, "_x_full"): self._x_full = state.asnumpy(state.ravel(dtype=float)).copy() self._x_full[self._active_inds] = x state.unravel_from(self._x_full) state *= self.x_scale return state
[docs] def pack(self, state): """This is not symmetric with `unpack` as it does not scale the solution by any factors. (This is because we unpack both states and Hpsi which have different scalings.). """ psi = state.asnumpy(state.ravel()) if self.real: assert np.allclose(psi.imag, 0) return psi.view(dtype=float).ravel()[self._active_inds]
[docs] def f_df(self, x): """Return the objective function and it's derivative as a function of the unpacked state `x`.""" psi = self.unpack(x) if self.fix_N: s, N = psi.normalize() Hpsi = psi.get_Hy(subtract_mu=self.fix_N) if self.fix_N: Hpsi *= s E = psi.get_energy() if hasattr(self, "_Es"): self._Es.append(E) if self.fix_N: # Add extra piece to Hamiltonian to remove singular direction due # to scaling. Since E(s*psi) is flat in this direction, adding # this piece does not change the location of the minimum, but it # does ensure that the solver does not let the norm of psi run # away. E += psi.asnumpy(psi.xp.sum(self.f_scale * (1.0 / s**2 - 1) ** 2 / 4)) Hpsi.axpy(psi, a=self.f_scale * (1.0 / s**2 - 1) / s / 2 / (N + _TINY)) Hpsi *= 2.0 * psi.metric # for real x rather than psi if self.plot: psi.plot() f = psi.asnumpy(E) / self.f_scale df = self.pack(Hpsi) df *= self.x_scale / self.f_scale return f, df
[docs] def g_dg(self, x): """Return the objective function minimizing the residual norm. INCOMPLETE. """ psi = self.unpack(x) if self.fix_N: s, N = psi.normalize() Hpsi = psi.get_Hy(subtract_mu=self.fix_N) if self.fix_N: Hpsi *= s E = psi.get_energy() if hasattr(self, "_Es"): self._Es.append(E) if self.fix_N: # Add extra piece to Hamiltonian to remove singular direction due # to scaling. Since E(s*psi) is flat in this direction, adding # this piece does not change the location of the minimum, but it # does ensure that the solver does not let the norm of psi run # away. E += psi.asnumpy(psi.xp.sum(self.f_scale * (1.0 / s**2 - 1) ** 2 / 4)) Hpsi.axpy(psi, a=self.f_scale * (1.0 / s**2 - 1) / s / 2 / N) Hpsi *= 2.0 * psi.metric # for real x rather than psi if self.plot: psi.plot() f = psi.asnumpy(E) / self.f_scale df = self.pack(Hpsi) df *= self.x_scale / self.f_scale return f, df
[docs] def minimize(self, psi_tol=1e-8, E_tol=1e-12, callback=None, _debug=False, **kw): # state0 = self.state.copy() if callback is not None: _callback_state = callback def callback(x): state = self.unpack(x) if self.fix_N: state.normalize() return _callback_state(state) kw["callback"] = callback self._Es = [] kw.update(x_tol=psi_tol, f_tol=E_tol) if _debug: return super().minimize(_debug=_debug, **kw) else: x = super().minimize(**kw) state = self.unpack(x) if self.fix_N: state.normalize() state.minimize_results = self.minimize_results self.state = state return state
[docs] class MinimizeStateFixedPhase(MinimizeState): """Minimizer with fixed phase.""" def __init__(self, state, phase, fix_N=True, **kw): """ phase : array Array with the phase. fix_N : bool If `True`, then keep `N` fixed while minimizing, otherwise, we expect the state to include any chemical-potential subtraction in `get_Vext()` so that it is consistently applied in both `get_Hy()` and `get_energy()` used here. """
[docs] self.phase = np.exp(1j * np.angle(phase))
super().__init__(state=state, real=True, fix_N=fix_N, **kw)
[docs] def init(self): self.state.pre_minimize_hook() self._state = self.state.copy()
[docs] def unpack(self, x, state=None): """Unpack `x` into `state` including factors of `x_scale` Note: Do not scale or otherwise mutate `x`. """ if state is None: if not hasattr(self, "_state"): self._state = self.state.copy() state = self._state state.unravel_from(x + 0j) state *= self.phase state *= self.x_scale return state
[docs] def pack(self, state): """This is not symmetric with `unpack` as it does not scale the solution by any factors. (This is because we unpack both states and Hpsi which have different scalings.). """ psi = state.asnumpy(state.ravel()) return abs(psi)
[docs] def f_df(self, x): """Return the objective function and it's derivative as a function of the unpacked state `x`.""" psi = self.unpack(x) if self.fix_N: s, N = psi.normalize() Hpsi = psi.get_Hy(subtract_mu=self.fix_N) Hpsi *= self.phase.conj() if self.fix_N: Hpsi *= s E = psi.get_energy() if hasattr(self, "_Es"): self._Es.append(E) if self.fix_N: # Add extra piece to Hamiltonian to remove singular direction due # to scaling. Since E(s*psi) is flat in this direction, adding # this piece does not change the location of the minimum, but it # does ensure that the solver does not let the norm of psi run # away. E += psi.asnumpy(psi.xp.sum(self.f_scale * (1.0 / s**2 - 1) ** 2 / 4)) Hpsi.axpy(psi, a=self.f_scale * (1.0 / s**2 - 1) / s / 2 / N) Hpsi *= 2.0 * psi.metric if self.plot: psi.plot() f = psi.asnumpy(E) / self.f_scale df = Hpsi.ravel().real # df = self.pack(Hpsi) df *= self.x_scale / self.f_scale return f, np.asfortranarray(df)
[docs] def minimize(self, **kw): N = np.prod(self.phase.shape) bounds = [(0, np.inf)] * N return super().minimize(use_scipy=True, bounds=bounds, **kw)