# Source code for edlgt.tools.manage_data

"""Utilities for saving and loading dictionaries, tabular text data, and sparse matrices.

This module provides lightweight I/O helpers used by examples and scripts:

- store Python dictionaries with ``pickle`` or NumPy ``.npz`` archives,
- append data series to a simple comma-separated text file format,
- read tabular text files back into NumPy arrays,
- export sparse matrices to a human-readable ``.dat`` file.
"""

import os
import pickle

import numpy as np

from .checks import validate_parameters

# Public API of this module: the names exported by a star-import. Helpers
# whose names start with an underscore are intentionally left out.
__all__ = [
    "save_dictionary",
    "load_dictionary",
    "save_data_in_textfile",
    "load_data_from_textfile",
    "save_sparse_matrix_to_dat",
]


def _normalize_dictionary_format(filename, file_format):
    """Map a requested (or inferred) archive format onto its canonical label.

    Parameters
    ----------
    filename : str
        File name whose extension decides the format when ``file_format`` is
        ``None``.
    file_format : str or None
        Explicit format request, matched case-insensitively against
        ``"pickle"``, ``"pkl"`` and ``"npz"``.

    Returns
    -------
    str
        Either ``"pickle"`` or ``"npz"``.

    Raises
    ------
    TypeError
        If ``file_format`` is neither ``None`` nor a string.
    ValueError
        If ``file_format`` is a string outside the supported set.
    """
    if file_format is None:
        # No explicit request: only a (case-insensitive) ``.npz`` suffix selects
        # the NumPy archive; every other name falls back to pickle.
        _, suffix = os.path.splitext(filename)
        return "npz" if suffix.lower() == ".npz" else "pickle"

    if not isinstance(file_format, str):
        raise TypeError(f"file_format should be STRING or None, not {type(file_format)}")

    canonical_labels = {"pickle": "pickle", "pkl": "pickle", "npz": "npz"}
    label = canonical_labels.get(file_format.lower())
    if label is None:
        raise ValueError(
            f"Unsupported file_format '{file_format}'. Supported formats are 'pickle' and 'npz'."
        )
    return label


def _resolve_dictionary_filename(filename, file_format):
    """Return the path used on disk for ``filename`` stored as ``file_format``.

    For the ``"npz"`` format the ``.npz`` suffix is appended unless the name
    already ends with exactly that (case-sensitive) suffix. Names for any
    other format are returned untouched.
    """
    needs_suffix = file_format == "npz" and not filename.endswith(".npz")
    return f"{filename}.npz" if needs_suffix else filename


def save_dictionary(dictionary, filename, file_format=None, compressed=True):
    """Serialize a dictionary to a pickle file or NumPy ``.npz`` archive.

    Parameters
    ----------
    dictionary : dict
        Dictionary to save.
    filename : str
        Output file path. If ``file_format`` is omitted, the archive format is
        inferred from the extension: ``.npz`` selects the NumPy archive path,
        while every other extension defaults to pickle.
    file_format : str or None, optional
        Explicit archive format. Supported values are ``"pickle"``/``"pkl"``
        and ``"npz"``. When omitted, the format is inferred from ``filename``.
    compressed : bool, optional
        Compression flag used only for ``.npz`` archives. It is ignored for
        pickle output.

    Returns
    -------
    str
        Final archive filename written to disk. For ``.npz`` archives the
        suffix is appended automatically when missing.

    Raises
    ------
    TypeError
        If ``dictionary``, ``filename``, ``file_format``, or ``compressed`` has
        an invalid type. For ``.npz`` output also raised when a key is not a
        string or is named ``"file"`` (it would collide with the positional
        ``file`` parameter of ``numpy.savez``).
    ValueError
        If ``file_format`` is unsupported or if a value cannot be represented
        safely in ``.npz`` format.

    Notes
    -----
    The ``.npz`` path is intended for flat dictionaries of array-like values.
    Nested Python objects should be stored with pickle instead.
    """
    validate_parameters(dictionary=dictionary, filename=filename)
    if not isinstance(compressed, bool):
        raise TypeError(f"compressed should be BOOL, not a {type(compressed)}")
    normalized_format = _normalize_dictionary_format(filename, file_format)
    output_file = _resolve_dictionary_filename(filename, normalized_format)

    if normalized_format == "pickle":
        # Overwrites any existing file.
        with open(output_file, "wb") as outp:
            pickle.dump(dictionary, outp, pickle.HIGHEST_PROTOCOL)
        return output_file

    payload = {}
    for key, value in dictionary.items():
        if not isinstance(key, str):
            raise TypeError(
                f"Dictionary keys must be STRINGs for npz output, not {type(key)}"
            )
        array_value = np.asarray(value)
        # Object arrays would need pickling inside the archive, which the
        # loader refuses (it reads with allow_pickle=False).
        if array_value.dtype == object:
            raise ValueError(
                "npz output supports only values convertible to non-object NumPy arrays; "
                "use pickle for nested Python objects."
            )
        payload[key] = array_value

    # The entries are forwarded as keyword arguments, so a key named "file"
    # would clash with the first parameter of np.savez and fail with an opaque
    # "multiple values for argument" error. Report it explicitly instead.
    # NOTE(review): newer NumPy releases may reserve further keyword names
    # (e.g. ``allow_pickle``) -- confirm against the supported NumPy versions.
    if "file" in payload:
        raise TypeError(
            "Dictionary key 'file' cannot be stored in npz output because it "
            "collides with the 'file' argument of numpy.savez; rename the key "
            "or use pickle."
        )

    save_fn = np.savez_compressed if compressed else np.savez
    save_fn(output_file, **payload)
    return output_file
def load_dictionary(filename, file_format=None):
    """Load a dictionary stored with pickle or as a NumPy ``.npz`` archive.

    Parameters
    ----------
    filename : str
        Path to the stored dictionary. If ``file_format`` is omitted, ``.npz``
        files are detected from the extension. When ``file_format="npz"``, the
        suffix is added automatically if missing. For convenience, a bare file
        name also resolves to ``filename + ".npz"`` when that archive exists.
    file_format : str or None, optional
        Explicit archive format. Supported values are ``"pickle"``/``"pkl"``
        and ``"npz"``. When omitted, the format is inferred from ``filename``.

    Returns
    -------
    dict
        Deserialized dictionary. For ``.npz`` archives, 0-dimensional arrays
        are converted back to Python scalars while higher-dimensional entries
        are returned as NumPy arrays.

    Raises
    ------
    TypeError
        If ``filename`` or ``file_format`` has an invalid type.
    ValueError
        If ``file_format`` is unsupported.

    Warnings
    --------
    Pickle archives are read with ``pickle.load``, which can execute arbitrary
    code embedded in the file. Only load pickle files from trusted sources.
    """
    validate_parameters(filename=filename)

    # Convenience lookup: an extension-less name that does not exist on disk
    # falls back to its ``.npz`` sibling when that archive is present.
    bare_name_has_npz_sibling = (
        file_format is None
        and os.path.splitext(filename)[1] == ""
        and not os.path.exists(filename)
        and os.path.exists(f"{filename}.npz")
    )
    if bare_name_has_npz_sibling:
        normalized_format = "npz"
        input_file = f"{filename}.npz"
    else:
        normalized_format = _normalize_dictionary_format(filename, file_format)
        input_file = _resolve_dictionary_filename(filename, normalized_format)

    if normalized_format == "pickle":
        # SECURITY: unpickling runs code chosen by whoever wrote the file;
        # this path must only be used on trusted archives.
        with open(input_file, "rb") as handle:
            return pickle.load(handle)

    with np.load(input_file, allow_pickle=False) as archive:
        data = {}
        for key in archive.files:
            value = archive[key]
            # 0-d arrays become Python scalars; everything else is copied so
            # the result stays usable after the archive is closed.
            data[key] = value.item() if value.shape == () else value.copy()
    return data
def save_data_in_textfile(data_file, x_data, new_data):
    """Append a data series as a new column in a simple text table.

    The file format is line-based and comma-separated. On first write, the
    file is created and initialized with the values from ``x_data`` (usually
    an x-axis label followed by x values). On subsequent writes, each line
    gets one extra comma-separated entry from ``new_data``.

    Parameters
    ----------
    data_file : str
        Path to the text file to create/update.
    x_data : list
        Reference column written when the file does not yet exist.
        Conventionally the first item is a label and the remaining items are
        x values.
    new_data : list
        Column to append to the file. It must contain at least as many entries
        as the file has lines. Conventionally the first item is a label.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If the input arguments have invalid types.
    ValueError
        If ``new_data`` has fewer entries than the file has lines. The check
        runs before the file is rewritten, so existing content is preserved.

    Notes
    -----
    This helper assumes ``x_data`` and ``new_data`` are already aligned line
    by line and does not parse values. Entries of ``new_data`` beyond the
    number of lines in the file are ignored.
    """
    if not isinstance(data_file, str):
        raise TypeError(f"data_file should be a STRING, not a {type(data_file)}")
    if not isinstance(x_data, list):
        raise TypeError(f"x_data must be a LIST, not a {type(x_data)}")
    if not isinstance(new_data, list):
        raise TypeError(f"new_data must be a LIST, not a {type(new_data)}")

    # First write: seed the file with the reference column, one entry per line.
    if not os.path.exists(data_file):
        with open(data_file, "w", encoding="utf-8") as data_handle:
            data_handle.writelines(f"{entry}\n" for entry in x_data)

    with open(data_file, "r", encoding="utf-8") as data_handle:
        lines = data_handle.readlines()

    # Validate and build the full output BEFORE reopening the file in write
    # mode: opening with "w" truncates it, so any failure after that point
    # would destroy the columns already stored.
    if len(new_data) < len(lines):
        raise ValueError(
            f"new_data has {len(new_data)} entries but {data_file} "
            f"has {len(lines)} lines"
        )
    updated_lines = [
        f"{line_entry.rstrip()},{value}\n"
        for line_entry, value in zip(lines, new_data)
    ]

    with open(data_file, "w", encoding="utf-8") as data_handle:
        data_handle.writelines(updated_lines)
def load_data_from_textfile(data_file_name, row_for_labels=False):
    """Load columnar numeric data from a comma-separated text file.

    Parameters
    ----------
    data_file_name : str
        Path to the input file.
    row_for_labels : bool, optional
        If ``True``, interpret the first non-blank line as column labels and
        store them as ``"label_0"``, ``"label_1"``, ... entries in the
        returned dictionary. Default is ``False``.

    Returns
    -------
    dict
        Dictionary containing one NumPy array per column under string keys
        ``"0"``, ``"1"``, ... . If ``row_for_labels`` is ``True``, label
        entries are also included. A file without any non-blank line yields
        an empty dictionary.

    Raises
    ------
    TypeError
        If ``data_file_name`` or ``row_for_labels`` has an invalid type.
    ValueError
        If a data entry cannot be converted to ``float``.
    IndexError
        If a data line has fewer entries than the first line.

    Notes
    -----
    Data values are converted to ``float``. The number of columns is taken
    from the first non-blank line; entries beyond that count on later lines
    are ignored. Blank lines (for instance a trailing newline at the end of
    the file) are skipped.
    """
    if not isinstance(data_file_name, str):
        raise TypeError(
            f"data_file_name should be a STRING, not a {type(data_file_name)}"
        )
    if not isinstance(row_for_labels, bool):
        raise TypeError(f"row_for_labels must be a BOOL, not a {type(row_for_labels)}")

    with open(data_file_name, "r", encoding="utf-8") as data_handle:
        # Blank lines carry no data and would otherwise break float().
        rows = [line.strip().split(",") for line in data_handle if line.strip()]

    # An empty table has no columns, hence no entries to return.
    if not rows:
        return {}

    first_row_entries = rows[0]
    column_count = len(first_row_entries)

    # One list per column, plus its label when the first row holds labels.
    data = {}
    for column_index, entry in enumerate(first_row_entries):
        data[str(column_index)] = []
        if row_for_labels:
            data[f"label_{column_index}"] = str(entry)

    # The label row, when present, is not numeric data.
    data_rows = rows[1:] if row_for_labels else rows
    for row_entries in data_rows:
        for column_index in range(column_count):
            data[str(column_index)].append(float(row_entries[column_index]))

    for column_index in range(column_count):
        data[str(column_index)] = np.asarray(data[str(column_index)])
    return data
def save_sparse_matrix_to_dat(sparse_matrix, filename):
    """Export a sparse matrix to a human-readable ``.dat`` text file.

    The output contains:

    - a header line with the matrix dimension,
    - one line per stored entry with row/column indices and complex value.

    Parameters
    ----------
    sparse_matrix : scipy.sparse.spmatrix
        Sparse matrix to export.
    filename : str
        Output file path.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If ``sparse_matrix`` or ``filename`` has an invalid type.

    Notes
    -----
    Only ``sparse_matrix.shape[0]`` is written as the dimension. Entries are
    taken from the COO representation returned by ``sparse_matrix.tocoo()``
    and written as ``row, col; (real, imag)``.
    """
    validate_parameters(op_list=[sparse_matrix], filename=filename)
    coo = sparse_matrix.tocoo()
    with open(filename, "w", encoding="utf-8") as data_handle:
        # Header: dimension of the matrix.
        data_handle.write("# dimension\n")
        data_handle.write(f"{sparse_matrix.shape[0]}\n")
        # Body: coordinates and coefficient of every stored element.
        data_handle.write("# Non-zero elements: coordinates and coefficients\n")
        data_handle.writelines(
            f"{row_index}, {col_index}; ({value.real}, {value.imag})\n"
            for row_index, col_index, value in zip(coo.row, coo.col, coo.data)
        )