"""Hamiltonian assembly, diagonalization, and dynamics for QMB models.
This module provides the :class:`QMB_hamiltonian` container used to collect
Hamiltonian contributions, build dense/sparse/linear representations, compute
spectral properties, and evolve states in time.
"""
import logging
import numpy as np
from numba import njit, prange
from scipy.linalg import eigh as array_eigh
from scipy.sparse import coo_matrix, csc_matrix, isspmatrix
from scipy.sparse.linalg import LinearOperator
from scipy.sparse.linalg import eigsh as sparse_eigh
from scipy.sparse.linalg import expm, expm_multiply
from edlgt.dtype_config import coerce_numeric_array, get_numeric_dtype
from edlgt.tools import validate_parameters
from .qmb_state import QMB_state, get_norm
logger = logging.getLogger(__name__)
__all__ = ["QMB_hamiltonian", "get_entropy_partition"]
[docs]
class QMB_hamiltonian:
"""Container for a quantum many-body Hamiltonian and derived quantities."""
def __init__(self, lvals, size):
"""Initialize an empty Hamiltonian accumulator.
Parameters
----------
lvals : list or tuple
Lattice dimensions.
size : int
Hilbert-space dimension of the Hamiltonian.
"""
validate_parameters(lvals=lvals)
self.lvals = lvals
self.shape = (size, size)
# Triplet storage is kept in compact dtypes because these arrays can be
# very large for symmetry-reduced sectors.
self._index_dtype = np.int32
self._value_dtype = get_numeric_dtype()
self.row_list = np.empty(0, dtype=self._index_dtype)
self.col_list = np.empty(0, dtype=self._index_dtype)
self.value_list = np.empty(0, dtype=self._value_dtype)
# New terms are accumulated block-wise and flattened only when needed.
# This avoids repeated concatenate/copy work in add_term.
self._row_blocks = []
self._col_blocks = []
self._value_blocks = []
self._pending_nnz = 0
self.Ham = None
self.indptr = None
self.indices = None
self.data = None
self.Ham_type = None
self.loc_dims = None
self.n_eigs = None
self.Nenergies = None
self.Npsi = None
self.GSenergy = None
self.GSpsi = None
self.psi_time = None
self.r_value = None
def _normalize_term_triplets(self, term):
"""Convert a Hamiltonian contribution to normalized COO triplet arrays.
Parameters
----------
term : tuple or numpy.ndarray or scipy.sparse.spmatrix
Hamiltonian contribution provided as a sparse triplet
``(row_list, col_list, value_list)``, a dense matrix, or a SciPy
sparse matrix.
Returns
-------
tuple
``(row_list, col_list, value_list)`` as one-dimensional arrays with
dtypes compatible with the internal accumulator.
Raises
------
TypeError
If ``term`` has an unsupported format.
ValueError
If the three triplet arrays do not have the same length.
"""
if isinstance(term, tuple) and len(term) == 3:
# Case 1: term is already given in COO triplet format.
row_list, col_list, value_list = term
elif isinstance(term, np.ndarray):
# Case 2: term is a dense matrix.
row_list, col_list = np.nonzero(term)
value_list = term[row_list, col_list]
elif isspmatrix(term):
# Case 3: term is a SciPy sparse matrix.
coo = coo_matrix(term)
row_list = coo.row
col_list = coo.col
value_list = coo.data
else:
msg = (
"Unsupported term type: expect (dense/sparse) or "
"(row_list, col_list, value_list)"
)
raise TypeError(msg)
row_list = np.asarray(row_list, dtype=self._index_dtype)
col_list = np.asarray(col_list, dtype=self._index_dtype)
value_list = coerce_numeric_array(value_list, name="Hamiltonian term values")
value_list = value_list.astype(self._value_dtype, copy=False)
return row_list, col_list, value_list
def _append_triplet_block(
self, row_list: np.ndarray, col_list: np.ndarray, value_list: np.ndarray
):
"""Register one triplet block without flattening previous blocks.
Parameters
----------
row_list, col_list, value_list : numpy.ndarray
One-dimensional triplet arrays for a single Hamiltonian contribution.
"""
if row_list.shape[0] == 0:
return
self._row_blocks.append(row_list)
self._col_blocks.append(col_list)
self._value_blocks.append(value_list)
self._pending_nnz += row_list.shape[0]
def _materialize_triplets(self):
"""Flatten pending triplet blocks into contiguous arrays once.
This method is called before building sparse/dense/linear Hamiltonian
representations or when direct access to the triplets is required.
"""
if self._pending_nnz == 0:
return
# Existing materialized triplets are kept and extended with the pending blocks.
total_nnz = self.row_list.shape[0] + self._pending_nnz
row_list = np.empty(total_nnz, dtype=self._index_dtype)
col_list = np.empty(total_nnz, dtype=self._index_dtype)
value_list = np.empty(total_nnz, dtype=self._value_dtype)
write_pos = 0
# Save the current set of nnzentries
if self.row_list.shape[0] > 0:
n_existing = self.row_list.shape[0]
row_list[:n_existing] = self.row_list
col_list[:n_existing] = self.col_list
value_list[:n_existing] = self.value_list
write_pos = n_existing
# Add the new set of nnzentries
for block_row, block_col, block_value in zip(
self._row_blocks, self._col_blocks, self._value_blocks
):
block_size = block_row.shape[0]
next_pos = write_pos + block_size
row_list[write_pos:next_pos] = block_row
col_list[write_pos:next_pos] = block_col
value_list[write_pos:next_pos] = block_value
write_pos = next_pos
# get the global list of nnz entries
self.row_list = row_list
self.col_list = col_list
self.value_list = value_list
# release memory
self._row_blocks.clear()
self._col_blocks.clear()
self._value_blocks.clear()
self._pending_nnz = 0
[docs]
def add_term(self, term):
"""Add a Hamiltonian contribution to the internal sparse triplet lists.
Parameters
----------
term : tuple or numpy.ndarray or scipy.sparse.spmatrix
Hamiltonian contribution provided as a sparse triplet
``(row_list, col_list, value_list)``, a dense matrix, or a SciPy
sparse matrix.
Raises
------
TypeError
If ``term`` has an unsupported format.
"""
row_list, col_list, value_list = self._normalize_term_triplets(term)
self._append_triplet_block(row_list, col_list, value_list)
[docs]
def build(self, format): # pylint: disable=redefined-builtin
"""Build the Hamiltonian representation from accumulated triplets.
Parameters
----------
format : str
Target representation: ``"dense"``, ``"sparse"``, or ``"linear"``.
"""
logger.info("Construct %s Hamiltonian", format)
# Flatten all pending triplet blocks exactly once before building.
self._materialize_triplets()
# ==========================================================================
def matvec(psi):
"""Function to compute H @ v."""
result = csr_spmv(self.indptr, self.indices, self.data, psi)
if result is None:
raise ValueError("matvec returned None.")
if not isinstance(result, np.ndarray):
raise TypeError(f"matvec must return np.ndarray, got {type(result)}.")
return result
# ==========================================================================
def matmat(psis):
"""Function to compute H @ matrix (batch of vectors)."""
result = csr_matmat(self.indptr, self.indices, self.data, psis)
if result is None:
raise ValueError("matmat returned None.")
if not isinstance(result, np.ndarray):
raise TypeError(f"matmat must return np.ndarray, got {type(result)}.")
return result
# ==========================================================================
if format == "sparse":
self.Ham = csc_matrix(
(self.value_list, (self.row_list, self.col_list)), shape=self.shape
)
# ==========================================================================
elif format == "linear":
matrix_dim = self.shape[0]
self.indptr, self.indices, self.data = build_csr_numba(
matrix_dim, self.row_list, self.col_list, self.value_list
)
self.Ham = LinearOperator(shape=self.shape, matvec=matvec, matmat=matmat)
# ==========================================================================
elif format == "dense":
self.Ham = csc_matrix(
(self.value_list, (self.row_list, self.col_list)), shape=self.shape
)
self.Ham = self.Ham.toarray()
# Measure sparsity
self.get_sparsity()
self.Ham_type = format
[docs]
def convert_hamiltonian(self, format): # pylint: disable=redefined-builtin
"""Convert the Hamiltonian to another representation type.
Parameters
----------
format : str
Target representation: ``"dense"``, ``"sparse"``, or ``"linear"``.
Raises
------
ValueError
If ``format`` is not supported.
"""
logger.info("CONVERT HAMILTONIAN from %s to %s", self.Ham_type, format)
if format == self.Ham_type:
return # Already in the desired format
if format == "dense":
if self.Ham_type == "sparse":
self.Ham = self.Ham.toarray()
elif self.Ham_type == "linear":
self.build("dense")
elif format == "sparse":
if self.Ham_type == "dense":
self.Ham = csc_matrix(self.Ham)
elif self.Ham_type == "linear":
self.build("sparse")
elif format == "linear":
self.build("linear")
else:
msg = f"Invalid format: {format}. Use 'dense', 'sparse', or 'linear'."
raise ValueError(msg)
self.Ham_type = format
[docs]
def diagonalize(self, n_eigs, format, loc_dims, print_results=True): # pylint: disable=redefined-builtin
"""Diagonalize the Hamiltonian and store eigenvalues/eigenstates.
Parameters
----------
n_eigs : int or str
Number of eigenpairs to compute, or ``"full"`` for full
diagonalization.
format : str
Hamiltonian representation to use for diagonalization.
loc_dims : list or numpy.ndarray
Local Hilbert-space dimensions used to build :class:`QMB_state`
objects for eigenvectors.
print_results : bool, optional
If ``True``, log energies and energy densities.
Returns
-------
None
Raises
------
ValueError
If ``n_eigs`` is invalid.
"""
# Ensure Hamiltonian is Hermitian and check sparsity
# check_hermitian(self.Ham)
validate_parameters(loc_dims=loc_dims)
if format != self.Ham_type:
self.convert_hamiltonian(format)
# Save local dimensions
self.loc_dims = loc_dims
# Determine number of eigenvalues to compute
if isinstance(n_eigs, int):
if n_eigs > self.shape[0]:
msg = f"n_eigs must be < H.shape[0]={self.shape[0]}, got {n_eigs}"
raise ValueError(msg)
self.n_eigs = n_eigs
elif isinstance(n_eigs, str) and n_eigs == "full":
self.n_eigs = self.shape[0]
else:
raise ValueError(f"n_eigs must be int or 'full', not {n_eigs}.")
# Select diagonalization format
logger.info("----------------------------------------------------")
if self.Ham_type == "dense" or self.n_eigs == "full":
logger.info("DIAGONALIZE (dense) HAMILTONIAN")
energies, eigenvectors = array_eigh(self.Ham)
elif (
self.Ham_type == "sparse"
or self.Ham_type == "linear"
and isinstance(n_eigs, int)
):
logger.info("DIAGONALIZE %s HAMILTONIAN", self.Ham_type)
energies, eigenvectors = sparse_eigh(self.Ham, k=self.n_eigs, which="SA")
else:
msg = (
"Unsupported diagonalization combination: "
f"Ham_type={self.Ham_type}, n_eigs={n_eigs}"
)
raise ValueError(msg)
# Sort and save eigenvalues/eigenstates
if not is_sorted(energies):
order = np.argsort(energies)
energies, eigenvectors = energies[order], eigenvectors[:, order]
if print_results:
# Save the eigenstates as QMB_states
logger.info("TOT ENERGY -- EN DENSITY")
for _, total_energy in enumerate(energies):
en_density = total_energy / np.prod(self.lvals)
logger.info("%.10f %.10f", total_energy, en_density)
self.Nenergies = energies / np.prod(self.lvals)
self.Npsi = [
QMB_state(eigenvectors[:, ii], self.lvals, self.loc_dims)
for ii in range(self.n_eigs)
]
# Save GROUND STATE PROPERTIES
self.GSenergy = self.Nenergies[0]
self.GSpsi = self.Npsi[0]
[docs]
def time_evolution(
self,
initial_state: np.ndarray,
time_line: np.ndarray,
loc_dims: np.ndarray,
):
"""Evolve an initial state along a time grid using the current Hamiltonian.
Parameters
----------
initial_state : numpy.ndarray
Initial state vector.
time_line : numpy.ndarray
Time samples at which the evolved state is returned.
loc_dims : numpy.ndarray
Local Hilbert-space dimensions used to wrap evolved states into
:class:`QMB_state` objects.
Returns
-------
None
Evolved states are stored in ``self.psi_time``.
"""
# Save local dimensions
self.loc_dims = loc_dims
# Compute the time spacing and the time line
delta_t = time_line[1] - time_line[0]
msg = f"------------ TIME EVOLUTION: DELTA {round(delta_t,4)} ------------"
logger.info(msg)
# Check Hamiltonian type and compute the time evolution
if self.Ham_type == "dense":
# Check if Hamiltonian is already diagonalized
if self.Nenergies is None or self.Npsi is None:
self.diagonalize(n_eigs="full", format="dense", loc_dims=loc_dims)
# Multiply energy densities by the number of sites to get total energies
evals = self.Nenergies * np.prod(self.lvals)
# Run the exact time evolution
eigenstate_matrix = np.array([eigenstate.psi for eigenstate in self.Npsi]).T
psi_time = exact_time_evolution(
time_line,
evals,
eigenstate_matrix,
initial_state.astype(np.complex128),
)
elif self.Ham_type == "sparse":
generator_matrix = complex(0, -1) * self.Ham
psi_time = expm_multiply(
A=generator_matrix,
B=initial_state,
start=time_line[0],
stop=time_line[-1],
num=len(time_line),
endpoint=True,
)
elif self.Ham_type == "linear":
# Ensure traceA sees the complete set of accumulated triplets.
self._materialize_triplets()
def matvec_lin(psi):
return -1j * self.Ham.matvec(psi)
def matmat_lin(psis):
return -1j * self.Ham.matmat(psis)
def rmatvec_lin(psi):
return +1j * self.Ham.matvec(psi)
def rmatmat_lin(psis):
return +1j * self.Ham.matmat(psis)
A_lin = LinearOperator(
shape=self.shape,
matvec=matvec_lin,
matmat=matmat_lin,
rmatvec=rmatvec_lin,
rmatmat=rmatmat_lin,
dtype=np.complex128,
)
trace_a = compute_trace_ih(self.row_list, self.col_list, self.value_list)
psi_time = expm_multiply(
A=A_lin,
B=initial_state,
start=time_line[0],
stop=time_line[-1],
num=len(time_line),
endpoint=True,
traceA=trace_a,
)
logger.info("Saving time evolved states")
# Save them as QMB_states
self.psi_time = [
QMB_state(psi_time[ii, :], self.lvals, self.loc_dims)
for ii in range(len(time_line))
]
[docs]
def partition_function(self, beta):
"""Compute the partition function for inverse temperature ``beta``.
Parameters
----------
beta : float
Inverse temperature.
Returns
-------
float
Partition function ``Z``.
Raises
------
ValueError
If the computed partition function is non-positive.
"""
if self.n_eigs == self.Ham.shape[0]:
partition_fn = np.sum(np.exp(-beta * self.Nenergies))
else:
partition_fn = np.real(csc_matrix(expm(-beta * self.Ham)).trace())
if partition_fn <= 0:
raise ValueError(f"Z must be positive, not {partition_fn}.")
return partition_fn
[docs]
def free_energy(self, beta):
"""Compute the free energy at inverse temperature ``beta``.
Parameters
----------
beta : float
Inverse temperature.
Returns
-------
float
Free energy.
"""
partition_fn = self.partition_function(beta)
free_energy = -1 / beta * np.log(partition_fn)
return free_energy
[docs]
def thermal_average(self, beta):
"""Compute the thermal average energy at inverse temperature ``beta``.
Parameters
----------
beta : float
Inverse temperature.
Returns
-------
float
Thermal average energy.
"""
partition_fn = self.partition_function(beta)
if self.n_eigs == self.Ham.shape[0]:
return self.Nenergies.dot(np.exp(-beta * self.Nenergies)) / partition_fn
return np.real(self.Ham.dot(expm(-beta * self.Ham)).trace()) / partition_fn
[docs]
def f_prime(self, beta):
"""Compute the derivative used in the Newton solve for ``beta``.
Parameters
----------
beta : float
Inverse temperature.
Returns
-------
float
Derivative-like quantity used by :meth:`get_beta`.
"""
partition_fn = self.partition_function(beta)
if self.n_eigs == self.Ham.shape[0]:
energy_sq_average = (
np.square(self.Nenergies).dot(np.exp(-beta * self.Nenergies))
/ partition_fn
)
else:
energy_sq_average = np.real(
(self.Ham**2).dot(expm(-beta * self.Ham)).trace()
) / partition_fn
return -energy_sq_average - self.thermal_average(beta) ** 2
F_prime = f_prime # pylint: disable=invalid-name
[docs]
def get_beta(self, state, threshold=1e-10, max_iter=1000):
"""Estimate an effective inverse temperature for a reference state.
Parameters
----------
state : numpy.ndarray
Reference state vector.
threshold : float, optional
Convergence tolerance for the Newton iteration.
max_iter : int, optional
Maximum number of Newton iterations.
Returns
-------
float
Estimated inverse temperature ``beta``.
Notes
-----
The method solves for the root of the difference between the thermal
average energy and the energy of ``state``.
"""
iter_count = 0
accuracy = 1
beta = 1e-7
logger.info("=========== GET BETA ===============")
# Get the reference energy value for the chosen state
state_energy = QMB_state(state).expectation_value(self.Ham)
while accuracy > threshold and iter_count < max_iter:
iter_count += 1
if self.n_eigs == self.Ham.shape[0]:
partition_fn = self.partition_function(beta)
energy_avg = (
self.Nenergies.dot(np.exp(-beta * self.Nenergies)) / partition_fn
)
energy_delta = energy_avg - state_energy
energy_sq_avg = (
np.square(self.Nenergies).dot(np.exp(-beta * self.Nenergies))
/ partition_fn
)
derivative_value = -energy_sq_avg - (energy_avg**2)
else:
thermal_kernel = csc_matrix(expm(-beta * self.Ham))
partition_fn = np.real(thermal_kernel.trace())
energy_avg = np.real(self.Ham.dot(thermal_kernel).trace()) / partition_fn
energy_delta = energy_avg - state_energy
derivative_value = (
-np.real((self.Ham**2).dot(thermal_kernel).trace()) / partition_fn
- energy_avg**2
)
# ==================================================
prev_val = beta
beta = beta - energy_delta / derivative_value
accuracy = abs(beta - prev_val)
logger.info("------------------------------------")
logger.info("F=%s", energy_delta)
logger.info("Fp=%s", derivative_value)
logger.info("beta=%s", beta)
logger.info("accuracy=%s", accuracy)
if iter_count >= max_iter:
logger.warning("Maximum iterations reached without convergence.")
logger.info("BETA %s", beta)
return beta
[docs]
def get_r_value(self):
"""Compute adjacent-gap ratio statistics from the stored spectrum.
Returns
-------
numpy.ndarray
Array of local adjacent-gap ratios.
Raises
------
ValueError
If the Hamiltonian has not been fully diagonalized.
"""
# Check Hamiltonian type and compute the time evolution
if self.Ham_type == "dense":
# Check if Hamiltonian is already diagonalized
if self.Nenergies is None or self.Npsi is None:
raise ValueError("The Hamiltonian has to be fully diagonalized.")
else:
raise ValueError("The Hamiltonian has to be fully diagonalized.")
# Compute level spacings
delta_E = np.diff(np.sort(self.Nenergies))
# Compute r-values
r_array = np.array(
[
min(delta_E[ii], delta_E[ii + 1]) / max(delta_E[ii], delta_E[ii + 1])
for ii in range(len(delta_E) - 1)
]
)
# Define the bulk range
bulk_start = int(0.1 * len(r_array))
bulk_end = int(0.9 * len(r_array))
# Compute the average r-value for the bulk
self.r_value = np.mean(r_array[bulk_start:bulk_end])
logger.info("R value %s", self.r_value)
return r_array
[docs]
def print_energy(self, en_state):
"""Log the energy density of a selected eigenstate index.
Parameters
----------
en_state : int
Eigenstate index in ``self.Nenergies``.
"""
logger.info("====================================================")
logger.info("%s ENERGY: %.16f", en_state, self.Nenergies[en_state])
[docs]
def get_sparsity(self):
"""Log the current sparsity estimated from accumulated triplets."""
self._materialize_triplets()
sparsity = len(self.row_list) / (self.shape[0] ** 2)
logger.info("SPARSITY: %.16f", sparsity)
def is_sorted(array1D):
"""Return ``True`` if a 1D array is non-decreasing."""
return np.all(array1D[:-1] <= array1D[1:])
def get_sorted_indices(data):
"""Return indices that sort entries by descending magnitude."""
abs_data = np.abs(data)
real_data = np.real(data)
# Lexsort by real part first (secondary key), then by absolute value (primary key)
sorted_indices = np.lexsort((real_data, abs_data))
return sorted_indices[::-1] # Descending order
[docs]
def get_entropy_partition(lvals, option="half"):
"""Return lattice-site indices used for a standard bipartition.
Parameters
----------
lvals : list or tuple
Lattice dimensions.
option : str, optional
Partition preset. Currently only ``"half"`` is implemented.
Returns
-------
list[int]
Site indices belonging to the selected subsystem.
Raises
------
NotImplementedError
If ``option`` is not implemented.
"""
if option == "half":
if len(lvals) == 1:
partition_indices = list(np.arange(0, int(lvals[0] / 2), 1))
else:
partition_indices = list(np.arange(0, int(lvals[0] / 2), 1)) + list(
np.arange(lvals[0], lvals[0] + int(lvals[0] / 2), 1)
)
else:
raise NotImplementedError("for the moment it works only for option=half")
return partition_indices
@njit(cache=True)
def build_csr_numba(matrix_dim, row_list, col_list, data_list):
"""Build CSR arrays from COO-style triplets.
Parameters
----------
matrix_dim : int
Matrix dimension (square matrix).
row_list, col_list, data_list : numpy.ndarray
COO-style nonzero entries.
Returns
-------
tuple
``(indptr, indices, data)`` CSR arrays.
"""
nnz = row_list.shape[0]
indptr = np.zeros(matrix_dim + 1, np.int32)
# 1) count nonzeros per row
for idx in range(nnz):
indptr[row_list[idx] + 1] += 1
# 2) prefix-sum
for ii in range(1, matrix_dim + 1):
indptr[ii] += indptr[ii - 1]
# 3) scatter into CSR
indices = np.empty(nnz, np.int32)
data = np.empty(nnz, data_list.dtype)
next_position = indptr[:-1].copy()
for idx in range(nnz):
row_idx = row_list[idx]
write_idx = next_position[row_idx]
indices[write_idx] = col_list[idx]
data[write_idx] = data_list[idx]
next_position[row_idx] += 1
return indptr, indices, data
@njit(cache=True, parallel=True)
def csr_spmv(
indptr: np.ndarray,
indices: np.ndarray,
data: np.ndarray,
input_vector: np.ndarray,
) -> np.ndarray:
"""Multiply a CSR matrix by a vector.
Parameters
----------
indptr, indices, data : numpy.ndarray
CSR arrays.
input_vector : numpy.ndarray
Input vector.
Returns
-------
numpy.ndarray
Product ``H @ x``.
"""
matrix_dim = indptr.shape[0] - 1
h_psi = np.zeros(matrix_dim, dtype=np.complex128)
for ii in prange(matrix_dim):
value = 0.0 + 0j
for pp in range(indptr[ii], indptr[ii + 1]):
value += data[pp] * input_vector[indices[pp]]
h_psi[ii] = value
return h_psi
@njit(cache=True, parallel=True)
def csr_matmat(
indptr: np.ndarray,
indices: np.ndarray,
data: np.ndarray,
input_matrix: np.ndarray,
) -> np.ndarray:
"""Multiply a CSR matrix by a dense matrix (multiple vectors).
Parameters
----------
indptr, indices, data : numpy.ndarray
CSR arrays.
input_matrix : numpy.ndarray
Dense matrix whose columns are vectors.
Returns
-------
numpy.ndarray
Product ``H @ input_matrix``.
"""
matrix_dim = indptr.shape[0] - 1
n_vectors = input_matrix.shape[1]
output_matrix = np.zeros((matrix_dim, n_vectors), dtype=np.complex128)
# outer loop parallelized over the matrix rows
for ii in prange(matrix_dim):
# for each nonzero in row i
for pp in range(indptr[ii], indptr[ii + 1]):
col_idx = indices[pp] # column index
value = data[pp] # value H[i,j]
# accumulate v * input_matrix[j, :] into output_matrix[i, :]
for kk in range(n_vectors):
output_matrix[ii, kk] += value * input_matrix[col_idx, kk]
return output_matrix
@njit(cache=True)
def compute_trace_ih(
row_list: np.ndarray, col_list: np.ndarray, data_list: np.ndarray
) -> complex:
"""Compute ``trace(-i H)`` directly from triplet data.
Parameters
----------
row_list, col_list, data_list : numpy.ndarray
COO-style nonzero entries of ``H``.
Returns
-------
complex
``trace(-i H)``.
"""
trace_value = 0.0 + 0.0j
nnz = row_list.shape[0]
for idx in range(nnz):
if row_list[idx] == col_list[idx]:
trace_value += -1j * data_list[idx]
return trace_value
compute_trace_iH = compute_trace_ih # pylint: disable=invalid-name
@njit(parallel=True, cache=True)
def exact_time_evolution(
time_line: np.ndarray,
eigenvalues: np.ndarray,
eigenvectors: np.ndarray,
initial_state: np.ndarray,
):
"""Time evolve a state from a full eigendecomposition.
Parameters
----------
time_line : numpy.ndarray
Time samples.
eigenvalues : numpy.ndarray
Hamiltonian eigenvalues.
eigenvectors : numpy.ndarray
Hamiltonian eigenvectors stored by columns.
initial_state : numpy.ndarray
Initial state vector.
Returns
-------
numpy.ndarray
Time-evolved states with shape ``(n_steps, psi_dim)``.
"""
psi_dim = len(initial_state)
n_eigs = len(eigenvalues)
# To store time-evolved states
n_steps = len(time_line)
psi_time = np.empty((n_steps, psi_dim), np.complex128)
# Precompute overlaps <E_i|psi(0)> for all i
overlaps = compute_overlap_with_eigenstates(eigenvectors, initial_state)
# Loop over time steps in parallel
for t_idx in prange(n_steps):
time_value = time_line[t_idx]
evolved_state = np.zeros(psi_dim, dtype=np.complex128)
# Loop over eigenstates
for ii in range(n_eigs):
# Compute phase factor
phase = np.exp(-1j * eigenvalues[ii] * time_value)
# Accumulate contributions
evolved_state += phase * overlaps[ii] * eigenvectors[:, ii]
psi_time[t_idx, :] = evolved_state
return psi_time
@njit(cache=True, parallel=True)
def compute_overlap_with_eigenstates(eigenvectors: np.ndarray, state: np.ndarray):
"""Compute overlaps between a state and an eigenbasis.
Parameters
----------
eigenvectors : numpy.ndarray
Eigenvectors stored by columns.
state : numpy.ndarray
State vector.
Returns
-------
numpy.ndarray
Overlaps ``<E_i|state>``.
"""
psi_dim = eigenvectors.shape[0]
n_eigs = eigenvectors.shape[1]
overlaps = np.empty(n_eigs, np.complex128)
for ii in prange(n_eigs):
acc = 0.0 + 0.0j
for jj in range(psi_dim):
acc += np.conj(eigenvectors[jj, ii]) * state[jj]
overlaps[ii] = acc
return overlaps
@njit(parallel=True)
def csr_spmv_lin(
indptr: np.ndarray,
indices: np.ndarray,
data: np.ndarray,
input_vector: np.ndarray,
output_buffer: np.ndarray,
):
"""Compute ``out = -1j * H @ x`` for a CSR matrix ``H``.
Parameters
----------
indptr, indices, data : numpy.ndarray
CSR arrays defining ``H``.
input_vector : numpy.ndarray
Input vector.
output_buffer : numpy.ndarray
Output buffer written in place.
"""
matrix_dim = indptr.shape[0] - 1
for ii in prange(matrix_dim):
value = 0.0 + 0.0j
for pp in range(indptr[ii], indptr[ii + 1]):
value += data[pp] * input_vector[indices[pp]]
output_buffer[ii] = -1j * value
@njit(parallel=True, cache=True)
def runge_kutta4_time_evolution(
initial_state: np.ndarray,
indptr: np.ndarray,
indices: np.ndarray,
data: np.ndarray,
time_line: np.ndarray,
) -> np.ndarray:
"""Integrate ``dpsi/dt = -i H psi`` with a parallel RK4 scheme.
Parameters
----------
initial_state : numpy.ndarray
Initial state vector.
indptr, indices, data : numpy.ndarray
CSR arrays defining ``H``.
time_line : numpy.ndarray
Time grid.
Returns
-------
numpy.ndarray
Array of states with shape ``(len(time_line), state_dim)``.
"""
state_dim = initial_state.shape[0]
steps = len(time_line)
dt = time_line[1] - time_line[0]
# allocate output and temporaries
out = np.empty((steps, state_dim), np.complex128)
# Ensure psi is complex128.
psi = np.empty(state_dim, np.complex128)
for ii in prange(state_dim):
psi[ii] = initial_state[ii] # copies and *converts* into a complex slot
out[0, ii] = initial_state[ii]
k1 = np.empty(state_dim, np.complex128)
k2 = np.empty(state_dim, np.complex128)
k3 = np.empty(state_dim, np.complex128)
k4 = np.empty(state_dim, np.complex128)
tmp = np.empty(state_dim, np.complex128)
for step_idx in range(1, steps):
# k1 = -i H @ psi
csr_spmv_lin(indptr, indices, data, psi, k1)
# k2 = -i H @ (psi + dt/2*k1)
for ii in prange(state_dim):
tmp[ii] = psi[ii] + 0.5 * dt * k1[ii]
csr_spmv_lin(indptr, indices, data, tmp, k2)
# k3 = -i H @ (psi + dt/2*k2)
for ii in prange(state_dim):
tmp[ii] = psi[ii] + 0.5 * dt * k2[ii]
csr_spmv_lin(indptr, indices, data, tmp, k3)
# k4 = -i H @ (psi + dt*k3)
for ii in prange(state_dim):
tmp[ii] = psi[ii] + dt * k3[ii]
csr_spmv_lin(indptr, indices, data, tmp, k4)
# combine to advance psi
for ii in prange(state_dim):
psi[ii] += dt * (k1[ii] + 2 * k2[ii] + 2 * k3[ii] + k4[ii]) / 6.0
# renormalize to unit norm
nrm = get_norm(psi)
for ii in prange(state_dim):
psi[ii] /= nrm
out[step_idx] = psi
return out