Source code for edlgt.modeling.qmb_hamiltonian

"""Hamiltonian assembly, diagonalization, and dynamics for QMB models.

This module provides the :class:`QMB_hamiltonian` container used to collect
Hamiltonian contributions, build dense/sparse/linear representations, compute
spectral properties, and evolve states in time.
"""

import logging

import numpy as np
from numba import njit, prange
from scipy.linalg import eigh as array_eigh
from scipy.sparse import coo_matrix, csc_matrix, isspmatrix
from scipy.sparse.linalg import LinearOperator
from scipy.sparse.linalg import eigsh as sparse_eigh
from scipy.sparse.linalg import expm, expm_multiply

from edlgt.dtype_config import coerce_numeric_array, get_numeric_dtype
from edlgt.tools import validate_parameters

from .qmb_state import QMB_state, get_norm

logger = logging.getLogger(__name__)

__all__ = ["QMB_hamiltonian", "get_entropy_partition"]


[docs] class QMB_hamiltonian: """Container for a quantum many-body Hamiltonian and derived quantities.""" def __init__(self, lvals, size): """Initialize an empty Hamiltonian accumulator. Parameters ---------- lvals : list or tuple Lattice dimensions. size : int Hilbert-space dimension of the Hamiltonian. """ validate_parameters(lvals=lvals) self.lvals = lvals self.shape = (size, size) # Triplet storage is kept in compact dtypes because these arrays can be # very large for symmetry-reduced sectors. self._index_dtype = np.int32 self._value_dtype = get_numeric_dtype() self.row_list = np.empty(0, dtype=self._index_dtype) self.col_list = np.empty(0, dtype=self._index_dtype) self.value_list = np.empty(0, dtype=self._value_dtype) # New terms are accumulated block-wise and flattened only when needed. # This avoids repeated concatenate/copy work in add_term. self._row_blocks = [] self._col_blocks = [] self._value_blocks = [] self._pending_nnz = 0 self.Ham = None self.indptr = None self.indices = None self.data = None self.Ham_type = None self.loc_dims = None self.n_eigs = None self.Nenergies = None self.Npsi = None self.GSenergy = None self.GSpsi = None self.psi_time = None self.r_value = None def _normalize_term_triplets(self, term): """Convert a Hamiltonian contribution to normalized COO triplet arrays. Parameters ---------- term : tuple or numpy.ndarray or scipy.sparse.spmatrix Hamiltonian contribution provided as a sparse triplet ``(row_list, col_list, value_list)``, a dense matrix, or a SciPy sparse matrix. Returns ------- tuple ``(row_list, col_list, value_list)`` as one-dimensional arrays with dtypes compatible with the internal accumulator. Raises ------ TypeError If ``term`` has an unsupported format. ValueError If the three triplet arrays do not have the same length. """ if isinstance(term, tuple) and len(term) == 3: # Case 1: term is already given in COO triplet format. row_list, col_list, value_list = term elif isinstance(term, np.ndarray): # Case 2: term is a dense matrix. row_list, col_list = np.nonzero(term) value_list = term[row_list, col_list] elif isspmatrix(term): # Case 3: term is a SciPy sparse matrix. coo = coo_matrix(term) row_list = coo.row col_list = coo.col value_list = coo.data else: msg = ( "Unsupported term type: expect (dense/sparse) or " "(row_list, col_list, value_list)" ) raise TypeError(msg) row_list = np.asarray(row_list, dtype=self._index_dtype) col_list = np.asarray(col_list, dtype=self._index_dtype) value_list = coerce_numeric_array(value_list, name="Hamiltonian term values") value_list = value_list.astype(self._value_dtype, copy=False) return row_list, col_list, value_list def _append_triplet_block( self, row_list: np.ndarray, col_list: np.ndarray, value_list: np.ndarray ): """Register one triplet block without flattening previous blocks. Parameters ---------- row_list, col_list, value_list : numpy.ndarray One-dimensional triplet arrays for a single Hamiltonian contribution. """ if row_list.shape[0] == 0: return self._row_blocks.append(row_list) self._col_blocks.append(col_list) self._value_blocks.append(value_list) self._pending_nnz += row_list.shape[0] def _materialize_triplets(self): """Flatten pending triplet blocks into contiguous arrays once. This method is called before building sparse/dense/linear Hamiltonian representations or when direct access to the triplets is required. """ if self._pending_nnz == 0: return # Existing materialized triplets are kept and extended with the pending blocks. total_nnz = self.row_list.shape[0] + self._pending_nnz row_list = np.empty(total_nnz, dtype=self._index_dtype) col_list = np.empty(total_nnz, dtype=self._index_dtype) value_list = np.empty(total_nnz, dtype=self._value_dtype) write_pos = 0 # Save the current set of nnzentries if self.row_list.shape[0] > 0: n_existing = self.row_list.shape[0] row_list[:n_existing] = self.row_list col_list[:n_existing] = self.col_list value_list[:n_existing] = self.value_list write_pos = n_existing # Add the new set of nnzentries for block_row, block_col, block_value in zip( self._row_blocks, self._col_blocks, self._value_blocks ): block_size = block_row.shape[0] next_pos = write_pos + block_size row_list[write_pos:next_pos] = block_row col_list[write_pos:next_pos] = block_col value_list[write_pos:next_pos] = block_value write_pos = next_pos # get the global list of nnz entries self.row_list = row_list self.col_list = col_list self.value_list = value_list # release memory self._row_blocks.clear() self._col_blocks.clear() self._value_blocks.clear() self._pending_nnz = 0
[docs] def add_term(self, term): """Add a Hamiltonian contribution to the internal sparse triplet lists. Parameters ---------- term : tuple or numpy.ndarray or scipy.sparse.spmatrix Hamiltonian contribution provided as a sparse triplet ``(row_list, col_list, value_list)``, a dense matrix, or a SciPy sparse matrix. Raises ------ TypeError If ``term`` has an unsupported format. """ row_list, col_list, value_list = self._normalize_term_triplets(term) self._append_triplet_block(row_list, col_list, value_list)
[docs] def build(self, format): # pylint: disable=redefined-builtin """Build the Hamiltonian representation from accumulated triplets. Parameters ---------- format : str Target representation: ``"dense"``, ``"sparse"``, or ``"linear"``. """ logger.info("Construct %s Hamiltonian", format) # Flatten all pending triplet blocks exactly once before building. self._materialize_triplets() # ========================================================================== def matvec(psi): """Function to compute H @ v.""" result = csr_spmv(self.indptr, self.indices, self.data, psi) if result is None: raise ValueError("matvec returned None.") if not isinstance(result, np.ndarray): raise TypeError(f"matvec must return np.ndarray, got {type(result)}.") return result # ========================================================================== def matmat(psis): """Function to compute H @ matrix (batch of vectors).""" result = csr_matmat(self.indptr, self.indices, self.data, psis) if result is None: raise ValueError("matmat returned None.") if not isinstance(result, np.ndarray): raise TypeError(f"matmat must return np.ndarray, got {type(result)}.") return result # ========================================================================== if format == "sparse": self.Ham = csc_matrix( (self.value_list, (self.row_list, self.col_list)), shape=self.shape ) # ========================================================================== elif format == "linear": matrix_dim = self.shape[0] self.indptr, self.indices, self.data = build_csr_numba( matrix_dim, self.row_list, self.col_list, self.value_list ) self.Ham = LinearOperator(shape=self.shape, matvec=matvec, matmat=matmat) # ========================================================================== elif format == "dense": self.Ham = csc_matrix( (self.value_list, (self.row_list, self.col_list)), shape=self.shape ) self.Ham = self.Ham.toarray() # Measure sparsity self.get_sparsity() self.Ham_type = format
[docs] def convert_hamiltonian(self, format): # pylint: disable=redefined-builtin """Convert the Hamiltonian to another representation type. Parameters ---------- format : str Target representation: ``"dense"``, ``"sparse"``, or ``"linear"``. Raises ------ ValueError If ``format`` is not supported. """ logger.info("CONVERT HAMILTONIAN from %s to %s", self.Ham_type, format) if format == self.Ham_type: return # Already in the desired format if format == "dense": if self.Ham_type == "sparse": self.Ham = self.Ham.toarray() elif self.Ham_type == "linear": self.build("dense") elif format == "sparse": if self.Ham_type == "dense": self.Ham = csc_matrix(self.Ham) elif self.Ham_type == "linear": self.build("sparse") elif format == "linear": self.build("linear") else: msg = f"Invalid format: {format}. Use 'dense', 'sparse', or 'linear'." raise ValueError(msg) self.Ham_type = format
[docs] def diagonalize(self, n_eigs, format, loc_dims, print_results=True): # pylint: disable=redefined-builtin """Diagonalize the Hamiltonian and store eigenvalues/eigenstates. Parameters ---------- n_eigs : int or str Number of eigenpairs to compute, or ``"full"`` for full diagonalization. format : str Hamiltonian representation to use for diagonalization. loc_dims : list or numpy.ndarray Local Hilbert-space dimensions used to build :class:`QMB_state` objects for eigenvectors. print_results : bool, optional If ``True``, log energies and energy densities. Returns ------- None Raises ------ ValueError If ``n_eigs`` is invalid. """ # Ensure Hamiltonian is Hermitian and check sparsity # check_hermitian(self.Ham) validate_parameters(loc_dims=loc_dims) if format != self.Ham_type: self.convert_hamiltonian(format) # Save local dimensions self.loc_dims = loc_dims # Determine number of eigenvalues to compute if isinstance(n_eigs, int): if n_eigs > self.shape[0]: msg = f"n_eigs must be < H.shape[0]={self.shape[0]}, got {n_eigs}" raise ValueError(msg) self.n_eigs = n_eigs elif isinstance(n_eigs, str) and n_eigs == "full": self.n_eigs = self.shape[0] else: raise ValueError(f"n_eigs must be int or 'full', not {n_eigs}.") # Select diagonalization format logger.info("----------------------------------------------------") if self.Ham_type == "dense" or self.n_eigs == "full": logger.info("DIAGONALIZE (dense) HAMILTONIAN") energies, eigenvectors = array_eigh(self.Ham) elif ( self.Ham_type == "sparse" or self.Ham_type == "linear" and isinstance(n_eigs, int) ): logger.info("DIAGONALIZE %s HAMILTONIAN", self.Ham_type) energies, eigenvectors = sparse_eigh(self.Ham, k=self.n_eigs, which="SA") else: msg = ( "Unsupported diagonalization combination: " f"Ham_type={self.Ham_type}, n_eigs={n_eigs}" ) raise ValueError(msg) # Sort and save eigenvalues/eigenstates if not is_sorted(energies): order = np.argsort(energies) energies, eigenvectors = energies[order], eigenvectors[:, order] if print_results: # Save the eigenstates as QMB_states logger.info("TOT ENERGY -- EN DENSITY") for _, total_energy in enumerate(energies): en_density = total_energy / np.prod(self.lvals) logger.info("%.10f %.10f", total_energy, en_density) self.Nenergies = energies / np.prod(self.lvals) self.Npsi = [ QMB_state(eigenvectors[:, ii], self.lvals, self.loc_dims) for ii in range(self.n_eigs) ] # Save GROUND STATE PROPERTIES self.GSenergy = self.Nenergies[0] self.GSpsi = self.Npsi[0]
[docs] def time_evolution( self, initial_state: np.ndarray, time_line: np.ndarray, loc_dims: np.ndarray, ): """Evolve an initial state along a time grid using the current Hamiltonian. Parameters ---------- initial_state : numpy.ndarray Initial state vector. time_line : numpy.ndarray Time samples at which the evolved state is returned. loc_dims : numpy.ndarray Local Hilbert-space dimensions used to wrap evolved states into :class:`QMB_state` objects. Returns ------- None Evolved states are stored in ``self.psi_time``. """ # Save local dimensions self.loc_dims = loc_dims # Compute the time spacing and the time line delta_t = time_line[1] - time_line[0] msg = f"------------ TIME EVOLUTION: DELTA {round(delta_t,4)} ------------" logger.info(msg) # Check Hamiltonian type and compute the time evolution if self.Ham_type == "dense": # Check if Hamiltonian is already diagonalized if self.Nenergies is None or self.Npsi is None: self.diagonalize(n_eigs="full", format="dense", loc_dims=loc_dims) # Multiply energy densities by the number of sites to get total energies evals = self.Nenergies * np.prod(self.lvals) # Run the exact time evolution eigenstate_matrix = np.array([eigenstate.psi for eigenstate in self.Npsi]).T psi_time = exact_time_evolution( time_line, evals, eigenstate_matrix, initial_state.astype(np.complex128), ) elif self.Ham_type == "sparse": generator_matrix = complex(0, -1) * self.Ham psi_time = expm_multiply( A=generator_matrix, B=initial_state, start=time_line[0], stop=time_line[-1], num=len(time_line), endpoint=True, ) elif self.Ham_type == "linear": # Ensure traceA sees the complete set of accumulated triplets. self._materialize_triplets() def matvec_lin(psi): return -1j * self.Ham.matvec(psi) def matmat_lin(psis): return -1j * self.Ham.matmat(psis) def rmatvec_lin(psi): return +1j * self.Ham.matvec(psi) def rmatmat_lin(psis): return +1j * self.Ham.matmat(psis) A_lin = LinearOperator( shape=self.shape, matvec=matvec_lin, matmat=matmat_lin, rmatvec=rmatvec_lin, rmatmat=rmatmat_lin, dtype=np.complex128, ) trace_a = compute_trace_ih(self.row_list, self.col_list, self.value_list) psi_time = expm_multiply( A=A_lin, B=initial_state, start=time_line[0], stop=time_line[-1], num=len(time_line), endpoint=True, traceA=trace_a, ) logger.info("Saving time evolved states") # Save them as QMB_states self.psi_time = [ QMB_state(psi_time[ii, :], self.lvals, self.loc_dims) for ii in range(len(time_line)) ]
[docs] def partition_function(self, beta): """Compute the partition function for inverse temperature ``beta``. Parameters ---------- beta : float Inverse temperature. Returns ------- float Partition function ``Z``. Raises ------ ValueError If the computed partition function is non-positive. """ if self.n_eigs == self.Ham.shape[0]: partition_fn = np.sum(np.exp(-beta * self.Nenergies)) else: partition_fn = np.real(csc_matrix(expm(-beta * self.Ham)).trace()) if partition_fn <= 0: raise ValueError(f"Z must be positive, not {partition_fn}.") return partition_fn
[docs] def free_energy(self, beta): """Compute the free energy at inverse temperature ``beta``. Parameters ---------- beta : float Inverse temperature. Returns ------- float Free energy. """ partition_fn = self.partition_function(beta) free_energy = -1 / beta * np.log(partition_fn) return free_energy
[docs] def thermal_average(self, beta): """Compute the thermal average energy at inverse temperature ``beta``. Parameters ---------- beta : float Inverse temperature. Returns ------- float Thermal average energy. """ partition_fn = self.partition_function(beta) if self.n_eigs == self.Ham.shape[0]: return self.Nenergies.dot(np.exp(-beta * self.Nenergies)) / partition_fn return np.real(self.Ham.dot(expm(-beta * self.Ham)).trace()) / partition_fn
[docs] def f_prime(self, beta): """Compute the derivative used in the Newton solve for ``beta``. Parameters ---------- beta : float Inverse temperature. Returns ------- float Derivative-like quantity used by :meth:`get_beta`. """ partition_fn = self.partition_function(beta) if self.n_eigs == self.Ham.shape[0]: energy_sq_average = ( np.square(self.Nenergies).dot(np.exp(-beta * self.Nenergies)) / partition_fn ) else: energy_sq_average = np.real( (self.Ham**2).dot(expm(-beta * self.Ham)).trace() ) / partition_fn return -energy_sq_average - self.thermal_average(beta) ** 2
F_prime = f_prime # pylint: disable=invalid-name
[docs] def get_beta(self, state, threshold=1e-10, max_iter=1000): """Estimate an effective inverse temperature for a reference state. Parameters ---------- state : numpy.ndarray Reference state vector. threshold : float, optional Convergence tolerance for the Newton iteration. max_iter : int, optional Maximum number of Newton iterations. Returns ------- float Estimated inverse temperature ``beta``. Notes ----- The method solves for the root of the difference between the thermal average energy and the energy of ``state``. """ iter_count = 0 accuracy = 1 beta = 1e-7 logger.info("=========== GET BETA ===============") # Get the reference energy value for the chosen state state_energy = QMB_state(state).expectation_value(self.Ham) while accuracy > threshold and iter_count < max_iter: iter_count += 1 if self.n_eigs == self.Ham.shape[0]: partition_fn = self.partition_function(beta) energy_avg = ( self.Nenergies.dot(np.exp(-beta * self.Nenergies)) / partition_fn ) energy_delta = energy_avg - state_energy energy_sq_avg = ( np.square(self.Nenergies).dot(np.exp(-beta * self.Nenergies)) / partition_fn ) derivative_value = -energy_sq_avg - (energy_avg**2) else: thermal_kernel = csc_matrix(expm(-beta * self.Ham)) partition_fn = np.real(thermal_kernel.trace()) energy_avg = np.real(self.Ham.dot(thermal_kernel).trace()) / partition_fn energy_delta = energy_avg - state_energy derivative_value = ( -np.real((self.Ham**2).dot(thermal_kernel).trace()) / partition_fn - energy_avg**2 ) # ================================================== prev_val = beta beta = beta - energy_delta / derivative_value accuracy = abs(beta - prev_val) logger.info("------------------------------------") logger.info("F=%s", energy_delta) logger.info("Fp=%s", derivative_value) logger.info("beta=%s", beta) logger.info("accuracy=%s", accuracy) if iter_count >= max_iter: logger.warning("Maximum iterations reached without convergence.") logger.info("BETA %s", beta) return beta
[docs] def get_r_value(self): """Compute adjacent-gap ratio statistics from the stored spectrum. Returns ------- numpy.ndarray Array of local adjacent-gap ratios. Raises ------ ValueError If the Hamiltonian has not been fully diagonalized. """ # Check Hamiltonian type and compute the time evolution if self.Ham_type == "dense": # Check if Hamiltonian is already diagonalized if self.Nenergies is None or self.Npsi is None: raise ValueError("The Hamiltonian has to be fully diagonalized.") else: raise ValueError("The Hamiltonian has to be fully diagonalized.") # Compute level spacings delta_E = np.diff(np.sort(self.Nenergies)) # Compute r-values r_array = np.array( [ min(delta_E[ii], delta_E[ii + 1]) / max(delta_E[ii], delta_E[ii + 1]) for ii in range(len(delta_E) - 1) ] ) # Define the bulk range bulk_start = int(0.1 * len(r_array)) bulk_end = int(0.9 * len(r_array)) # Compute the average r-value for the bulk self.r_value = np.mean(r_array[bulk_start:bulk_end]) logger.info("R value %s", self.r_value) return r_array
[docs] def print_energy(self, en_state): """Log the energy density of a selected eigenstate index. Parameters ---------- en_state : int Eigenstate index in ``self.Nenergies``. """ logger.info("====================================================") logger.info("%s ENERGY: %.16f", en_state, self.Nenergies[en_state])
[docs] def get_sparsity(self): """Log the current sparsity estimated from accumulated triplets.""" self._materialize_triplets() sparsity = len(self.row_list) / (self.shape[0] ** 2) logger.info("SPARSITY: %.16f", sparsity)
def is_sorted(array1D): """Return ``True`` if a 1D array is non-decreasing.""" return np.all(array1D[:-1] <= array1D[1:]) def get_sorted_indices(data): """Return indices that sort entries by descending magnitude.""" abs_data = np.abs(data) real_data = np.real(data) # Lexsort by real part first (secondary key), then by absolute value (primary key) sorted_indices = np.lexsort((real_data, abs_data)) return sorted_indices[::-1] # Descending order
[docs] def get_entropy_partition(lvals, option="half"): """Return lattice-site indices used for a standard bipartition. Parameters ---------- lvals : list or tuple Lattice dimensions. option : str, optional Partition preset. Currently only ``"half"`` is implemented. Returns ------- list[int] Site indices belonging to the selected subsystem. Raises ------ NotImplementedError If ``option`` is not implemented. """ if option == "half": if len(lvals) == 1: partition_indices = list(np.arange(0, int(lvals[0] / 2), 1)) else: partition_indices = list(np.arange(0, int(lvals[0] / 2), 1)) + list( np.arange(lvals[0], lvals[0] + int(lvals[0] / 2), 1) ) else: raise NotImplementedError("for the moment it works only for option=half") return partition_indices
@njit(cache=True) def build_csr_numba(matrix_dim, row_list, col_list, data_list): """Build CSR arrays from COO-style triplets. Parameters ---------- matrix_dim : int Matrix dimension (square matrix). row_list, col_list, data_list : numpy.ndarray COO-style nonzero entries. Returns ------- tuple ``(indptr, indices, data)`` CSR arrays. """ nnz = row_list.shape[0] indptr = np.zeros(matrix_dim + 1, np.int32) # 1) count nonzeros per row for idx in range(nnz): indptr[row_list[idx] + 1] += 1 # 2) prefix-sum for ii in range(1, matrix_dim + 1): indptr[ii] += indptr[ii - 1] # 3) scatter into CSR indices = np.empty(nnz, np.int32) data = np.empty(nnz, data_list.dtype) next_position = indptr[:-1].copy() for idx in range(nnz): row_idx = row_list[idx] write_idx = next_position[row_idx] indices[write_idx] = col_list[idx] data[write_idx] = data_list[idx] next_position[row_idx] += 1 return indptr, indices, data @njit(cache=True, parallel=True) def csr_spmv( indptr: np.ndarray, indices: np.ndarray, data: np.ndarray, input_vector: np.ndarray, ) -> np.ndarray: """Multiply a CSR matrix by a vector. Parameters ---------- indptr, indices, data : numpy.ndarray CSR arrays. input_vector : numpy.ndarray Input vector. Returns ------- numpy.ndarray Product ``H @ x``. """ matrix_dim = indptr.shape[0] - 1 h_psi = np.zeros(matrix_dim, dtype=np.complex128) for ii in prange(matrix_dim): value = 0.0 + 0j for pp in range(indptr[ii], indptr[ii + 1]): value += data[pp] * input_vector[indices[pp]] h_psi[ii] = value return h_psi @njit(cache=True, parallel=True) def csr_matmat( indptr: np.ndarray, indices: np.ndarray, data: np.ndarray, input_matrix: np.ndarray, ) -> np.ndarray: """Multiply a CSR matrix by a dense matrix (multiple vectors). Parameters ---------- indptr, indices, data : numpy.ndarray CSR arrays. input_matrix : numpy.ndarray Dense matrix whose columns are vectors. Returns ------- numpy.ndarray Product ``H @ input_matrix``. """ matrix_dim = indptr.shape[0] - 1 n_vectors = input_matrix.shape[1] output_matrix = np.zeros((matrix_dim, n_vectors), dtype=np.complex128) # outer loop parallelized over the matrix rows for ii in prange(matrix_dim): # for each nonzero in row i for pp in range(indptr[ii], indptr[ii + 1]): col_idx = indices[pp] # column index value = data[pp] # value H[i,j] # accumulate v * input_matrix[j, :] into output_matrix[i, :] for kk in range(n_vectors): output_matrix[ii, kk] += value * input_matrix[col_idx, kk] return output_matrix @njit(cache=True) def compute_trace_ih( row_list: np.ndarray, col_list: np.ndarray, data_list: np.ndarray ) -> complex: """Compute ``trace(-i H)`` directly from triplet data. Parameters ---------- row_list, col_list, data_list : numpy.ndarray COO-style nonzero entries of ``H``. Returns ------- complex ``trace(-i H)``. """ trace_value = 0.0 + 0.0j nnz = row_list.shape[0] for idx in range(nnz): if row_list[idx] == col_list[idx]: trace_value += -1j * data_list[idx] return trace_value compute_trace_iH = compute_trace_ih # pylint: disable=invalid-name @njit(parallel=True, cache=True) def exact_time_evolution( time_line: np.ndarray, eigenvalues: np.ndarray, eigenvectors: np.ndarray, initial_state: np.ndarray, ): """Time evolve a state from a full eigendecomposition. Parameters ---------- time_line : numpy.ndarray Time samples. eigenvalues : numpy.ndarray Hamiltonian eigenvalues. eigenvectors : numpy.ndarray Hamiltonian eigenvectors stored by columns. initial_state : numpy.ndarray Initial state vector. Returns ------- numpy.ndarray Time-evolved states with shape ``(n_steps, psi_dim)``. """ psi_dim = len(initial_state) n_eigs = len(eigenvalues) # To store time-evolved states n_steps = len(time_line) psi_time = np.empty((n_steps, psi_dim), np.complex128) # Precompute overlaps <E_i|psi(0)> for all i overlaps = compute_overlap_with_eigenstates(eigenvectors, initial_state) # Loop over time steps in parallel for t_idx in prange(n_steps): time_value = time_line[t_idx] evolved_state = np.zeros(psi_dim, dtype=np.complex128) # Loop over eigenstates for ii in range(n_eigs): # Compute phase factor phase = np.exp(-1j * eigenvalues[ii] * time_value) # Accumulate contributions evolved_state += phase * overlaps[ii] * eigenvectors[:, ii] psi_time[t_idx, :] = evolved_state return psi_time @njit(cache=True, parallel=True) def compute_overlap_with_eigenstates(eigenvectors: np.ndarray, state: np.ndarray): """Compute overlaps between a state and an eigenbasis. Parameters ---------- eigenvectors : numpy.ndarray Eigenvectors stored by columns. state : numpy.ndarray State vector. Returns ------- numpy.ndarray Overlaps ``<E_i|state>``. """ psi_dim = eigenvectors.shape[0] n_eigs = eigenvectors.shape[1] overlaps = np.empty(n_eigs, np.complex128) for ii in prange(n_eigs): acc = 0.0 + 0.0j for jj in range(psi_dim): acc += np.conj(eigenvectors[jj, ii]) * state[jj] overlaps[ii] = acc return overlaps @njit(parallel=True) def csr_spmv_lin( indptr: np.ndarray, indices: np.ndarray, data: np.ndarray, input_vector: np.ndarray, output_buffer: np.ndarray, ): """Compute ``out = -1j * H @ x`` for a CSR matrix ``H``. Parameters ---------- indptr, indices, data : numpy.ndarray CSR arrays defining ``H``. input_vector : numpy.ndarray Input vector. output_buffer : numpy.ndarray Output buffer written in place. """ matrix_dim = indptr.shape[0] - 1 for ii in prange(matrix_dim): value = 0.0 + 0.0j for pp in range(indptr[ii], indptr[ii + 1]): value += data[pp] * input_vector[indices[pp]] output_buffer[ii] = -1j * value @njit(parallel=True, cache=True) def runge_kutta4_time_evolution( initial_state: np.ndarray, indptr: np.ndarray, indices: np.ndarray, data: np.ndarray, time_line: np.ndarray, ) -> np.ndarray: """Integrate ``dpsi/dt = -i H psi`` with a parallel RK4 scheme. Parameters ---------- initial_state : numpy.ndarray Initial state vector. indptr, indices, data : numpy.ndarray CSR arrays defining ``H``. time_line : numpy.ndarray Time grid. Returns ------- numpy.ndarray Array of states with shape ``(len(time_line), state_dim)``. """ state_dim = initial_state.shape[0] steps = len(time_line) dt = time_line[1] - time_line[0] # allocate output and temporaries out = np.empty((steps, state_dim), np.complex128) # Ensure psi is complex128. psi = np.empty(state_dim, np.complex128) for ii in prange(state_dim): psi[ii] = initial_state[ii] # copies and *converts* into a complex slot out[0, ii] = initial_state[ii] k1 = np.empty(state_dim, np.complex128) k2 = np.empty(state_dim, np.complex128) k3 = np.empty(state_dim, np.complex128) k4 = np.empty(state_dim, np.complex128) tmp = np.empty(state_dim, np.complex128) for step_idx in range(1, steps): # k1 = -i H @ psi csr_spmv_lin(indptr, indices, data, psi, k1) # k2 = -i H @ (psi + dt/2*k1) for ii in prange(state_dim): tmp[ii] = psi[ii] + 0.5 * dt * k1[ii] csr_spmv_lin(indptr, indices, data, tmp, k2) # k3 = -i H @ (psi + dt/2*k2) for ii in prange(state_dim): tmp[ii] = psi[ii] + 0.5 * dt * k2[ii] csr_spmv_lin(indptr, indices, data, tmp, k3) # k4 = -i H @ (psi + dt*k3) for ii in prange(state_dim): tmp[ii] = psi[ii] + dt * k3[ii] csr_spmv_lin(indptr, indices, data, tmp, k4) # combine to advance psi for ii in prange(state_dim): psi[ii] += dt * (k1[ii] + 2 * k2[ii] + 2 * k3[ii] + k4[ii]) / 6.0 # renormalize to unit norm nrm = get_norm(psi) for ii in prange(state_dim): psi[ii] /= nrm out[step_idx] = psi return out