"""Robust GPE minimizer"""
from collections import namedtuple
from packaging.version import Version
import warnings
import numpy as np
import scipy.optimize.nonlin
import scipy as sp
from mmfutils.solve import broyden
[docs]
_EPS = np.finfo(float).eps
[docs]
_TINY = np.finfo(float).tiny
[docs]
class Minimize:
"""General minimization class."""
def __init__(self, x=None, x_scale=1.0, f_scale=1.0, plot=False):
"""
x : real array
Initial state.
To Do
-----
- Check memory usage and avoid unecessary copies.
"""
if x is not None or not hasattr(self, "x"):
self.x = x
self.init()
[docs]
def init(self):
"""Overload this to initialize your state if needed."""
pass
[docs]
def f_df(self, x):
"""Return the objective function and it's derivative as a function of
the unpacked state `x`."""
raise NotImplementedError
# return f, df
[docs]
def check(self, x=None):
"""Check that the state.get_energy() and state.get_Hy() are correct
(i.e. that Hy is derivative of E()."""
def _f(x):
return self.f_df(x)[0]
def _df(x):
return self.f_df(x)[1]
if x is None:
x = self.x
x0 = x / self.x_scale
return self.check_derivative(f=_f, df=_df, x=x0)
[docs]
def minimize(
self,
plot=False,
callback=None,
method="L-BFGS-B",
polish=False,
broyden_alpha=None,
broyden_opts=None,
f_tol=1e-12,
x_tol=1e-7,
use_scipy=True,
ignore_f=False,
_test=False,
_debug=False,
_log=False,
use_cache=True,
bounds=None,
**kw,
):
"""Return the state with minimized energy.
If `x_tol < sqrt(eps)`, then we use the canned L-BFGS-B algorithm from
scipy as the is very robust. If we need higher precision on `x`, then
we use a custom L-BFGS algorithm with a modified stopping criterion.
Arguments
---------
f_tol : float
Relative stopping tolerance for the energy.
x_tol : float
Relative stopping tolerance for the wavefunction.
polish : bool
If `True`, then build an approximation to the hessian during the
minimization phase so that this can be used with Broyden's method to
polish the solution to high accuracy after the energy is minimized.
This allows us to ensure that the wavefunction is also converged to
high accuracy. (Since the energy is quadratic about the minimum, it
is common for the energy to be converged to machine precision eps
even though the state is only converged to about sqrt(eps)).
Currently does not actually polish the solution - only stores the data in
`self.broyden_data`. See `use_scipy` below.
broyden_alpha : None, float
Initial inverse Jacobian approximation is `alpha` times the
identity. If `None`, then this will be estimated by performing a
line search in the descent direction.
broyden_opts : dict()
Arguments to pass through to `mmfutils.solve.broyden.DyadicSum()`.
use_scipy : bool
If `True`, then use the scipy solvers, otherwise, if `x_tol < sqrt(eps)`,
then use our custom solver that performs Broyden iterations after. The issue
here is that the minimizer might bail out prematurely because of the `f_tol`
criterion before reaching the desired `x_tol` because the minimum is
quadratic. Since we have information about the hessian, we can in principle
continue to refine the solution with Broyden iterations to minimize the norm
of the residual.
This is the idea of the polish option, but one can "trick" scipy into doing
this by setting `f_tol=np.nan`. This will always invalidate the criterion
(even `f_tol=0` can fail sometimes when the energy is very flat) and force
scipy to continue until `x_tol` is met or the method fails. Always check
your answer if you do this!
_debug : bool
If `True` then return `(x, f, df, locals())`.
Returns
-------
x : array
Solution
f, df : functions
Objective function and its gradient in the packed representation.
Developer notes:
1. Do not mutate `state0`. Use `state` as a working copy if a
true state is needed.
2. Be careful which version of `state` or `state0` you use as the
argument to `get_Hy` and `get_energy` since this determines the
non-linear portions. In general one should use `state`, but
if one implements a minimization on the norm of `Hy` then for
the derivative one must be careful.
"""
if _log:
self._calls = []
_cache = [0, None, None]
def _f(x):
if (
not use_cache
or _cache[1] is None
or not np.allclose(x, _cache[0], atol=1e-32, rtol=_EPS)
):
_cache[0] = x.copy()
_cache[1:] = self.f_df(x)
if _log:
self._calls.append(tuple(_cache))
return _cache[1]
def _df(x):
if (
not use_cache
or _cache[1] is None
or not np.allclose(x, _cache[0], atol=1e-32, rtol=_EPS)
):
_cache[0] = x.copy()
_cache[1:] = self.f_df(x)
if _log:
self._calls.append(tuple(_cache))
return _cache[2]
x0 = self.x.copy()
if _test: # Check derivative
assert self.check_derivative(f=_f, df=_df, x=x0)
_x = [x0]
options = dict(disp=0)
options.update(kw)
callbacks = []
if polish:
if broyden_opts is None:
broyden_opts = {}
broyden_method = broyden_opts.pop("method", "good")
F0 = _df(x0)
if broyden_alpha is not None:
B = broyden.DyadicSum(alpha=broyden_alpha, **broyden_opts)
J = scipy.optimize.nonlin.BroydenFirst(
alpha=-broyden_alpha, **broyden_opts
)
J.setup(x0, F0, _df)
else:
# alpha needs to be good, so we go a line search in the descent
# direction to determine the best alpha
dx0 = -F0
(
broyden_alpha,
fc,
gc,
new_fval,
old_fval,
new_slop,
) = sp.optimize.line_search(_f, _df, xk=x0, pk=dx0)
print(broyden_alpha)
x1 = x0 + broyden_alpha * dx0
F1 = _df(x1)
B = broyden.DyadicSum(alpha=broyden_alpha, **broyden_opts)
B.update_broyden(dx=x1 - x0, df=F1 - F0, method=broyden_method)
x0, F0 = x1, F1
J = scipy.optimize.nonlin.BroydenFirst(
alpha=-broyden_alpha, **broyden_opts
)
J.setup(x0, F0, _df)
J.update(x1, F1)
broyden_data = namedtuple("BroydenData", ["x0", "F0", "B", "J", "df"])(
x0, F0, B, J, _df
)
def update_B(x, broyden_data=broyden_data):
"""Update the inverse Jacobian B."""
F = _df(x)
broyden_data.J.update(x, F)
dx = x - broyden_data.x0
dF = F - broyden_data.F0
broyden_data.B.update_broyden(dx=dx, df=dF, method=broyden_method)
broyden_data.x0[...] = x
broyden_data.F0[...] = F
callbacks.append(update_B)
if callback is not None:
callbacks.append(callback)
if _test: # Check derivative
def callback_test(x):
self.check_derivative(f=_f, df=_df, x=x)
callbacks.append(callback_test)
callback_ = None
if callbacks:
def callback_(x):
return [c(x) for c in callbacks]
self._alphas = []
if ignore_f:
assert bounds is None
if options["disp"] > 1:
print("ignore_f==True, Using Brodyen to find root of df only.")
res = self._minimize_norm(
df=_df,
x0=_x[0],
callback=callback_,
x_tol=x_tol,
broyden_alpha=broyden_alpha,
**options,
)
elif x_tol < np.sqrt(np.finfo(float).eps) and not use_scipy:
assert bounds is None
if options["disp"] > 1:
print("x_tol < sqrt(eps), so using our custom L_BFGS solver.")
res = self._minimize_L_BFGS(
f=_f, df=_df, x0=_x[0], callback=callback_, x_tol=x_tol, **options
)
else:
if options["disp"] > 1:
print("Using scipy.optimize.mimimize.")
options.setdefault("ftol", f_tol)
options.setdefault("gtol", x_tol)
if Version(sp.__version__) > Version("1.15.0"):
options.pop("disp", None)
res = self._minimize(
f=_f,
df=_df,
x0=_x[0],
method=method,
callback=callback_,
bounds=bounds,
options=options,
)
self.minimize_results = res
if not res.success:
msg = res.message
if not isinstance(msg, str):
msg = msg.decode()
warnings.warn(msg)
_x[0] = res.x
if polish:
self.broyden_data = broyden_data
if _debug:
return res.x, _f, _df, locals()
else:
return res.x
[docs]
def _minimize(self, f, df, x0, method, callback, bounds, options):
"""Interface to the scipy minimizer."""
res = sp.optimize.minimize(
fun=f,
jac=df,
x0=x0,
method=method,
callback=callback,
bounds=bounds,
options=options,
)
res.f = f
res.df = df
return res
[docs]
def _minimize_L_BFGS(
self, f, df, x0, callback, x_tol=1e-12, safe=True, tries=5, disp=0
):
"""Custom L-BFGS minimizer."""
x1 = x0
if callback:
callback(x1)
f0 = None
f1 = f(x1)
df1 = df(x1)
# assert self.check_derivative(f, df, x1)
df1_norm = np.linalg.norm(df1)
B = broyden.JacobianBFGS()
converged = False
message = ""
bad_steps = 0
######### Fails if first step is bad... check this case....
# import pdb;pdb.set_trace()
while not converged:
B.update(x=x1, f=df1)
x0, f0, f00, df0, df0_norm = x1, f1, f0, df1, df1_norm
dx0 = -B.solve(df0)
x_err = np.linalg.norm(dx0) / np.linalg.norm(x0)
if x_err < x_tol:
converged = True
break
elif disp >= 100:
print("x_err={} > x_tol={}".format(x_err, x_tol))
alpha = None
if safe:
# Use safe line search
alpha, fc, gc, f1, f0, new_slop = sp.optimize.line_search(
f, df, xk=x0, pk=dx0, old_fval=f0, c2=0.9, old_old_fval=f00
)
if alpha is None:
# No safe line-search step, switch to nonlin search
if disp > 0:
print("No safe step found.")
safe = False
self._locals = locals()
if alpha is None:
# This is from our original code, but it uses private methods. I am not
# sure if we should be using line_search_wolfe1 instead... Needs
# testing, but we include as a fallback.
_nonlin_line_search = None
try:
from scipy.optimize.nonlin import _nonlin_line_search
except ImportError:
pass
if _nonlin_line_search is None:
try:
from scipy.optimize._nonlin import _nonlin_line_search
except ImportError:
pass
if _nonlin_line_search is not None:
if disp > 0:
print("Resorting to non-linear steps.")
alpha, x1, df1, df1_norm = _nonlin_line_search(
df, x0, df0, dx0, search_type="wolfe"
)
else:
# Could do wolfe1 here maybe? Currently, we just take the full step.
alpha = 1.0
x1 = x0 + alpha * dx0
df1 = df(x1)
df1_norm = np.linalg.norm(df1)
x1 = x0 + alpha * dx0
df1 = df(x1)
df1_norm = np.linalg.norm(df1)
if alpha == 1.0 and df1_norm >= df0_norm:
bad_steps += 1
if disp > 0:
print(
"Bad step {}: |df1| > |df0|: {} > {} ".format(
bad_steps, df1_norm, df0_norm
)
)
if bad_steps > tries:
# Failed to converge
if disp > 0:
print("Too many bad steps: failing")
message = (
"Could not find successful non-linear "
" step after {} bad_steps".format(bad_steps)
)
break
elif disp >= 10:
print("Taking good step")
self._alphas.append(alpha)
# assert self.check_derivative(f, df, x0)
if callback:
callback(x1)
res = namedtuple("MinimizeResults", ["x", "f", "df", "success", "message"])(
x1, f, df, converged, message
)
return res
[docs]
def _minimize_norm(
self,
df,
x0,
callback,
broyden_alpha=None,
x_tol=1e-12,
safe=True,
tries=5,
disp=0,
):
"""Custom Broyden minimizer for df only.
This looks for solutions to df=0 without regard to them being
minima of the energy. This is useful for finding saddle points.
"""
from scipy.optimize.nonlin import _nonlin_line_search
x1 = x0
if callback:
callback(x1)
df1 = df(x1)
df1_norm = np.linalg.norm(df1)
"""
if broyden_alpha is None:
# alpha needs to be good, so we go a line search in the descent
# direction to determine the best alpha
dx0 = -F0
broyden_alpha, x1, df1, df1_norm = _nonlin_line_search(
df, x0, df0, dx0, search_type='wolfe')
broyden_alpha, fc, gc, new_fval, old_fval, new_slop = \
sp.optimize.line_search(_f, _df, xk=x0, pk=dx0)
print(broyden_alpha)
x1 = x0 + broyden_alpha*dx0
F1 = _df(x1)
B = broyden.DyadicSum(alpha=broyden_alpha, **broyden_opts)
B.update_broyden(dx=x1-x0, df=F1-F0, method=broyden_method)
x0, F0 = x1, F1
J = scipy.optimize.nonlin.BroydenFirst(
alpha=-broyden_alpha, **broyden_opts)
J.setup(x0, F0, _df)
J.update(x1, F1)
"""
B = broyden.Jacobian(alpha=broyden_alpha)
converged = False
message = ""
bad_steps = 0
while not converged:
B.update(x=x1, f=df1)
x0, df0, df0_norm = x1, df1, df1_norm
dx0 = -B.solve(df0)
x_err = np.linalg.norm(dx0) / np.linalg.norm(x0)
if x_err < x_tol:
converged = True
break
elif disp >= 100:
print("x_err={} > x_tol={}".format(x_err, x_tol))
alpha, x1, df1, df1_norm = _nonlin_line_search(
df, x0, df0, dx0, search_type="wolfe"
)
if alpha == 1.0 and df1_norm >= df0_norm:
bad_steps += 1
if disp > 0:
print(
"Bad step {}: |df1| > |df0|: {} > {} ".format(
bad_steps, df1_norm, df0_norm
)
)
if bad_steps > tries:
# Failed to converge
if disp > 0:
print("Too many bad steps: failing")
message = (
"Could not find successful non-linear "
" step after {} bad_steps".format(bad_steps)
)
break
else:
if disp >= 10:
print(
"Taking good nonlinear step:"
+ f"alpha={alpha:.2f}, |df|={df1_norm:.2g}"
)
self._alphas.append(alpha)
# assert self.check_derivative(f, df, x0)
if callback:
callback(x1)
res = namedtuple("MinimizeDfResults", ["x", "df", "success", "message"])(
x1, df, converged, message
)
return res
[docs]
def check_derivative(self, f, df, x, rtol=1e-4):
from mmfutils.math.differentiate import differentiate
np.random.seed(11)
dx = np.random.random(x.shape) - 0.1
dx /= np.linalg.norm(dx)
def _f(h):
return f(x + dx * h)
d_f = differentiate(_f, h0=0.2)
_f_x = f(x) # Do this to populate f_df cache # noqa: F841
_df_x = df(x)
_df_x_ = np.dot(_df_x, dx)
res = np.allclose(d_f, _df_x_, rtol=rtol)
if not res:
warnings.warn(f"minimize.check_derivative() failed with {d_f / _df_x_} != 1")
return res
[docs]
class MinimizeState(Minimize):
"""Minimizer for states with the pytimeode.interfaces.IStateForABMEvolver
interface."""
def __init__(self, state, real=False, fix_N=True, **kw):
"""
state : IStateMinimizeGPE
Initial state providing the IStateMinimizeGPE interface.
`state.plot()` : Plot the state (for interactive used and debugging.)
real : bool
If `True`, then the solver will only consider real ansatz,
assuming that the ground state is real, or that any phases are
exactly provided by `P`. Note: if the actual state should also
be real (i.e. if `P` is also real) then this should be
indicated in `state.dtype`.
fix_N : bool
If `True`, then keep `N` fixed while minimizing, otherwise, we expect the
state to include any chemical-potential subtraction in `get_Vext()` so that
it is consistently applied in both `get_Hy()` and `get_energy()` used here.
Note: if any `real` or component of `N = state.normalize()[1]` is zero, then
only elements of the data will be active -- all zero elements will be held
fixed.
"""
[docs]
self.state = state.copy()
[docs]
self._active_inds = slice(None)
super().__init__(**kw)
@property
[docs]
def x(self):
"""Get current state. Must allow setting and unsetting."""
return self.pack(self.state)
@x.setter
def x(self, x):
"""Set current state. Must allow setting and unsetting."""
return self.unpack(x, self.state)
@property
[docs]
def _x_dtype(self):
return float if self.real else complex
[docs]
def init(self):
self.state.pre_minimize_hook()
state = self._state = self.state.copy()
self._active_inds = slice(None)
# All components
self._x_full = state.asnumpy(state.ravel(dtype=float).copy())
if self.fix_N:
# Determine if we need to mask any inputs. This also
# applies for real states.
N = state.normalize()[1]
if self.real or np.any(N == 0):
# Some of the constraints are zero. The corresponding
# components should not be optimized, but fixed to
# zero
const = 1.0 if self.real else 1 + 1j
state.fill(const)
state *= N
self._active_inds = "need to compute"
elif self.real:
const = 1.0 if self.real else 1 + 1j
state.fill(const)
self._active_inds = "need to compute"
if self._active_inds == "need to compute":
self._active_inds = np.where(state.asnumpy(state.ravel(dtype=float) != 0))
[docs]
def unpack(self, x, state=None):
"""Unpack `x` into `state` including factors of `x_scale`
Note: Do not scale or otherwise mutate `x`.
"""
if state is None:
if not hasattr(self, "_state"):
self._state = self.state.copy()
state = self._state
if not hasattr(self, "_x_full"):
self._x_full = state.asnumpy(state.ravel(dtype=float)).copy()
self._x_full[self._active_inds] = x
state.unravel_from(self._x_full)
state *= self.x_scale
return state
[docs]
def pack(self, state):
"""This is not symmetric with `unpack` as it does not scale
the solution by any factors. (This is because we unpack both
states and Hpsi which have different scalings.).
"""
psi = state.asnumpy(state.ravel())
if self.real:
assert np.allclose(psi.imag, 0)
return psi.view(dtype=float).ravel()[self._active_inds]
[docs]
def f_df(self, x):
"""Return the objective function and it's derivative as a function of
the unpacked state `x`."""
psi = self.unpack(x)
if self.fix_N:
s, N = psi.normalize()
Hpsi = psi.get_Hy(subtract_mu=self.fix_N)
if self.fix_N:
Hpsi *= s
E = psi.get_energy()
if hasattr(self, "_Es"):
self._Es.append(E)
if self.fix_N:
# Add extra piece to Hamiltonian to remove singular direction due
# to scaling. Since E(s*psi) is flat in this direction, adding
# this piece does not change the location of the minimum, but it
# does ensure that the solver does not let the norm of psi run
# away.
E += psi.asnumpy(psi.xp.sum(self.f_scale * (1.0 / s**2 - 1) ** 2 / 4))
Hpsi.axpy(psi, a=self.f_scale * (1.0 / s**2 - 1) / s / 2 / (N + _TINY))
Hpsi *= 2.0 * psi.metric # for real x rather than psi
if self.plot:
psi.plot()
f = psi.asnumpy(E) / self.f_scale
df = self.pack(Hpsi)
df *= self.x_scale / self.f_scale
return f, df
[docs]
def g_dg(self, x):
"""Return the objective function minimizing the residual norm.
INCOMPLETE.
"""
psi = self.unpack(x)
if self.fix_N:
s, N = psi.normalize()
Hpsi = psi.get_Hy(subtract_mu=self.fix_N)
if self.fix_N:
Hpsi *= s
E = psi.get_energy()
if hasattr(self, "_Es"):
self._Es.append(E)
if self.fix_N:
# Add extra piece to Hamiltonian to remove singular direction due
# to scaling. Since E(s*psi) is flat in this direction, adding
# this piece does not change the location of the minimum, but it
# does ensure that the solver does not let the norm of psi run
# away.
E += psi.asnumpy(psi.xp.sum(self.f_scale * (1.0 / s**2 - 1) ** 2 / 4))
Hpsi.axpy(psi, a=self.f_scale * (1.0 / s**2 - 1) / s / 2 / N)
Hpsi *= 2.0 * psi.metric # for real x rather than psi
if self.plot:
psi.plot()
f = psi.asnumpy(E) / self.f_scale
df = self.pack(Hpsi)
df *= self.x_scale / self.f_scale
return f, df
[docs]
def minimize(self, psi_tol=1e-8, E_tol=1e-12, callback=None, _debug=False, **kw):
# state0 = self.state.copy()
if callback is not None:
_callback_state = callback
def callback(x):
state = self.unpack(x)
if self.fix_N:
state.normalize()
return _callback_state(state)
kw["callback"] = callback
self._Es = []
kw.update(x_tol=psi_tol, f_tol=E_tol)
if _debug:
return super().minimize(_debug=_debug, **kw)
else:
x = super().minimize(**kw)
state = self.unpack(x)
if self.fix_N:
state.normalize()
state.minimize_results = self.minimize_results
self.state = state
return state
[docs]
class MinimizeStateFixedPhase(MinimizeState):
"""Minimizer with fixed phase."""
def __init__(self, state, phase, fix_N=True, **kw):
"""
phase : array
Array with the phase.
fix_N : bool
If `True`, then keep `N` fixed while minimizing, otherwise, we expect the
state to include any chemical-potential subtraction in `get_Vext()` so that
it is consistently applied in both `get_Hy()` and `get_energy()` used here.
"""
[docs]
self.phase = np.exp(1j * np.angle(phase))
super().__init__(state=state, real=True, fix_N=fix_N, **kw)
[docs]
def init(self):
self.state.pre_minimize_hook()
self._state = self.state.copy()
[docs]
def unpack(self, x, state=None):
"""Unpack `x` into `state` including factors of `x_scale`
Note: Do not scale or otherwise mutate `x`.
"""
if state is None:
if not hasattr(self, "_state"):
self._state = self.state.copy()
state = self._state
state.unravel_from(x + 0j)
state *= self.phase
state *= self.x_scale
return state
[docs]
def pack(self, state):
"""This is not symmetric with `unpack` as it does not scale
the solution by any factors. (This is because we unpack both
states and Hpsi which have different scalings.).
"""
psi = state.asnumpy(state.ravel())
return abs(psi)
[docs]
def f_df(self, x):
"""Return the objective function and it's derivative as a function of
the unpacked state `x`."""
psi = self.unpack(x)
if self.fix_N:
s, N = psi.normalize()
Hpsi = psi.get_Hy(subtract_mu=self.fix_N)
Hpsi *= self.phase.conj()
if self.fix_N:
Hpsi *= s
E = psi.get_energy()
if hasattr(self, "_Es"):
self._Es.append(E)
if self.fix_N:
# Add extra piece to Hamiltonian to remove singular direction due
# to scaling. Since E(s*psi) is flat in this direction, adding
# this piece does not change the location of the minimum, but it
# does ensure that the solver does not let the norm of psi run
# away.
E += psi.asnumpy(psi.xp.sum(self.f_scale * (1.0 / s**2 - 1) ** 2 / 4))
Hpsi.axpy(psi, a=self.f_scale * (1.0 / s**2 - 1) / s / 2 / N)
Hpsi *= 2.0 * psi.metric
if self.plot:
psi.plot()
f = psi.asnumpy(E) / self.f_scale
df = Hpsi.ravel().real
# df = self.pack(Hpsi)
df *= self.x_scale / self.f_scale
return f, np.asfortranarray(df)
[docs]
def minimize(self, **kw):
N = np.prod(self.phase.shape)
bounds = [(0, np.inf)] * N
return super().minimize(use_scipy=True, bounds=bounds, **kw)