Source code for sgwt.util

"""General Utilities

Description: Utilities for accessing built-in data, VFKern, and impulse helper function.

Author: Luke Lowery (lukel@tamu.edu)
"""

import sys
import os

if sys.version_info >= (3, 9):
    from importlib.resources import as_file, files
else:  # pragma: no cover
    from importlib_resources import as_file, files

from ctypes import CDLL
from dataclasses import dataclass

import numpy as np
from scipy.io import loadmat
from scipy.sparse import csc_matrix, linalg

from json import load as jsonload
from typing import Any, Callable, Dict, List, Union, Optional, Tuple


@dataclass
class _FolderConfig:
    """Configuration for auto-discovering resources in a library folder."""
    folder: str
    extension: str
    key_fn: Callable[[str], str]
    loader_fn: Callable[[str], Callable[[], Any]]
    secondary_fn: Optional[Callable[[str], Tuple[str, Callable[[], Any]]]] = None


[docs] @dataclass class ChebyKernel: """Stores Chebyshev polynomial approximations for one or more kernels.""" C: np.ndarray """Coefficient matrix of shape (order + 1, n_dims).""" spectrum_bound: float """Shared upper spectrum bound for all kernels."""
[docs] @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'ChebyKernel': """Loads kernel data from a dictionary.""" approxs = data.get('approximations', []) bound = data.get('spectrum_bound', 0.0) if not approxs: return cls(C=np.empty((0, 0)), spectrum_bound=bound) coeffs = [np.asarray(a.get('coeffs', [])) for a in approxs] if any(len(c) != len(coeffs[0]) for c in coeffs): raise ValueError("All 'coeffs' arrays must have the same length.") return cls(C=np.stack(coeffs, axis=1), spectrum_bound=bound)
[docs] @classmethod def from_function( cls, f: Callable[[np.ndarray], np.ndarray], order: int, spectrum_bound: float, n_samples: int = None, sampling: str = 'chebyshev', min_lambda: float = 0.0, rtol: float = 1e-12, adaptive: bool = False, max_order: int = 500, target_error: float = 1e-10 ) -> 'ChebyKernel': """Creates a ChebyKernel by fitting a vectorized function. Parameters ---------- f : Callable[[np.ndarray], np.ndarray] The vectorized function to approximate. order : int Order of the Chebyshev polynomial to fit. spectrum_bound : float Upper bound of the function's domain. n_samples : int, optional Number of sample points (only used for non-Chebyshev sampling). sampling : str, default 'chebyshev' Sampling strategy: 'chebyshev' (optimal), 'linear', 'quadratic', 'logarithmic'. min_lambda : float, default 0.0 Lower bound of the sampling range. rtol : float, default 1e-12 Relative tolerance for truncating negligible coefficients. adaptive : bool, default False If True, automatically determines optimal order to achieve target_error. max_order : int, default 500 Maximum order for adaptive mode. target_error : float, default 1e-10 Target approximation error for adaptive mode. """ if order < 1: raise ValueError("Order must be >= 1") if adaptive: return cls._adaptive_fit(f, order, spectrum_bound, min_lambda, sampling, rtol, max_order, target_error) coeffs = cls._compute_coefficients(f, order, spectrum_bound, min_lambda, n_samples, sampling) coeffs = cls._ensure_2d(coeffs) coeffs = cls._truncate(coeffs, rtol) return cls(C=coeffs, spectrum_bound=spectrum_bound)
@classmethod def _compute_coefficients(cls, f, order, spectrum_bound, min_lambda, n_samples, sampling): """Compute Chebyshev coefficients using optimal or fallback method.""" lambda_range = spectrum_bound - min_lambda lambda_mid = (spectrum_bound + min_lambda) / 2.0 if sampling == 'chebyshev': # Chebyshev-Gauss-Lobatto nodes: optimal for polynomial interpolation n = order + 1 k = np.arange(n) x_cheb = np.cos(np.pi * k / order) # Nodes in [-1, 1] sample_x = lambda_mid + (lambda_range / 2.0) * x_cheb f_values = cls._ensure_2d(f(sample_x)) # Compute coefficients via discrete orthogonality (DCT-like) coeffs = np.zeros((n, f_values.shape[1])) w = np.ones(n); w[0] = w[-1] = 0.5 # Endpoint weights for j in range(n): T_j = np.cos(j * np.pi * k / order) scale = 2.0 / order if 0 < j < order else 1.0 / order coeffs[j] = scale * np.sum(w[:, None] * f_values * T_j[:, None], axis=0) return coeffs # Fallback: least-squares fitting for other sampling strategies n_samples = n_samples or max(4 * (order + 1), 1000) t = np.linspace(0, 1, n_samples) if sampling == 'quadratic': sample_x = min_lambda + lambda_range * (t ** 2) elif sampling == 'logarithmic': eps = max(min_lambda * 0.001, 1e-10) sample_x = np.exp(np.log(min_lambda + eps) + t * np.log(spectrum_bound / (min_lambda + eps))) else: # linear sample_x = min_lambda + lambda_range * t x_scaled = 2.0 * (sample_x - min_lambda) / lambda_range - 1.0 # Chebyshev-weighted least squares weights = 1.0 / np.sqrt(1.0 - np.clip(x_scaled ** 2, 0, 0.9999)) return np.polynomial.chebyshev.chebfit(x_scaled, f(sample_x), order, w=weights) @classmethod def _adaptive_fit(cls, f, start_order, spectrum_bound, min_lambda, sampling, rtol, max_order, target_error): """Adaptively determine optimal polynomial order.""" test_x = np.linspace(min_lambda, spectrum_bound, 1000) f_exact = cls._ensure_2d(f(test_x)) order = max(start_order, 8) while True: coeffs = cls._ensure_2d( cls._compute_coefficients(f, order, spectrum_bound, min_lambda, None, sampling) ) # Evaluate and compute relative error x_scaled = 2.0 * test_x / spectrum_bound - 1.0 f_approx = np.polynomial.chebyshev.chebval(x_scaled, coeffs) f_approx = f_approx.T if f_approx.ndim > 1 else f_approx[:, None] rel_error = np.max(np.abs(f_exact - f_approx) / np.maximum(np.abs(f_exact), 1e-15)) if rel_error <= target_error or order >= max_order: break order = min(int(order * 1.5) + 1, max_order) return cls(C=cls._truncate(coeffs, rtol), spectrum_bound=spectrum_bound) @classmethod def _ensure_2d(cls, arr): """Ensure array is 2D with shape (n, dims).""" arr = np.atleast_1d(arr) return arr[:, None] if arr.ndim == 1 else arr @classmethod def _truncate(cls, coeffs, rtol): """Truncate negligible higher-order coefficients.""" threshold = rtol * np.max(np.abs(coeffs)) row_max = np.max(np.abs(coeffs), axis=1) significant = np.where(row_max > threshold)[0] last_idx = significant[-1] + 1 if significant.size > 0 else 1 return coeffs[:last_idx]
[docs] @classmethod def from_function_on_graph(cls, L: csc_matrix, f: Callable[[np.ndarray], np.ndarray], order: int, **kwargs) -> 'ChebyKernel': """Creates a ChebyKernel fitted to a graph's spectrum.""" return cls.from_function(f, order, estimate_spectral_bound(L), **kwargs)
def _scale_x(self, x: np.ndarray) -> np.ndarray: """Maps points from [0, spectrum_bound] to Chebyshev domain [-1, 1].""" return (2.0 / self.spectrum_bound) * x - 1.0
[docs] def evaluate(self, x: np.ndarray) -> np.ndarray: """Evaluates the Chebyshev approximation at given points.""" if self.C.size == 0: return np.empty((len(x), 0)) y = np.polynomial.chebyshev.chebval(self._scale_x(x), self.C) return y.T if y.ndim > 1 else y
[docs] def estimate_spectral_bound(L: csc_matrix) -> float: """ Estimates the largest eigenvalue (spectral bound) of a matrix. This is typically used to find the domain [0, lambda_max] for Chebyshev polynomial approximations. Parameters ---------- L : csc_matrix The matrix (e.g., Graph Laplacian) for which to estimate the bound. Returns ------- float An estimate of the largest eigenvalue, scaled by 1.01 for safety. """ # Note: Using eigs from scipy.sparse.linalg e_max = linalg.eigs(L, k=1, which='LM', return_eigenvectors=False) return float(e_max[0].real) * 1.01
[docs] @dataclass class VFKernel: """Vector Fitting Kernel representation. A dataclass to store the components of a rational kernel approximation obtained from Vector Fitting. """ R: np.ndarray """Residue matrix of shape (n_poles, n_dims).""" Q: np.ndarray """Poles vector of shape (n_poles,).""" D: np.ndarray """Direct term (offset) of shape (n_dims,)."""
[docs] @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'VFKernel': """Loads kernel data from a dictionary. Parameters ---------- data : dict A dictionary containing the kernel parameters, typically loaded from a JSON file. It should have 'poles' and 'd' keys. Returns ------- VFKernel A new instance of the VFKernel class. """ poles = data.get('poles', []) return cls( R=np.array([p.get('r', []) for p in poles]), Q=np.array([p.get('q', 0) for p in poles]), D=np.array(data.get('d', [])) )
[docs] def impulse(lap: csc_matrix, n: int = 0, n_timesteps: int = 1) -> np.ndarray: """ Generates a Dirac impulse signal at a specified vertex. Parameters ---------- lap : csc_matrix Graph Laplacian defining the number of vertices. n : int Index of the vertex where the impulse is applied. n_timesteps : int Number of time steps (columns) in the resulting signal. Returns ------- np.ndarray 1D array (n_vertices,) if n_timesteps=1, otherwise 2D array (n_vertices, n_timesteps) with 1.0 at index n and 0.0 elsewhere. """ if n_timesteps == 1: b: np.ndarray = np.zeros(lap.shape[0]) b[n] = 1.0 else: b = np.zeros((lap.shape[0], n_timesteps), order='F') b[n] = 1.0 return b
def _load_dll(dll_name: str) -> CDLL: """Locates and loads a shared library from the library/dll directory. Handles platform-specific path adjustments to ensure the DLL can be found and loaded by ctypes. Raises ------ OSError If the DLL file cannot be loaded. Exception For other unexpected errors during loading. Returns ------- ctypes.CDLL The loaded DLL object. """ resource = files("sgwt") / "library" / "dll" / dll_name with as_file(resource) as dll_path: dll_dir = os.path.dirname(dll_path) # On Windows, add the DLL's directory to the search path for dependencies if hasattr(os, 'add_dll_directory'): os.add_dll_directory(dll_dir) else: # pragma: no cover os.environ['PATH'] = str(dll_dir) + os.pathsep + os.environ['PATH'] try: return CDLL(str(dll_path)) except OSError as e: raise OSError(f"Failed to load DLL at {dll_path}. Error: {e}") except Exception as e: # pragma: no cover raise Exception(f"Unexpected error loading DLL: {e}") def get_cholmod_dll() -> CDLL: """Locates and loads the CHOLMOD shared library.""" return _load_dll("cholmod.dll") def get_klu_dll() -> CDLL: """Locates and loads the KLU shared library.""" return _load_dll("klu.dll") def _load_resource(path: str, loader: Callable[[str], Any]) -> Any: """Centralized resource loader using importlib.resources.""" with as_file(files("sgwt").joinpath(path)) as file_path: if not os.path.exists(file_path): raise FileNotFoundError(f"Resource not found: {file_path}") return loader(str(file_path)) def _mat_loader(path: str, to_csc: bool = False) -> Union[np.ndarray, csc_matrix]: """ Loads data from a .mat file. If a single variable is present, it is returned. If multiple variables are found, they are flattened and stacked into columns of a single array. """ data = loadmat(path, squeeze_me=False) keys = [k for k in data if not k.startswith("__")] if not keys: raise ValueError(f"No data variables found in MAT file: {path}") res = data[keys[0]] if to_csc: # Data may already be sparse from loadmat; use hasattr to avoid # pytest-cov instrumentation issues with scipy.sparse.issparse() if hasattr(res, "tocsc"): return res.tocsc() return csc_matrix(res) if len(keys) > 1: return np.stack([data[k].flatten() for k in keys], axis=1) return res.T if (res.ndim == 2 and res.shape[0] == 1) else res def _json_kern_loader(path: str) -> Dict[str, Any]: """Loads a VFKern from a JSON file.""" with open(path, "r") as f: return jsonload(f) def _parse_ply(filepath: str) -> tuple: """ Parses a .ply mesh file and returns vertices and faces. Parameters ---------- filepath : str Path to the .ply file. Returns ------- tuple (vertices, faces, vertex_count) where vertices is a list of (x,y,z) tuples, faces is a list of vertex index lists, and vertex_count is the number of vertices. """ import struct with open(filepath, 'rb') as f: # Parse header fmt = "ascii" vertex_count = 0 face_count = 0 vertex_props = [] current_element = None while True: line = f.readline().strip().decode('ascii', errors='ignore') if line == "end_header": break parts = line.split() if not parts: continue if parts[0] == "format": fmt = parts[1] elif parts[0] == "element": current_element = parts[1] if current_element == "vertex": vertex_count = int(parts[2]) elif current_element == "face": face_count = int(parts[2]) elif parts[0] == "property" and current_element == "vertex": vertex_props.append((parts[2], parts[1])) vertices = [] faces = [] if fmt == "ascii": lines = f.readlines() for i in range(vertex_count): parts = lines[i].strip().split() vertices.append((float(parts[0]), float(parts[1]), float(parts[2]))) for i in range(face_count): parts = lines[vertex_count + i].strip().split() faces.append([int(x) for x in parts[1:]]) elif fmt == "binary_little_endian": np_type_map = { 'char': 'i1', 'uchar': 'u1', 'short': 'i2', 'ushort': 'u2', 'int': 'i4', 'uint': 'u4', 'float': 'f4', 'double': 'f8' } dtype_fields = [(name, np_type_map.get(t, 'f4')) for name, t in vertex_props] vertex_dtype = np.dtype(dtype_fields) vertex_data = f.read(vertex_count * vertex_dtype.itemsize) v_arr = np.frombuffer(vertex_data, dtype=vertex_dtype) if 'x' in v_arr.dtype.names and 'y' in v_arr.dtype.names and 'z' in v_arr.dtype.names: vertices = list(zip(v_arr['x'], v_arr['y'], v_arr['z'])) else: names = v_arr.dtype.names vertices = list(zip(v_arr[names[0]], v_arr[names[1]], v_arr[names[2]])) for _ in range(face_count): n = struct.unpack('<B', f.read(1))[0] faces.append(list(struct.unpack(f'<{n}i', f.read(n * 4)))) else: raise ValueError(f"Unsupported PLY format: {fmt}") return vertices, faces, vertex_count def load_ply_laplacian(filepath: str) -> csc_matrix: """ Loads a .ply mesh file and returns its graph Laplacian. This is a convenience function for loading mesh data directly into the sparse format required by the convolution solvers. Parameters ---------- filepath : str Path to the .ply file. Returns ------- csc_matrix The graph Laplacian matrix L = B @ B.T where B is the incidence matrix. Examples -------- >>> from sgwt import load_ply_laplacian, Convolve >>> L = load_ply_laplacian("my_mesh.ply") >>> with Convolve(L) as conv: ... coeffs = conv.lowpass(signal, scales=[1.0]) """ vertices, faces, vertex_count = _parse_ply(filepath) # Extract unique edges from faces unique_edges = set() for face in faces: n = len(face) for i in range(n): u, v = face[i], face[(i + 1) % n] unique_edges.add((u, v) if u < v else (v, u)) edges = np.array(sorted(unique_edges), dtype=int) num_edges = len(edges) # Build incidence matrix B and compute Laplacian L = B @ B.T rows = edges.ravel() cols = np.repeat(np.arange(num_edges), 2) data = np.tile([1.0, -1.0], num_edges) B = csc_matrix((data, (rows, cols)), shape=(vertex_count, num_edges)) return B @ B.T def load_ply_xyz(filepath: str) -> np.ndarray: """ Loads a .ply mesh file and returns the vertex coordinates. Parameters ---------- filepath : str Path to the .ply file. Returns ------- np.ndarray An (N, 3) array of vertex coordinates (x, y, z). Examples -------- >>> from sgwt import load_ply_xyz >>> xyz = load_ply_xyz("my_mesh.ply") >>> print(xyz.shape) # (num_vertices, 3) """ vertices, _, _ = _parse_ply(filepath) return np.array(vertices) # Factory helpers def _lap(k: str, r: str) -> csc_matrix: """Loads a Laplacian from library/{TYPE}/{REGION}.mat.""" return _load_resource(f"library/{k}/{r}.mat", lambda p: _mat_loader(p, to_csc=True)) # type: ignore def _sig(r: str) -> np.ndarray: """Loads a signal from library/SIGNALS/{REGION}.mat.""" return _load_resource(f"library/SIGNALS/{r}.mat", _mat_loader) # type: ignore def _kern(n: str) -> Dict[str, Any]: """Loads a kernel from library/KERNELS/{NAME}.json.""" return _load_resource(f"library/KERNELS/{n}.json", _json_kern_loader) def _ply_lap(n: str) -> csc_matrix: """Loads a mesh Laplacian from library/MESH/{NAME}.ply.""" return _load_resource(f"library/MESH/{n}.ply", load_ply_laplacian) # type: ignore def _ply_xyz(n: str) -> np.ndarray: """Loads mesh coordinates from library/MESH/{NAME}.ply.""" return _load_resource(f"library/MESH/{n}.ply", load_ply_xyz) # type: ignore # Auto-discovery configuration for library folders _FOLDER_CONFIGS: List[_FolderConfig] = [ _FolderConfig("KERNELS", ".json", lambda s: s, lambda s: lambda: _kern(s)), _FolderConfig("DELAY", ".mat", lambda s: f"DELAY_{s}", lambda s: lambda: _lap("DELAY", s)), _FolderConfig("IMPEDANCE", ".mat", lambda s: f"IMPEDANCE_{s}", lambda s: lambda: _lap("IMPEDANCE", s)), _FolderConfig("LENGTH", ".mat", lambda s: f"LENGTH_{s}", lambda s: lambda: _lap("LENGTH", s)), _FolderConfig("SIGNALS", ".mat", lambda s: f"COORD_{s}", lambda s: lambda: _sig(s)), _FolderConfig("MESH", ".ply", lambda s: f"MESH_{s}", lambda s: lambda: _ply_lap(s), lambda s: (f"{s}_XYZ", lambda: _ply_xyz(s))), ] _LAZY_REGISTRY: Optional[Dict[str, Callable[[], Any]]] = None def _discover_resources() -> Dict[str, Callable[[], Any]]: """Scans library folders and builds the lazy-loading registry.""" registry: Dict[str, Callable[[], Any]] = {} library = files("sgwt") / "library" for cfg in _FOLDER_CONFIGS: folder = library / cfg.folder try: items = list(folder.iterdir()) except (TypeError, FileNotFoundError): continue for item in items: if not item.name.endswith(cfg.extension): continue stem = item.name[:-len(cfg.extension)] # Add primary entry registry[cfg.key_fn(stem)] = cfg.loader_fn(stem) # Add secondary entry if configured (e.g., MESH -> XYZ) if cfg.secondary_fn: key, loader = cfg.secondary_fn(stem) registry[key] = loader return registry def _ensure_registry() -> Dict[str, Callable[[], Any]]: """Returns the registry, building it on first access.""" global _LAZY_REGISTRY if _LAZY_REGISTRY is None: _LAZY_REGISTRY = _discover_resources() return _LAZY_REGISTRY def list_graphs() -> None: """ Prints a table of all available graph Laplacians in the library. Displays the graph name, number of vertices, and number of edges. """ registry = _ensure_registry() # Filter for graph keys based on known prefixes graph_prefixes = ("DELAY_", "IMPEDANCE_", "LENGTH_", "MESH_") graph_keys = [k for k in registry.keys() if k.startswith(graph_prefixes)] if not graph_keys: print("No graphs found in the library.") return # Header print(f"{'Graph Name':<30} {'Vertices':<10} {'Edges':<10}") print("-" * 52) for key in sorted(graph_keys): try: # Load the graph L = registry[key]() # Check if it looks like a sparse matrix if hasattr(L, 'shape') and hasattr(L, 'nnz'): n_vertices = L.shape[0] # Calculate edges: (nnz - non_zero_diagonal_elements) / 2 # This handles cases with isolated nodes (0 on diagonal) if hasattr(L, 'diagonal'): diag_nnz = np.count_nonzero(L.diagonal()) n_edges = (L.nnz - diag_nnz) // 2 else: # Fallback n_edges = (L.nnz - n_vertices) // 2 print(f"{key:<30} {n_vertices:<10} {n_edges:<10}") except Exception: continue def __getattr__(name: str) -> Any: registry = _ensure_registry() if name in registry: return registry[name]() raise AttributeError(f"module {__name__!r} has no attribute {name!r}") def __dir__() -> List[str]: return list(globals().keys()) + list(_ensure_registry().keys()) __all__ = ["ChebyKernel", "VFKernel", "impulse", "get_cholmod_dll", "get_klu_dll", "estimate_spectral_bound", "load_ply_laplacian", "load_ply_xyz", "list_graphs"]