Source code for edlgt.models.quantum_model

"""High-level quantum lattice model workflow utilities.

This module defines :class:`QuantumModel`, a convenience class that combines
local operator projection, symmetry-sector construction, Hamiltonian assembly,
diagonalization, dynamics, and observable measurements for lattice gauge and
spin models.
"""

import logging
import math
from math import prod

from numba import typed
import numpy as np
from scipy.sparse import csc_matrix, csr_matrix, identity, isspmatrix
from scipy.sparse.linalg import expm, norm

from edlgt.dtype_config import (
    coerce_numeric_array,
    get_default_dtype_mode,
    get_numeric_dtype,
    set_default_dtype_mode,
)
from edlgt.entanglement.partitions import build_partition_metadata
from edlgt.modeling import (
    LocalTerm,
    NBodyTerm,
    PlaquetteTerm,
    QMB_hamiltonian,
    QMB_state,
    TwoBodyTerm,
    get_lattice_link_site_pairs,
    get_neighbor_sites,
    lattice_base_configs,
    staggered_mask,
    zig_zag,
)
from edlgt.symmetries import (
    get_link_sector_configs,
    get_momentum_basis,
    get_symmetry_sector_generators,
    global_abelian_sector,
    symmetry_sector_configs,
)
from edlgt.tools.config_encoding import config_to_index, config_to_index_binarysearch

logger = logging.getLogger(__name__)
__all__ = ["QuantumModel", "format_loc_dims", "validate_staggered_matter_lattice"]


[docs] class QuantumModel: """High-level container orchestrating model building and measurements.""" def __init__( self, lvals: list[int], has_obc: list[bool], ham_format="sparse", basis_projector: np.ndarray | None = None, ): """Initialize a lattice quantum model container. Parameters ---------- lvals : list Lattice dimensions. has_obc : list Boundary-condition flags per axis (``True`` for open boundaries). ham_format : str, optional Preferred Hamiltonian representation (for example ``"sparse"``). basis_projector : numpy.ndarray, optional Optional site-local projector used to reduce the effective local Hilbert space uniformly on all sites. """ # Lattice parameters self.lvals = lvals self.dim = len(self.lvals) self.directions = "xyz"[: self.dim] self.n_sites = prod(self.lvals) self.has_obc = has_obc # Symmetry sector configurations self.sector_configs = None # Gauge Basis self.gauge_basis = None self.gauge_states = None # Staggered Basis self.staggered_basis = False # Momentum Basis self.momentum_basis = None # Hamiltonian format self.ham_format = ham_format # Efficient reduced basis projector self.basis_projector = basis_projector if basis_projector is not None: logger.info("Efficient basis projector: %s", basis_projector.shape) # Dictionary for system partition # A cache keyed by each bipartition (keep-indices tuple), storing everything you need for that cut. self._partition_cache: dict[tuple[int, ...], dict] = {} # Dictionary for results self.res = {}
[docs] def default_params(self): """Initialize default keyword arguments and the Hamiltonian container. Returns ------- None Updates ``self.def_params`` and (when possible) creates ``self.H``. """ if self.momentum_basis is not None and self.sector_configs is not None: pair_mode = self.momentum_basis.get("pair_mode", False) if pair_mode: if self.momentum_basis["k_left"] != self.momentum_basis["k_right"]: # rectangular H(k1,k2); can't create a square H hamiltonian_size = None else: hamiltonian_size = self.momentum_basis["n_cols_L"] self.sector_dim = hamiltonian_size else: hamiltonian_size = self.momentum_basis["n_cols"] self.sector_dim = hamiltonian_size elif self.sector_configs is not None: hamiltonian_size = self.sector_configs.shape[0] self.sector_dim = hamiltonian_size else: hamiltonian_size = np.prod(self.loc_dims) self.sector_dim = hamiltonian_size # Define the default parameters as a dictionary self.def_params = { "lvals": self.lvals, "has_obc": self.has_obc, "loc_dims": self.loc_dims, "staggered_basis": self.staggered_basis, "gauge_basis": self.gauge_basis, "sector_configs": self.sector_configs, "momentum_basis": self.momentum_basis, } if hamiltonian_size is not None: # Initialize the Hamiltonian self.H = QMB_hamiltonian(self.lvals, size=hamiltonian_size) else: self.H = None
[docs] def set_momentum_sector( self, k_unit_cell_size: list[int], k_vals: list[int], TC_symmetry: bool = False, ): """Build and store a momentum-sector projector. Parameters ---------- k_unit_cell_size : list Translation unit-cell size used to define momentum sectors. k_vals : list Target momentum quantum numbers (one per lattice dimension). TC_symmetry : bool, optional Whether to include translation-charge symmetry reduction in the momentum basis construction. Returns ------- None Raises ------ ValueError If symmetry-sector configurations are missing, OBC are present, or the momentum shape is inconsistent with the lattice dimension. """ if self.sector_configs is None: raise ValueError("symmetry sector_configs not defined yet") if self.has_obc[0]: raise ValueError("Momentum is not conserved with OBC.") if np.ndim(k_vals) == 0: k_vals = np.array([int(k_vals)], dtype=np.int32) else: k_vals = np.ascontiguousarray(k_vals, dtype=np.int32) if k_vals.size != self.dim: raise ValueError(f"expected k_vals shape {self.dim}, got {k_vals.shape}") k_unit_cell_size = np.ascontiguousarray(k_unit_cell_size, dtype=np.int32) k_vals = np.ascontiguousarray(k_vals, dtype=np.int32) logger.info("====================================================") logger.info("Momentum sector %s", k_vals) L_col_ptr, L_row_idx, L_data, R_row_ptr, R_col_idx, R_data = get_momentum_basis( sector_configs=self.sector_configs, lvals=self.lvals, unit_cell_size=k_unit_cell_size, k_vals=k_vals, TC_symmetry=TC_symmetry, ) n_rows = self.sector_configs.shape[0] n_cols = L_col_ptr.shape[0] - 1 # store for later use in projectors self.momentum_basis = { "pair_mode": False, "n_rows": n_rows, "n_cols": n_cols, "L_col_ptr": L_col_ptr, "L_row_idx": L_row_idx, "L_data": L_data, "R_row_ptr": R_row_ptr, "R_col_idx": R_col_idx, "R_data": R_data, } logger.info("Momentum basis shape (%s,%s)", n_rows, n_cols)
[docs] def set_momentum_pair( self, k_left: list[int], k_right: list[int], k_unit_cell_size: list[int], TC_symmetry: bool, ): """Build two momentum projectors for rectangular ``k_L``-``k_R`` blocks. Parameters ---------- k_left, k_right : list Left and right momentum quantum numbers. k_unit_cell_size : list Translation unit-cell size. TC_symmetry : bool Whether to include translation-charge symmetry reduction. Returns ------- None Notes ----- This prepares rectangular projections of the form ``P_kL^dagger O P_kR`` and stores them in ``self.momentum_basis`` in pair mode. """ if self.sector_configs is None: raise ValueError("symmetry sector_configs not defined yet") if self.has_obc[0]: raise ValueError("Momentum is not conserved with OBC.") k_unit_cell_size = np.ascontiguousarray(k_unit_cell_size, dtype=np.int32) k_left = np.ascontiguousarray(k_left, dtype=np.int32) k_right = np.ascontiguousarray(k_right, dtype=np.int32) logger.info("====================================================") logger.info("Combined Momenta %s %s", k_left, k_right) # Left momentum kL L_col_ptr, L_row_idx, L_data, _, _, _ = get_momentum_basis( sector_configs=self.sector_configs, lvals=self.lvals, unit_cell_size=k_unit_cell_size, k_vals=k_left, TC_symmetry=TC_symmetry, ) n_cols_L = L_col_ptr.shape[0] - 1 # Right momentum kR _, _, _, R_row_ptr, R_col_idx, R_data = get_momentum_basis( sector_configs=self.sector_configs, lvals=self.lvals, unit_cell_size=k_unit_cell_size, k_vals=k_right, TC_symmetry=TC_symmetry, ) n_cols_R = int(R_col_idx.max()) + 1 # Save the momentum basis self.momentum_basis = { "pair_mode": True, "n_rows_full": self.sector_configs.shape[0], "n_cols_L": n_cols_L, "n_cols_R": n_cols_R, "L_col_ptr": L_col_ptr, "L_row_idx": L_row_idx, "L_data": L_data, "R_row_ptr": R_row_ptr, "R_col_idx": R_col_idx, "R_data": R_data, "k_left": k_left, "k_right": k_right, } msg = f"Momenta kL={k_left} (cols={n_cols_L}), kR={k_right} (cols={n_cols_R})" logger.info(msg)
[docs] def check_momentum_pair(self): """Validate orthonormality/orthogonality of the stored momentum pair basis. Returns ------- None Raises ------ ValueError If either projector is not orthonormal or two distinct momentum sectors are not orthogonal. """ N = self.sector_configs.shape[0] B_L = self.momentum_basis["L_col_ptr"].shape[0] - 1 B_R = int(self.momentum_basis["R_col_idx"].max()) + 1 P_L = csc_matrix( ( self.momentum_basis["L_data"], self.momentum_basis["L_row_idx"], self.momentum_basis["L_col_ptr"], ), shape=(N, B_L), ) P_R = csr_matrix( ( self.momentum_basis["R_data"], self.momentum_basis["R_col_idx"], self.momentum_basis["R_row_ptr"], ), shape=(N, B_R), ) G_L = P_L.conj().T @ P_L G_R = P_R.conj().T @ P_R MIX = P_L.conj().T @ P_R normL2 = norm(G_L - identity(B_L)) normR2 = norm(G_R - identity(B_R)) normMIX = norm(MIX) logger.info("norm: (PL^dag @ PL) -1: %s", normL2) logger.info("norm: (PR^dag @ PR) -1: %s", normR2) logger.info("norm: (PL^dag @ PR): %s", normMIX) if normL2 > 1e-12: raise ValueError("PL is not a projector") if normR2 > 1e-12: raise ValueError("RL is not a projector") k1 = self.momentum_basis["k_left"] k2 = self.momentum_basis["k_right"] if (not np.array_equal(k1, k2)) and normMIX > 1e-12: raise ValueError("The two basis are not orthogonal")
[docs] def project_operators(self, ops_dict: dict[str, csr_matrix], bg_sector_list=None): """Project local operators into the effective per-site basis. Parameters ---------- ops_dict : dict Dictionary of bare local operators. bg_sector_list : list, optional Optional per-site background-sector labels used to restrict the gauge-invariant basis on each site. Returns ------- None Stores projected operators in ``self.ops`` and local dimensions in ``self.loc_dims``. """ if self.gauge_basis is not None: lattice_labels, loc_dims = lattice_base_configs( self.gauge_basis, self.lvals, self.has_obc, self.staggered_basis ) loc_dims = loc_dims.transpose().reshape(self.n_sites) self.lattice_labels = lattice_labels.transpose().reshape(self.n_sites) else: local_dim = ops_dict[list(ops_dict.keys())[0]].shape[0] loc_dims = np.array([local_dim] * self.n_sites, dtype=int) self.lattice_labels = None # If a global basis_projector is used, keep old behavior (uniform dim) # Determine effective local dimension # NOTE: we assume that the basis projector is the same for all sites # This will be eventually generalized if self.basis_projector is not None: local_dim = self.basis_projector.shape[1] self.loc_dims = np.array([local_dim] * self.n_sites, dtype=int) # background slicing would need to apply after this projector bg_sector_list = None else: self.loc_dims = loc_dims.copy() # ------------------------------------------------------------------------- self.gauge_states_per_site = [] gauge_basis = {} gauge_states = {} projector_per_site = None if self.gauge_basis is not None: if self.gauge_states is None: raise ValueError("gauge_states must be defined when gauge_basis is set") projector_per_site = [None] * self.n_sites gauge_basis = self.gauge_basis gauge_states = self.gauge_states # build per-site background-resolved projectors/states (if requested) if (bg_sector_list is not None) and (self.gauge_basis is not None): if len(bg_sector_list) != self.n_sites: raise ValueError("bg_sector_list must have length self.n_sites") # For each site select the exact GI basis block labeled by # (background sector, lattice label). for ii in range(self.n_sites): site_label = self.lattice_labels[ii] bg_value = bg_sector_list[ii] exact_key = (bg_value, site_label) if exact_key not in gauge_basis: msg = f"No GI basis for site {ii} label={site_label}, bg={bg_value}" raise ValueError(msg) projector = gauge_basis[exact_key] if projector.shape[1] == 0: msg = f"No GI states at site {ii} label={site_label} with bg {bg_value}" raise ValueError(msg) projector_per_site[ii] = projector self.loc_dims[ii] = projector.shape[1] self.gauge_states_per_site.append(gauge_states[exact_key]) elif (bg_sector_list is None) and (self.gauge_basis is not None): for ii in range(self.n_sites): site_label = self.lattice_labels[ii] projector_per_site[ii] = gauge_basis[site_label] self.gauge_states_per_site.append(gauge_states[site_label]) logger.info("====================================================") logger.info("LOCAL DIMENSION per SITE") format_loc_dims(self.loc_dims, self.lvals) # Determine the maximum local dimension max_loc_dim = int(np.max(self.loc_dims)) # ----------------------------------------------------------------------------- # Initialize new dictionary with the projected operators logger.debug("Projecting operators to the effective Hilbert space") self.ops = {} # Iterate over operators for op, operator in ops_dict.items(): op_shape = (self.n_sites, max_loc_dim, max_loc_dim) self.ops[op] = np.zeros(op_shape, dtype=ops_dict[op].dtype) # Run over the sites for jj, loc_dim in enumerate(self.loc_dims): # For Lattice Gauge Theories where sites have different Hilbert Bases if projector_per_site is not None: projector = projector_per_site[jj] eff_op = apply_projection( projector=projector, operator=operator ).toarray() # For Theories where all the sites have the same Hilber basis else: eff_op = operator.toarray() # If an extra projector is given (to reduce the local Hilbert space) if self.basis_projector is not None: eff_op = apply_projection( projector=self.basis_projector, operator=eff_op ) # Save it inside the new list of operators self.ops[op][jj, :loc_dim, :loc_dim] = eff_op
[docs] def get_abelian_symmetry_sector( self, global_ops: list[np.ndarray] | None, global_sectors: list | None = None, global_sym_type: str = "U", link_ops: list[list[np.ndarray]] | None = None, link_sectors: list | None = None, nbody_ops: list[np.ndarray] | None = None, nbody_sectors: list | None = None, nbody_sites_list=None, nbody_sym_type: str | None = None, ): """Build symmetry-sector configurations from abelian constraints. Parameters ---------- global_ops : list or None Generators for global abelian symmetries. global_sectors : list or None, optional Target sectors for global symmetries. global_sym_type : str, optional Global symmetry type flag passed to symmetry-sector routines. link_ops : list or None Link-symmetry generators. link_sectors : list or None, optional Target sectors for link symmetries. nbody_ops : list or None Optional n-body symmetry generators. nbody_sectors : list or None, optional Target sectors for n-body symmetries. nbody_sites_list : object, optional Site patterns for n-body symmetries. nbody_sym_type : str or None, optional Symmetry type flag for n-body constraints. Notes ----- The method supports pure global sectors, pure link sectors, mixed link-plus-nbody sectors, and pure n-body sectors. Returns ------- None Stores the resulting sector configurations in ``self.sector_configs``. """ logger.info("----------------------------------------------------") # ================================================================================ # GLOBAL ABELIAN SYMMETRIES global_ops_diags = None link_ops_diags = None pair_list = None if global_ops is not None: logger.info("Global Symmetry operators") global_ops_diags = get_symmetry_sector_generators( global_ops, action="global" ) # ================================================================================ # ABELIAN Z2 SYMMETRIES if link_ops is not None: logger.info("Link Symmetry operators") link_ops_diags = get_symmetry_sector_generators(link_ops, action="link") pair_list = get_lattice_link_site_pairs(self.lvals, self.has_obc) # ================================================================================ # nBODY ABELIAN SYMMETRIES if nbody_ops is not None: logger.info("Nbody Symmetry operators") nbody_ops_diags = get_symmetry_sector_generators(nbody_ops, action="nbody") else: nbody_ops_diags = None # ================================================================================ if global_ops is not None and link_ops is not None: logger.info("Global & Link Symmetry sector") self.sector_configs = symmetry_sector_configs( loc_dims=self.loc_dims, glob_op_diags=global_ops_diags, glob_sectors=np.array(global_sectors, dtype=float), sym_type_flag=global_sym_type, link_op_diags=link_ops_diags, link_sectors=link_sectors, pair_list=pair_list, ) elif global_ops is not None: logger.info("Global Symmetry sector") self.sector_configs = global_abelian_sector( loc_dims=self.loc_dims, sym_op_diags=global_ops_diags, sym_sectors=np.array(global_sectors, dtype=float), sym_type=global_sym_type, ) elif link_ops is not None: if nbody_ops is not None: logger.info("Link & Nbody Symmetry sector") else: logger.info("Link Symmetry sector") self.sector_configs = get_link_sector_configs( loc_dims=self.loc_dims, link_op_diags=link_ops_diags, link_sectors=link_sectors, pair_list=pair_list, nbody_op_diags=nbody_ops_diags, nbody_sectors=nbody_sectors, nbody_sites_list=nbody_sites_list, nbody_sym_type=nbody_sym_type, ) elif nbody_ops is not None: logger.info("Nbody Symmetry sector") # Reuse the existing link+nbody iterative builder with a formally # empty link sector. This keeps the constrained-basis logic in one # place and avoids introducing a second implementation path just for # pure n-body constraints such as Rydberg blockade sectors. max_loc_dim = int(np.max(self.loc_dims)) empty_link_op_diags = np.zeros( (1, 2, self.n_sites, max_loc_dim), dtype=float ) empty_link_sectors = np.zeros(1, dtype=float) empty_pair_list = typed.List() # Numba needs a typed container here, so we seed it with one empty # pair table instead of passing a bare Python empty list. empty_pair_list.append(np.empty((0, 2), dtype=np.int32)) self.sector_configs = get_link_sector_configs( loc_dims=self.loc_dims, link_op_diags=empty_link_op_diags, link_sectors=empty_link_sectors, pair_list=empty_pair_list, nbody_op_diags=nbody_ops_diags, nbody_sectors=nbody_sectors, nbody_sites_list=nbody_sites_list, nbody_sym_type=nbody_sym_type, ) else: logger.info("NO SYMMETRY DETECTED: no sector_configs") tot_dim = math.prod(int(d) for d in self.loc_dims) bits = sum(math.log2(d) for d in self.loc_dims) logger.info("TOT DIM: %s, 2^%.3f", tot_dim, bits)
[docs] def diagonalize_Hamiltonian( self, n_eigs, ham_format=None, print_results=False, **kwargs ): """Diagonalize the model Hamiltonian and cache energies/eigenstates. Accepts the legacy ``format=...`` keyword for backward compatibility. """ legacy_format = kwargs.pop("format", None) if kwargs: unexpected = ", ".join(sorted(kwargs)) raise TypeError(f"unexpected keyword argument(s): {unexpected}") if ham_format is None: ham_format = legacy_format elif legacy_format is not None and ham_format != legacy_format: raise ValueError( "Received conflicting Hamiltonian formats via ham_format and format" ) if ham_format is None: ham_format = getattr(self, "ham_format", None) if ham_format is None: raise TypeError("ham_format must be provided") # DIAGONALIZE THE HAMILTONIAN self.H.diagonalize(n_eigs, ham_format, self.loc_dims, print_results) self.n_eigs = self.H.n_eigs self.res["energy"] = self.H.Nenergies
[docs] def time_evolution_Hamiltonian(self, initial_state, time_line): """Evolve an initial state with the current Hamiltonian.""" self.H.time_evolution(initial_state, time_line, self.loc_dims)
# ---- Momentum-basis wrappers (no copies) ---- def _basis_Pk_as_csc(self): """Return B as a SciPy CSC matrix using the stored arrays (no copy).""" if self.momentum_basis is None: return None mb = self.momentum_basis shape = (mb["n_rows"], mb["n_cols"]) return csc_matrix((mb["L_data"], mb["L_row_idx"], mb["L_col_ptr"]), shape=shape) def _basis_Pk_as_csr(self): """Return B as a SciPy CSR matrix using the stored arrays (no copy).""" if self.momentum_basis is None: return None mb = self.momentum_basis shape = (mb["n_rows"], mb["n_cols"]) return csr_matrix((mb["R_data"], mb["R_col_idx"], mb["R_row_ptr"]), shape=shape) def _project_state_with_basis(self, state_realspace): """Project a state into the stored momentum basis. Parameters ---------- state_realspace : numpy.ndarray State coefficients in the symmetry-sector basis. Returns ------- numpy.ndarray Momentum-basis coefficients (complex128). """ if self.momentum_basis is None: # no momentum sector → identity map return state_realspace.astype(np.complex128, copy=False) mb = self.momentum_basis n_rows = mb["n_rows"] n_cols = mb["n_cols"] row_ptr = mb["R_row_ptr"] col_idx = mb["R_col_idx"] data = mb["R_data"] psi_out = np.zeros(n_cols, dtype=np.complex128) x = state_realspace # (N,) # Accumulate: psi_out[j] += conj(B[r,j]) * x[r] for r in range(n_rows): xr = x[r] if xr == 0: # tiny micro-optimization continue start = row_ptr[r] stop = row_ptr[r + 1] for p in range(start, stop): j = col_idx[p] v = data[p] psi_out[j] += np.conj(v) * xr return psi_out
[docs] def momentum_basis_projection(self, operator): """Project an operator into the currently stored momentum basis. Parameters ---------- operator : str or numpy.ndarray or scipy.sparse.spmatrix Operator to project. If ``"H"``, projects ``self.H.Ham`` in place. Returns ------- scipy.sparse.csr_matrix or None Projected operator. Returns ``None`` when projecting ``"H"`` in place. Raises ------ ValueError If no momentum basis has been set. """ if self.momentum_basis is None: raise ValueError("Basis projector B is not set.") B_csc = self._basis_Pk_as_csc() # 1) pick A if isinstance(operator, str) and operator == "H": A = self.H.Ham else: A = operator # Coerce to sparse if not isspmatrix(A): A = csr_matrix(A) else: A = A.tocsr() # good for left-multiply # 2) Y = A @ B (N × M), keep sparse Y = A @ B_csc # 3) A' = B^† @ Y = (B_csc.conj().T) @ Y (M × M) A_proj = (B_csc.conj().T) @ Y # returns sparse # If projecting H, store it back in your Hamiltonian container if isinstance(operator, str) and operator == "H": self.H.Ham = A_proj.tocsr() else: return A_proj.tocsr()
def _get_partition(self, keep_indices): """Build and cache metadata for a subsystem/environment bipartition. Parameters ---------- keep_indices : list or tuple Lattice-site indices kept in the subsystem. Returns ------- dict Cached partition metadata. For symmetry-sector calculations this includes subsystem/environment configuration maps and unique configurations; otherwise only dimensions and index lists are stored. """ key = tuple(sorted(keep_indices)) logger.info("----------------------------------------------------") logger.info("Bipartite the system: SUBSYS %s", keep_indices) if key not in self._partition_cache: # Determine the environmental indices env_indices = [ii for ii in range(self.n_sites) if ii not in keep_indices] env_indices = np.array(env_indices) keep_indices = np.array(keep_indices) # --------------------------------------------------------------------------------- # Distinguish between the case of symmetry sector and the standard case if self.sector_configs is not None: # SYMMETRY SECTOR # Delegate the partition bookkeeping to the dedicated # entanglement helper so the model class stays focused on # orchestration rather than row-mapping details. partition_data = build_partition_metadata( self.sector_configs, self.loc_dims, keep_indices, env_indices, ) else: # NO SYMMETRY SECTOR # Determine the dimensions of the subsystem and environment for the bipartition subsys_dim = np.prod([self.loc_dims[ii] for ii in keep_indices]) env_dim = np.prod([self.loc_dims[ii] for ii in env_indices]) # --------------------------------------------------------------------------------- # Save the partition information if self.sector_configs is not None: self._partition_cache[key] = partition_data else: self._partition_cache[key] = { "subsys_indices": keep_indices, "env_indices": env_indices, "subsys_dim": subsys_dim, "env_dim": env_dim, } return self._partition_cache[key]
[docs] def build_projector_from_sector_to_fullspace( self, indices: list[int] ) -> csc_matrix: """Build a projector from a symmetry-reduced subsystem basis to the full basis. Parameters ---------- indices : list Subsystem site indices. Returns ------- scipy.sparse.csc_matrix Column projector from the symmetry-sector subsystem basis to the full subsystem computational basis. """ unique_subsys_configs = self._get_partition(indices)["unique_subsys_configs"] subsys_dim = unique_subsys_configs.shape[0] subsys_loc_dims = self.loc_dims[indices] tot_subsys_dim = np.prod(subsys_loc_dims) # Build entries of the projector rows = np.empty(subsys_dim, dtype=np.int64) cols = np.arange(subsys_dim, dtype=np.int64) data = np.ones(subsys_dim, dtype=np.float64) # Run over the subsystem_configs for idx in range(subsys_dim): rows[idx] = config_to_index( config=unique_subsys_configs[idx], loc_dims=subsys_loc_dims ) # Build the project as a sparse column matrix P = csc_matrix((data, (rows, cols)), shape=(tot_subsys_dim, subsys_dim)) return P
[docs] def get_qmb_state_from_configs(self, configs): """Build an equal-weight state from explicit sector configurations. Parameters ---------- configs : sequence or list Collection of basis configurations compatible with ``self.sector_configs``. Returns ------- numpy.ndarray Normalized state vector in the symmetry-sector basis. Raises ------ NotImplementedError If a momentum sector is active. ValueError If one configuration is not compatible with the current symmetry sector. """ if self.momentum_basis is not None: raise NotImplementedError("cannot get state configs within momentum sector") # INITIALIZE the STATE state = np.zeros(len(self.sector_configs), dtype=np.complex128) # Get the corresponding QMB index of each config logger.info("----------------------------------------------------") logger.info("GET QMB STATE FROM CONFIGS") for config in configs: index = config_to_index_binarysearch(config, self.sector_configs) if index < 0: raise ValueError(f"config not compatible with the symmetry sector") cfg = " ".join(f"{c:>2d}" for c in config) logger.info("cfg [%s] in state %s", cfg, index) state[index] = complex(1 / np.sqrt(len(configs)), 0) return state
def _get_measurement_state(self, state=None, state_index=None, dynamics=False): """Return the state object used by post-processing measurements. Parameters ---------- state : QMB_state, optional Explicit state to measure. When provided, it takes precedence over ``state_index`` and ``dynamics``. state_index : int, optional Index of the eigenstate or time-evolved state to measure. dynamics : bool, optional If ``True``, read from ``self.H.psi_time``. Otherwise read from ``self.H.Npsi``. Returns ------- QMB_state State selected for the measurement. """ if state is not None: if not isinstance(state, QMB_state): raise TypeError(f"state must be QMB_state, not {type(state)}") return state if not hasattr(self, "H"): raise ValueError("Hamiltonian container is not available.") state_attr = "psi_time" if dynamics else "Npsi" if not hasattr(self.H, state_attr): raise ValueError(f"Hamiltonian does not provide '{state_attr}' states.") state_list = getattr(self.H, state_attr) if state_index is None: if len(state_list) == 1: return state_list[0] raise ValueError( "state_index is required when more than one state is available." ) return state_list[state_index]
[docs] def measure_fidelity( self, state: np.ndarray, index: int, dynamics: bool = False, print_value: bool = False, ): """Compute fidelity with an eigenstate or a time-evolved state. Parameters ---------- state : numpy.ndarray Reference state vector. index : int Index of the comparison state. dynamics : bool, optional If ``True``, compare against ``self.H.psi_time[index]`` instead of ``self.H.Npsi[index]``. print_value : bool, optional If ``True``, log the fidelity value. Returns ------- float Fidelity ``|<ref|state>|^2``. """ if not isinstance(state, np.ndarray): raise TypeError(f"state must be np.array not {type(state)}") if len(state) != self.H.Ham.shape[0]: raise ValueError( f"len(state) must be {self.H.Ham.shape[0]} not {len(state)}" ) # Define the reference state ref_psi = self.H.Npsi[index].psi if not dynamics else self.H.psi_time[index].psi fidelity = np.abs(state.conj().dot(ref_psi)) ** 2 if print_value: logger.info("----------------------------------------------------") logger.info("FIDELITY: %s", fidelity) return fidelity
[docs] def compute_expH(self, beta): """Return ``exp(-beta H)`` as a sparse CSC matrix.""" return csc_matrix(expm(-beta * self.H.Ham))
[docs] def get_thermal_beta(self, state, threshold): """Estimate an effective thermal ``beta`` for a reference state.""" return self.H.get_beta(state, threshold)
[docs] def canonical_avg(self, local_obs, beta): """Compute the canonical-ensemble average of a local observable. Parameters ---------- local_obs : str Observable key in ``self.ops``. beta : float Inverse temperature. Returns ------- float Canonical average per lattice site. """ logger.info("----------------------------------------------------") logger.info("CANONICAL ENSEMBLE") op_matrix = LocalTerm( operator=self.ops[local_obs], op_name=local_obs, **self.def_params ).get_Hamiltonian(1) # Define the exponential of the Hamiltonian expm_matrix = self.compute_expH(beta) Z = np.real(expm_matrix.trace()) canonical_avg = np.real(csc_matrix(op_matrix).dot(expm_matrix).trace()) / ( Z * self.n_sites ) logger.info("Canonical avg: %s", canonical_avg) logger.info("----------------------------------------------------") return canonical_avg
[docs] def microcanonical_avg( self, local_obs_list: list[str], state: np.ndarray, special_norms: dict | None = None, staggered_avgs: dict | None = None, ): """Compute microcanonical averages for multiple local observables. The energy shell is defined using the energy density of the reference state and its uncertainty: - ``e_q = <H> / L`` - ``delta_e = sqrt(<H^2> - <H>^2) / L`` All eigenstates satisfying ``abs(e_i - e_q) < delta_e`` are included. For each observable in ``local_obs_list``, the expectation value is measured on every state in the shell and then averaged. Parameters ---------- local_obs_list : list Keys of local observables. Each key must be present in ``self.ops``. state : numpy.ndarray Reference state vector used to define the energy shell. special_norms : dict, optional Optional mapping from observable key to a custom normalization array. staggered_avgs : dict, optional Optional mapping from observable key to a staggered-averaging label or rule used by the local observable measurement. Returns ------- tuple ``(psi_thermal, ME_avg)`` where ``psi_thermal`` is the normalized coherent superposition of shell eigenstates and ``ME_avg`` is a dictionary mapping observable names to microcanonical averages. """ # Defaults for optional dicts if special_norms is None: special_norms = {obs: None for obs in local_obs_list} if staggered_avgs is None: staggered_avgs = {obs: None for obs in local_obs_list} logger.info("----------------------------------------------------") logger.info("MICRO-CANONICAL ENSEMBLE (MULTI-OBSERVABLE)") # Compute the reference energy and its variance for the energy shell. psi_ref = QMB_state(state) Eq = psi_ref.expectation_value(self.H.Ham) E2q = psi_ref.expectation_value(self.H.Ham @ self.H.Ham) DeltaE = np.sqrt(E2q - Eq**2) logger.info("Energy ref: %s", Eq) logger.info("DeltaE: %s", DeltaE) # Convert to energy densities to compare with self.H.Nenergies e_q = Eq / self.n_sites delta_e = DeltaE / self.n_sites """# Basic sanity: is the shell inside the covered spectrum? E_min = np.min(self.H.Nenergies) E_max = np.max(self.H.Nenergies) # Check that the energy shell is covered by our eigenstates. if e_q - delta_e < E_min or e_q + delta_e > E_max: shell_min = e_q - delta_e shell_max = e_q + delta_e msg = f"ME shell [{shell_min}, {shell_max}] not in [{E_min}, {E_max}]." raise ValueError(msg)""" # Pre-select eigenstate indices that are within the energy shell. shell_mask = np.abs(self.H.Nenergies - e_q) < delta_e shell_indices = np.where(shell_mask)[0] n_shell_states = len(shell_indices) if n_shell_states == 0: msg = f"ME shell empty: no eigstate |e_i-e_q|<delta_e={delta_e}." raise ValueError(msg) logger.info("Number of states in shell: %s", n_shell_states) # Initialize the thermal state (for coherent superposition) and the observable accumulators. psi_thermal = np.zeros(self.H.Ham.shape[0], dtype=np.complex128) # Pre-build LocalTerm objects local_ops = {} for obs in local_obs_list: local_ops[obs] = LocalTerm(self.ops[obs], obs, **self.def_params) # Degine a dictionary to save the ME prediction for every obs ME_avg = {f"ME_{obs}": 0.0 for obs in local_obs_list} # Loop over the shell eigenstates. for ii in shell_indices: psi_thermal += self.H.Npsi[ii].psi # For each observable, build the local operator and measure its expectation value. for obs in local_obs_list: local_ops[obs].get_expval(self.H.Npsi[ii], print_values=False) if special_norms[obs] is not None: val = np.dot(local_ops[obs].obs, special_norms[obs]) / self.n_sites elif staggered_avgs[obs] is not None: self.res[obs] = local_ops[obs].obs val = self.stag_avg(obs, staggered_avgs[obs]) else: val = np.mean(local_ops[obs].obs) ME_avg[f"ME_{obs}"] += val # Normalize the coherent superposition psi_thermal /= np.sqrt(n_shell_states) # Normalize the averages (equal weights) for obs in local_obs_list: ME_avg[f"ME_{obs}"] /= n_shell_states # Print microcanonical averages logger.info("Microcanonical averages") for obs in local_obs_list: logger.info("%s: %s", obs, ME_avg[f"ME_{obs}"]) return psi_thermal, ME_avg
[docs] def diagonal_avg( self, local_obs_list: list[str], state: np.ndarray, special_norms: dict | None = None, staggered_avgs: dict | None = None, tol_deg: float = 1e-10, ): """Compute diagonal-ensemble averages for several local observables. Parameters ---------- local_obs_list : list Observable keys in ``self.ops``. state : numpy.ndarray Reference state used to compute diagonal weights. special_norms : dict, optional Optional observable-specific normalization arrays. staggered_avgs : dict, optional Optional observable-specific staggered averaging labels. tol_deg : float, optional Tolerance used to group degenerate eigenvalues into blocks. Returns ------- dict Dictionary of diagonal-ensemble averages keyed as ``DE_<obs>``. """ logger.info("----------------------------------------------------") logger.info("DIAGONAL ENSEMBLE AVERAGE (MULTI-OBSERVABLE)") # Ensure full diagonalization is available. if self.n_eigs != self.H.Ham.shape[0]: msg = f"Need all H eigvals {self.H.Ham.shape[0]}, not only {self.n_eigs}" raise ValueError(msg) # Defaults for optional dicts if special_norms is None: special_norms = {obs: None for obs in local_obs_list} if staggered_avgs is None: staggered_avgs = {obs: None for obs in local_obs_list} # Pre-build LocalTerm objects local_ops = {} for obs in local_obs_list: local_ops[obs] = LocalTerm(self.ops[obs], obs, **self.def_params) # Initialize accumulators for the diagonal ensemble average. DE_avg = {f"DE_{obs}": 0.0 for obs in local_obs_list} # Detect degeneracy structure in the spectrum block_id, n_blocks = build_energy_block_ids(self.H.Nenergies, tol=tol_deg) # Quick check: any block size > 1? counts = np.bincount(block_id, minlength=n_blocks) has_degeneracy = np.any(counts > 1) logger.info("Degeneracy-aware DE: %s (tol=%s)", has_degeneracy, tol_deg) # Loop over every eigenstate; full diagonalization is assumed. if not has_degeneracy: for ii in range(self.n_eigs): # Fidelity of eigenstate ii. prob = self.measure_fidelity(state, ii, False) # For each observable, measure the expectation value in eigenstate ii. for obs in local_obs_list: local_ops[obs].get_expval(self.H.Npsi[ii], print_values=False) if special_norms[obs] is not None: val = np.dot(local_ops[obs].obs, special_norms[obs]) val /= self.n_sites elif staggered_avgs[obs] is not None: self.res[obs] = local_ops[obs].obs val = self.stag_avg(obs, staggered_avgs[obs]) else: # Default: use the average over all sites. val = np.mean(local_ops[obs].obs) DE_avg[f"DE_{obs}"] += prob * val else: logger.info("BLOCK DIAGONAL AVG") # block-aware path (correct with degeneracies) # Precompute overlaps coeffs_i = <E_i|psi0> coeffs = np.empty(self.n_eigs, dtype=np.complex128) for ii in range(self.n_eigs): coeffs[ii] = np.vdot(self.H.Npsi[ii].psi, state) # Loop blocks for block_idx in range(n_blocks): idxs = np.where(block_id == block_idx)[0] if idxs.size == 0: continue # Build |Psi_b> = sum_{i in block} C_i |E_i> psi_block = np.zeros_like(state) for jj in idxs: Ci = coeffs[jj] if Ci != 0.0: psi_block += Ci * self.H.Npsi[jj].psi # Skip if no weight in this block if np.real(np.vdot(psi_block, psi_block)) < 1e-14: continue psi_block_state = QMB_state(psi_block) for obs in local_obs_list: local_ops[obs].get_expval(psi_block_state, print_values=False) if special_norms[obs] is not None: val = np.dot(local_ops[obs].obs, special_norms[obs]) val /= self.n_sites elif staggered_avgs[obs] is not None: self.res[obs] = local_ops[obs].obs val = self.stag_avg(obs, staggered_avgs[obs]) else: val = np.mean(local_ops[obs].obs) # NOTE: do not normalize psi_block; this is correct: # sum_b <Psi_b|O|Psi_b> DE_avg[f"DE_{obs}"] += val # Log the individual averages. logger.info("Diagonal Ensemble Averages:") for obs in local_obs_list: logger.info("%s: %s", obs, DE_avg[f"DE_{obs}"]) logger.info("----------------------------------------------------") return DE_avg
[docs] def get_observables( self, local_obs=[], twobody_obs=[], plaquette_obs=[], nbody_obs=[], nbody_dist=[], twobody_axes=None, ): """Instantiate observable objects and cache them in ``self.obs_list``. Parameters ---------- local_obs : list, optional Names of local observables. twobody_obs : list, optional List of two-body operator-name pairs. plaquette_obs : list, optional List of plaquette operator-name lists. nbody_obs : list, optional List of n-body operator-name lists. nbody_dist : list, optional Relative distances for each n-body observable. twobody_axes : list, optional Axis labels for each two-body observable. Returns ------- None """ logger.info("----------------------------------------------------") logger.info("BUILDING OBSERVABLES") self.local_obs = local_obs self.twobody_obs = twobody_obs self.twobody_axes = twobody_axes self.plaquette_obs = plaquette_obs self.nbody_obs = nbody_obs self.obs_list = {} # --------------------------------------------------------------------------- # LIST OF LOCAL OBSERVABLES for obs in local_obs: self.obs_list[obs] = LocalTerm( operator=self.ops[obs], op_name=obs, **self.def_params ) # --------------------------------------------------------------------------- # LIST OF TWOBODY CORRELATORS for ii, op_names_list in enumerate(twobody_obs): obs = "_".join(op_names_list) op_list = [self.ops[op] for op in op_names_list] self.obs_list[obs] = TwoBodyTerm( axis=twobody_axes[ii] if twobody_axes is not None else "x", op_list=op_list, op_names_list=op_names_list, **self.def_params, ) # --------------------------------------------------------------------------- # LIST OF PLAQUETTE CORRELATORS for op_names_list in plaquette_obs: obs = "_".join(op_names_list) op_list = [self.ops[op] for op in op_names_list] last = op_names_list[-1] axes = sorted({c for c in last if c in "xyz"}) self.obs_list[obs] = PlaquetteTerm( axes=axes, op_list=op_list, op_names_list=op_names_list, **self.def_params, ) # --------------------------------------------------------------------------- # LIST OF NBODY CORRELATORS for ii, op_names_list in enumerate(nbody_obs): obs = "_".join(op_names_list) op_list = [self.ops[op] for op in op_names_list] distances = nbody_dist[ii] self.obs_list[obs] = NBodyTerm( op_list=op_list, op_names_list=op_names_list, distances=distances, **self.def_params, )
[docs] def measure_observables(self, index, dynamics=False): """Measure all observables stored in ``self.obs_list`` on one state. Parameters ---------- index : int Eigenstate or time-step index. dynamics : bool, optional If ``True``, measure on ``self.H.psi_time[index]``. Returns ------- None Results are stored in ``self.res``. """ state = self.H.Npsi[index] if not dynamics else self.H.psi_time[index] for obs in self.local_obs: self.obs_list[obs].get_expval(state) self.res[obs] = self.obs_list[obs].obs for op_names_list in self.twobody_obs: obs = "_".join(op_names_list) self.obs_list[obs].get_expval(state) self.res[obs] = self.obs_list[obs].corr if self.twobody_axes is not None: self.obs_list[obs].print_nearest_neighbors() for op_names_list in self.plaquette_obs: obs = "_".join(op_names_list) self.obs_list[obs].get_expval(state) self.res[obs] = self.obs_list[obs].obs self.res[f"{obs}_avg"] = self.obs_list[obs].avg for op_names_list in self.nbody_obs: obs = "_".join(op_names_list) self.obs_list[obs].get_expval(state) self.res[obs] = self.obs_list[obs].obs
[docs] def stag_avg(self, obs_name: str, staggered_avg=None): """Average a measured local observable on all, even, or odd sites. Parameters ---------- obs_name : str Observable key already stored in ``self.res`` as a site-resolved array. staggered_avg : str or None, optional ``None`` for the full average, or ``"even"`` / ``"odd"`` for a staggered sublattice average. Returns ------- float Requested average value. Raises ------ KeyError If ``obs_name`` is not present in ``self.res``. """ values = self.res[obs_name] if staggered_avg is None: return np.mean(values) # Build the staggered mask in lattice coordinates. mask2d = staggered_mask(self.lvals, staggered_avg) # Map the mask back to the flattened zig-zag site ordering used in self.res. mask1d = np.zeros(self.n_sites, dtype=bool) for site_idx in range(self.n_sites): coords = zig_zag(self.lvals, site_idx) mask1d[site_idx] = mask2d[coords] return np.mean(values[mask1d])
def _single_site_mask(self, site_index: int) -> np.ndarray: """Boolean mask selecting one 1D lattice site.""" site_mask = np.zeros(tuple(self.lvals), dtype=np.bool_) site_mask[(site_index,)] = True return site_mask @staticmethod def _normalize_dtype_mode(dtype_mode, allow_auto=True): """Normalize dtype-mode inputs to ``'real'``/``'complex'``/``'auto'``. Parameters ---------- dtype_mode : str or bool or None Requested dtype mode. allow_auto : bool, optional Whether ``"auto"`` and ``None`` are accepted. Returns ------- str Normalized mode label. """ if isinstance(dtype_mode, (bool, np.bool_)): return "complex" if bool(dtype_mode) else "real" if dtype_mode is None: if allow_auto: return "auto" raise ValueError("dtype_mode cannot be None in this context") if not isinstance(dtype_mode, str): msg = f"dtype_mode must be bool/str/None, got {type(dtype_mode)}" raise TypeError(msg) mode = dtype_mode.strip().lower() valid = {"real", "complex", "auto"} if allow_auto else {"real", "complex"} if mode not in valid: msg = f"dtype_mode must be one of {sorted(valid)}, got {dtype_mode!r}" raise ValueError(msg) return mode
[docs] def configure_dtype_mode(self, dtype_mode="auto", auto_mode="complex"): """Apply the global numeric dtype mode for Hamiltonian/operator assembly. Parameters ---------- dtype_mode : str or bool, optional Explicit mode (``"real"``/``"complex"`` or bool) or ``"auto"``. auto_mode : str or bool, optional Fallback mode used when ``dtype_mode == "auto"``. Returns ------- str Applied normalized mode (``"real"`` or ``"complex"``). """ mode = self._normalize_dtype_mode(dtype_mode, allow_auto=True) if mode == "auto": mode = self._normalize_dtype_mode(auto_mode, allow_auto=False) if get_default_dtype_mode() != mode: set_default_dtype_mode(mode) logger.info("NUMERIC DTYPE MODE set to %s", mode) # If a Hamiltonian container already exists, keep its value dtype aligned # with the active global mode to avoid silent casts when terms are added. ham = getattr(self, "H", None) if ham is not None and hasattr(ham, "value_list"): target_dtype = get_numeric_dtype() if ham.value_list.dtype != target_dtype: if ham.value_list.size == 0: ham.value_list = np.array([], dtype=target_dtype) else: values = coerce_numeric_array( ham.value_list, name="existing Hamiltonian values", ) ham.value_list = np.asarray(values, dtype=target_dtype) return mode
def apply_projection( projector: np.ndarray | csr_matrix | csr_matrix, operator: np.ndarray | csr_matrix | csr_matrix, ): """Project an operator onto the subspace spanned by a column projector. Parameters ---------- projector : numpy.ndarray or scipy.sparse.csr_matrix Projector-like matrix of shape ``(N, k)``. operator : numpy.ndarray or scipy.sparse.csr_matrix Operator of shape ``(N, N)``. Returns ------- numpy.ndarray or scipy.sparse.spmatrix Effective operator ``P^dagger O P`` in the same dense/sparse family as the inputs. Raises ------ ValueError If the shapes are incompatible. TypeError If dense and sparse input types are mixed. """ # Check the shape of the projector if projector.shape[0] != operator.shape[0]: msg = f"Projector and operator have incompatible shapes {projector.shape} {operator.shape}" raise ValueError(msg) # Return the projected operator if isinstance(operator, np.ndarray) and isinstance(projector, np.ndarray): return np.dot(projector.conj().T, np.dot(operator, projector)) elif isspmatrix(operator) and isspmatrix(projector): return projector.conj().transpose() @ operator @ projector else: logger.info("%s %s", type(operator), type(projector)) raise TypeError("Operator and projector must be both dense or both sparse.") def build_energy_block_ids(evals: np.ndarray, tol: float = 1e-10): """Group sorted eigenvalues into degenerate blocks. Parameters ---------- evals : numpy.ndarray Sorted eigenvalues. tol : float, optional Energies separated by less than ``tol`` are treated as degenerate. Returns ------- tuple ``(block_id, n_blocks)`` where ``block_id`` labels each eigenstate's degenerate block. """ E = np.real_if_close(np.asarray(evals)) N = E.shape[0] block_id = np.empty(N, dtype=np.int32) b = 0 block_id[0] = b for kk in range(1, N): if abs(E[kk] - E[kk - 1]) < tol: block_id[kk] = b else: b += 1 block_id[kk] = b return block_id, (b + 1) def validate_staggered_matter_lattice( lvals: list[int], model_name: str, has_obc: list[bool] | None = None ) -> None: """Validate lattice sizes for models using staggered matter fields. Parameters ---------- lvals : list Lattice extents in each spatial direction. model_name : str Human-readable model label used in the error message. has_obc : list[bool] or None, optional Boundary-condition flags per direction. Periodic directions must have even length to preserve the staggered bipartite structure. Returns ------- None Raises ------ ValueError If the lattice is incompatible with staggered matter. """ lengths = [int(length) for length in lvals] if any(length < 2 for length in lengths): msg = ( f"{model_name} with staggered matter requires at least two sites " f"and an even number of sites; " f"got lvals={list(lvals)}." ) raise ValueError(msg) total_sites = math.prod(lengths) if total_sites % 2 != 0: msg = ( f"{model_name} with staggered matter requires at least two sites " f"and an even number of sites; got lvals={list(lvals)}." ) raise ValueError(msg) if has_obc is not None: if len(has_obc) != len(lengths): raise ValueError( f"has_obc must have length {len(lengths)}, got {len(has_obc)}" ) incompatible_periodic = [ length for length, direction_has_obc in zip(lengths, has_obc) if not direction_has_obc and length % 2 != 0 ] if incompatible_periodic: msg = ( f"{model_name} with staggered matter requires at least two sites " f"and an even number of sites on periodic directions; " f"got lvals={list(lvals)} with has_obc={list(has_obc)}." ) raise ValueError(msg) def format_loc_dims(loc_dims: np.ndarray, lvals: list[int], pad: int = 3) -> str: """Log per-site values arranged on the lattice geometry. Parameters ---------- loc_dims : numpy.ndarray Per-site values to display. Most callers pass local Hilbert-space dimensions, but the helper is also used for background-charge grids. lvals : list Lattice dimensions. pad : int, optional Field width used for alignment in the log output. Returns ------- None """ def _format_entry(val, width): if isinstance(val, (int, np.integer)): return f"{int(val):>{width}d}" return f"{str(val):>{width}s}" dim = len(lvals) if dim == 1: Lx = lvals[0] arr = np.asarray(loc_dims, dtype=object).reshape(Lx) width = max(pad, max(len(str(val)) for val in arr)) row = " ".join(_format_entry(val, width) for val in arr) logger.info("[%s]", row) if dim == 2: Lx, Ly = lvals arr = np.asarray(loc_dims, dtype=object).reshape(Ly, Lx) width = max(pad, max(len(str(val)) for val in arr.flat)) for yy in range(Ly): row = " ".join(_format_entry(val, width) for val in arr[yy]) logger.info("[%s]", row) if dim == 3: Lx, Ly, Lz = lvals arr = np.asarray(loc_dims, dtype=object).reshape(Lz, Ly, Lx) # [z, y, x] width = max(pad, max(len(str(val)) for val in arr.flat)) block_sep = " " # spacing between z-layers # optional header indicating z layers header = block_sep.join( [f"z={zz}".ljust((width + 1) * Lx - 1) for zz in range(Lz)] ) logger.info("%s", header) for yy in range(Ly): row_blocks = [] for zz in range(Lz): layer = ( f"[{' '.join(_format_entry(val, width) for val in arr[zz, yy])}]" ) row_blocks.append(layer) logger.info("%s", block_sep.join(row_blocks))