from abc import ABC, abstractmethod
from pathlib import Path
import pickle
import warnings
import numpy as np
from scipy import linalg
from scipy.sparse import coo_matrix
from scipy.sparse.linalg import spsolve
from tqdm import tqdm
from ggce.logger import logger, disable_logger
from ggce.utils.utils import float_to_list, chunk_jobs
from ggce.engine.system import System
from ggce.utils.physics import G0_k_omega
BYTES_TO_MB = 1048576
[docs]class Solver(ABC):
@property
def system(self):
return self._system
@property
def root(self):
return self._root
@property
def basis(self):
return self._basis
@property
def mpi_comm(self):
return self._mpi_comm
@property
def mpi_rank(self):
if self._mpi_comm is not None:
return self._mpi_comm.Get_rank()
return 0
@property
def mpi_world_size(self):
if self._mpi_comm is not None:
return self._mpi_comm.Get_size()
return 1
def __init__(self, system=None, root=None, basis=None, mpi_comm=None):
self._system = system
self._root = root
self._mpi_comm = mpi_comm
self._basis = basis
if self._system is None and self._root is None:
logger.critical("Either system, root or both must be provided")
if self._root is not None:
# We allow checkpointing
self._root = Path(self._root)
self._results_directory = self._root / Path("results")
self._results_directory.mkdir(exist_ok=True, parents=True)
else:
logger.warning(
"root not provided to Solver - Solver checkpointing disabled"
)
self._results_directory = None
if self._system is None:
# Attempt to load the system from its checkpoint... the system
# will now be initialized or an error will be thrown
self._system = System.from_checkpoint(self._root)
# Force checkpoint the system, which at this point must be initialized
with disable_logger():
self._system.checkpoint()
if self._root is not None:
logger.info(f"System checkpointed to '/{self._root}'")
[docs] def get_jobs_on_this_rank(self, jobs):
"""Gets the jobs assigned to this rank. Note this method silently
behaves as it should when the world size is 1 (or there's no MPI
communicator).
Parameters
----------
jobs : list
The jobs to chunk.
Returns
-------
list
The jobs assigned to this rank.
"""
if self.mpi_comm is None:
return jobs
return chunk_jobs(jobs, self.mpi_world_size, self.mpi_rank)
@staticmethod
def _k_omega_eta_to_str(k, omega, eta):
# Note this will have to be redone when k is a vector in 2 and 3D!
return f"{k:.10f}_{omega:.10f}_{eta:.10f}"
@abstractmethod
def _pre_solve(self):
...
@abstractmethod
def _post_solve(self):
...
[docs] @abstractmethod
def solve(self, k, w, eta):
"""Takes, ``k, w, eta`` and returns the Green's function."""
...
[docs] @abstractmethod
def greens_function(self, k, w, eta, pbar=False):
...
[docs]class BasicSolver(Solver):
def _pre_solve(self, k, w, eta):
result = None
path = None
if self._results_directory is not None:
ckpt_path = f"{self._k_omega_eta_to_str(k, w, eta)}.pkl"
path = self._results_directory / Path(ckpt_path)
if path.exists():
result = np.array(pickle.load(open(path, "rb")))
return result, path
def _post_solve(self, G, k, w, path):
if -G.imag / np.pi < 0.0:
logger.error(f"A(k,w) < 0 at k, w = ({k:.02f}, {w:.02f}")
if self._results_directory is not None:
pickle.dump(G, open(path, "wb"), protocol=pickle.HIGHEST_PROTOCOL)
[docs] def greens_function(self, k, w, eta, pbar=False):
"""Solves for the greens_function in serial or in parallel, depending
on whether MPI_COMM is provided to the Solver at instantiation.
Parameters
----------
k : float
The momentum quantum number point of the calculation.
w : float
The frequency grid point of the calculation.
eta : float
The artificial broadening parameter of the calculation.
Returns
-------
numpy.ndarray
The resulting greens_function of shape ``nk`` by ``nw``.
"""
k = float_to_list(k)
w = float_to_list(w)
# This orders the jobs such that when constructed into an array, the
# k-points are the rows and the w-points are the columns after reshape
jobs = [(_k, _w) for _k in k for _w in w]
jobs_on_rank = self.get_jobs_on_this_rank(jobs)
s = []
for _k, _w in tqdm(jobs_on_rank, disable=not pbar):
s.append(self.solve(_k, _w, eta))
if self.mpi_comm is not None:
all_results = self.mpi_comm.gather(s, root=0)
# Only rank 0 returns the result
if self.mpi_rank == 0:
# all_results is a list of arrays of different length
# need to parse it properly into an array
all_results = [
xx[ii] for xx in all_results for ii in range(len(xx))
]
s = np.array([xx for xx in all_results])
return s.reshape(len(k), len(w))
else:
return None
return np.array(s).reshape(len(k), len(w))
[docs]class SparseSolver(BasicSolver):
"""A sparse, serial solver for the Green's function. Useful for when the
calculation being performed is quite cheap. Note that there are a variety
of checkpointing features automatically executed when paths are provided.
- When a ``System`` and path are provided, the system's root directory
is overwritten and that object immediately checkpointed to the provided
directory.
- If only a ``System`` object is provided, no checkpointing will be
performed.
- If only a path is provided, the solver will attempt to instantiate the
``System`` object. Checkpointing will the proceed as normal.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self._basis is None:
self._basis = self._system.get_basis(full_basis=True)
def _sparse_matrix_from_equations(self, k, w, eta):
"""This function iterates through the GGCE equations dicts to extract
the row, column coordiante and value of the nonzero entries in the
matrix. This is subsequently used to construct the parallel sparse
system matrix. This is exactly the same as in the Serial class: however
that method returns X, v whereas here we need row_ind/col_ind_dat.
Parameters
----------
k : float
The momentum quantum number point of the calculation.
w : float
The frequency grid point of the calculation.
eta : float
The artificial broadening parameter of the calculation.
Returns
-------
list, list, list
The row and column coordinate lists, as well as a list of values of
the matrix that are nonzero.
"""
row_ind = []
col_ind = []
dat = []
total_bosons = np.sum(self._system.model.phonon_number)
for n_bosons in range(total_bosons + 1):
for eq in self._system.equations[n_bosons]:
row_dict = dict()
index_term_id = eq.index_term.id()
ii_basis = self._basis[index_term_id]
for term in eq._terms_list + [eq.index_term]:
jj = self._basis[term.id()]
try:
row_dict[jj] += term.coefficient(k, w, eta)
except KeyError:
row_dict[jj] = term.coefficient(k, w, eta)
row_ind.extend([ii_basis for _ in range(len(row_dict))])
col_ind.extend([key for key, _ in row_dict.items()])
dat.extend([value for _, value in row_dict.items()])
# estimate sparse matrix memory usage
# (complex (16 bytes) + int (4 bytes) + int) * nonzero entries
est_mem_used = 24 * len(dat) / BYTES_TO_MB
logger.debug(f"Estimated memory needed is {est_mem_used:.02f} MB")
return row_ind, col_ind, dat
def _scaffold(self, k, w, eta):
"""Prepare the X, v sparse representation of the matrix to solve.
Parameters
----------
k : float
The momentum quantum number point of the calculation.
w : float
The frequency grid point of the calculation.
eta : float, optional
The artificial broadening parameter of the calculation.
Returns
-------
csr_matrix, csr_matrix
Sparse representation of the matrix equation to solve, X and v.
"""
row_ind, col_ind, dat = self._sparse_matrix_from_equations(k, w, eta)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=DeprecationWarning)
proto_matr = (
np.array(dat, dtype=np.complex64),
(np.array(row_ind), np.array(col_ind)),
)
X = coo_matrix(proto_matr).tocsr()
size = (X.data.size + X.indptr.size + X.indices.size) / BYTES_TO_MB
logger.debug(f"Memory usage of sparse X is {size:.01f} MB")
# Initialize the corresponding sparse vector
# {G}(0)
row_ind = np.array([self._basis["{G}[0.0]"]])
col_ind = np.array([0])
v = coo_matrix(
(
np.array(
[self._system.equations[0][0].bias(k, w, eta)],
dtype=np.complex64,
),
(row_ind, col_ind),
)
).tocsr()
return X, v
[docs] def solve(self, k, w, eta):
"""Solve the sparse-represented system for some given point
:math:`(k, \\omega, \\eta)`. Note that if the spectral function is
negative results will not be checkpointed.
Parameters
----------
k : float
The momentum quantum number point of the calculation.
w : float
The frequency grid point of the calculation.
eta : float
The artificial broadening parameter of the calculation.
Returns
-------
numpy.ndarray
The value of :math:`G(k, \\omega; \\eta)`.
"""
result, path = self._pre_solve(k, w, eta)
if result is not None:
return result
# Solution ------------------------------------------------------------
X, v = self._scaffold(k, w, eta)
res = spsolve(X, v)
G = res[self._basis["{G}[0.0]"]]
# Solution Done -------------------------------------------------------
self._post_solve(G, k, w, path)
return np.array(G)
[docs]class DenseSolver(BasicSolver):
"""A sparse, dense solver for the Green's function. Note that there are a
variety of checkpointing features automatically executed when paths are
provided. Uses the SciPy dense solver engine to solve for G(k, w) in
serial. This method uses the continued fraction approach,
.. math:: R_{n-1} = (1 - \\beta_{n-1}R_{n})^{-1} \\alpha_{n-1}
with
.. math:: R_n = \\alpha_n
and
.. math:: f_n = R_n f_{n-1}
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self._basis is None:
self._basis = self._system.get_basis(full_basis=False)
def _fill_matrix(self, k, w, n_phonons, shift, eta):
n_phonons_shift = n_phonons + shift
equations_n = self._system.equations[n_phonons]
# Initialize a matrix to fill
d1 = len(self._basis[n_phonons])
d2 = len(self._basis[n_phonons + shift])
A = np.zeros((d1, d2), dtype=np.complex64)
# Fill the matrix of coefficients
for ii, eq in enumerate(equations_n):
index_term_id = eq.index_term.id()
ii_basis = self._basis[n_phonons][index_term_id]
for term in eq._terms_list:
if term.config.total_phonons != n_phonons_shift:
continue
t_id = term.id()
jj_basis = self._basis[n_phonons_shift][t_id]
A[ii_basis, jj_basis] += term.coefficient(k, w, eta)
return A
def _get_alpha(self, k, w, n_phonons, eta):
return self._fill_matrix(k, w, n_phonons, -1, eta)
def _get_beta(self, k, w, n_phonons, eta):
return self._fill_matrix(k, w, n_phonons, 1, eta)
[docs] def solve(self, k, w, eta):
"""Solve the dense-represented system.
Parameters
----------
k : float
The momentum quantum number point of the calculation.
w : float
The frequency grid point of the calculation.
eta : float
The artificial broadening parameter of the calculation.
Returns
-------
np.ndarray
The value of G.
"""
result, path = self._pre_solve(k, w, eta)
if result is not None:
return result
# Solution ------------------------------------------------------------
total_phonons = np.sum(self._system.model.phonon_number)
for n_phonons in range(total_phonons, 0, -1):
# Special case of the recursion where R_N = alpha_N.
if n_phonons == total_phonons:
R = self._get_alpha(k, w, n_phonons, eta)
continue
# Get the next loop's alpha and beta values
beta = self._get_beta(k, w, n_phonons, eta)
alpha = self._get_alpha(k, w, n_phonons, eta)
# Compute the next R
X = np.eye(beta.shape[0], R.shape[1]) - beta @ R
R = linalg.solve(X, alpha)
a = self._system.model.lattice_constant
t = self._system.model.hopping
G0 = G0_k_omega(k, w, a, eta, t)
beta0 = self._get_beta(k, w, 0, eta)
G = (G0 / (1.0 - beta0 @ R)).squeeze()
G = np.array(G, dtype=np.complex64)
# Solution Done -------------------------------------------------------
self._post_solve(G, k, w, path)
return G