Source code for stressnet.utils.inference

"""Model inference and evaluation helpers for StressNET graph batches.

Typical use is :func:`predict` or :func:`predict_augmented` on dictionaries
produced by preprocessing / data loading, and :func:`frame_with_predicted_tensions`
to attach vector predictions to a ForSys :class:`~forsys.frames.Frame`.
"""

__all__ = ['predict', 'predict_augmented', 'frame_with_predicted_tensions', 'calculate_metrics']

from copy import deepcopy
from typing import TYPE_CHECKING, Any, Literal

import numpy as np
import tensorflow as tf
from sklearn.metrics import mean_absolute_percentage_error, r2_score
from spektral.utils.sparse import sp_matrix_to_sp_tensor

from .data_utils import randomly_rotate_edge_features

if TYPE_CHECKING:
    from forsys.frames import Frame

DEFAULT_DEVICE = '/GPU:0' if tf.config.list_physical_devices('GPU') else '/CPU:0'


[docs] def predict(model: tf.keras.Model, data: dict[str, Any], device: str | None = None ) -> np.ndarray: """Run a single forward pass and return flattened stress predictions. Parameters ---------- model Compiled StressNET Keras model. data Graph batch with keys ``'x'``, ``'a'`` (sparse adjacency), ``'e'`` (edge features). device TensorFlow device string. Defaults to GPU when available, else CPU. Returns ------- numpy.ndarray 1-D array of predictions (length equals number of graph nodes for stress mode). """ with tf.device(device or DEFAULT_DEVICE): A = sp_matrix_to_sp_tensor(data['a']) E = data['e'].reshape(data['e'].shape[0], -1) preds = model([data['x'], A, E], training=False).numpy().ravel() return preds
[docs] def predict_augmented(model: tf.keras.Model, data: dict[str, Any], n_augmentations: int = 9, seed: int | None = 1337, device: str | None = None, mode: Literal['stress', 'embedding'] = 'stress' ) -> np.ndarray: """Average predictions over random rotations of edge features (data augmentation). Parameters ---------- model Compiled StressNET Keras model. data Graph batch with keys ``'x'``, ``'a'``, ``'e'``. n_augmentations Number of *additional* random rotations (total runs = ``n_augmentations + 1``). seed RNG seed for reproducibility. device TensorFlow device string. mode ``'stress'`` rescales the mean prediction to unit mean; ``'embedding'`` returns averaged embedding without that rescaling. Returns ------- numpy.ndarray Mean prediction vector across augmentations. """ rng = np.random.default_rng(seed) pred_rows = n_augmentations + 1 if mode == 'stress': # shape = (n_nodes, 1) pred_cols = data['x'].shape[0] rescale_average_vector = True else: # shape = (1, embedding_size) pred_cols = model.output_shape[1] rescale_average_vector = False # do n_augmentations + 1 predictions and average results preds_array = np.empty((pred_rows, pred_cols), dtype=np.float32) # TODO: predict in batch for large n (requires the model to be loaded in disjoint mode) with tf.device(device or DEFAULT_DEVICE): A = sp_matrix_to_sp_tensor(data['a']) E = data['e'].reshape(data['e'].shape[0], -1) preds_array[0] = model([data['x'], A, E], training=False).numpy().ravel() for i in range(1, pred_rows): E_ = randomly_rotate_edge_features(data['e'], rng=rng).reshape(data['e'].shape[0], -1) preds_array[i] = model([data['x'], A, E_], training=False).numpy().ravel() aug_preds = preds_array.mean(axis=0) if rescale_average_vector: aug_preds /= aug_preds.mean() return aug_preds
[docs] def frame_with_predicted_tensions(frame: 'Frame', predicted_tensions: np.ndarray ) -> 'Frame': """Return a deep-copied frame with predicted tensions assigned to edges. Parameters ---------- frame Source ForSys frame (unchanged). predicted_tensions One scalar per *big edge*, in the same order as ``frame.get_big_edges()``. Returns ------- forsys.frames.Frame New frame instance; each small edge in a big edge receives the same tension. Raises ------ ValueError If the number of predictions does not match the number of big edges. """ new_frame = deepcopy(frame) tensions = np.asarray(predicted_tensions).ravel() big_edges = new_frame.get_big_edges() if tensions.size != len(big_edges): raise ValueError( f'Expected {len(big_edges)} tension values (one per big edge), got {tensions.size}.' ) for be, tension in zip(big_edges, tensions, strict=True): for e_id in be.edges: new_frame.edges[e_id].tension = float(tension) return new_frame
def _borges_score_v2(mape: float, pearsonr: float, r2: float, alpha: float = 1.0, beta: float = 1.0, gamma: float = 1.0, saturate_mape: float = 1.0, saturate_pearsonr: float = 0.99, saturate_r2: float = 0.99 ) -> float: mape = max(mape, saturate_mape) pearsonr = min(pearsonr, saturate_pearsonr) r2 = min(r2, saturate_r2) return ( 100.0 * alpha / mape + (beta / 2.0) * (1.0 + pearsonr) / (1.0 - pearsonr) + gamma / (1.0 - r2) )
[docs] def calculate_metrics(y_true: np.ndarray, y_pred: np.ndarray, ) -> tuple[float, float, float, float]: """Correlation, R², MAPE (%), and composite Borges-style score. Parameters ---------- y_true Ground-truth targets (1-D). y_pred Model predictions (same shape as ``y_true``). Returns ------- tuple[float, float, float, float] ``(pearson_r, r2, mape_percent, score)``. """ r = np.corrcoef(y_true, y_pred)[0, 1] r2 = r2_score(y_true, y_pred) mape = mean_absolute_percentage_error(y_true, y_pred) * 100 score = _borges_score_v2(mape, r, r2) return r, r2, mape, score