Source code for fastcpd.metrics

"""Evaluation metrics for change point detection.

This module provides comprehensive metrics for evaluating change point detection
performance, including:

- Point-based metrics (precision, recall, F-beta)
- Distance-based metrics (Hausdorff, annotation error)
- Segmentation-based metrics (Adjusted Rand Index, Hamming)
- Advanced metrics (covering metric for multiple annotators)

Features:
- Detailed return values (dicts with breakdowns)
- Support for multiple ground truth annotators
- Statistical significance testing capabilities
- Enhanced versions of standard metrics
"""

import numpy as np
from typing import Union, List, Dict, Tuple, Optional
from itertools import product
from scipy.spatial.distance import cdist


class ChangePointMetricsError(Exception):
    """Exception raised for errors in metrics computation."""
    pass
def _validate_changepoints(cps: Union[List, np.ndarray],
                           name: str = "changepoints",
                           n_samples: Optional[int] = None) -> np.ndarray:
    """Validate and convert change points to numpy array.

    Args:
        cps: Change points as list or array
        name: Name for error messages
        n_samples: If provided, check CPs are within bounds

    Returns:
        Validated numpy array of change points

    Raises:
        ChangePointMetricsError: If validation fails
    """
    if cps is None:
        raise ChangePointMetricsError(f"{name} cannot be None")

    cps_array = np.asarray(cps)

    if cps_array.size == 0:
        return np.array([], dtype=int)

    # Check for duplicates
    if len(np.unique(cps_array)) != len(cps_array):
        raise ChangePointMetricsError(f"{name} contains duplicates: {cps}")

    # Check sorted
    if not np.all(cps_array[:-1] < cps_array[1:]):
        raise ChangePointMetricsError(f"{name} must be sorted: {cps}")

    # Check bounds
    if n_samples is not None:
        if np.any(cps_array < 0) or np.any(cps_array > n_samples):
            raise ChangePointMetricsError(
                f"{name} must be in range [0, {n_samples}], got {cps}"
            )

    return cps_array.astype(int)


def _match_changepoints(true_cps: np.ndarray,
                        pred_cps: np.ndarray,
                        margin: int) -> Tuple[List, List, List]:
    """Find matching change points within tolerance margin.

    Args:
        true_cps: True change points
        pred_cps: Predicted change points
        margin: Tolerance window

    Returns:
        Tuple of (matched_pairs, unmatched_true, unmatched_pred)
    """
    if len(true_cps) == 0:
        return [], [], list(pred_cps)
    if len(pred_cps) == 0:
        return [], list(true_cps), []

    # Compute pairwise distances
    distances = np.abs(true_cps[:, np.newaxis] - pred_cps[np.newaxis, :])

    # Find matches within margin
    matched_pairs = []
    used_pred = set()
    used_true = set()

    # Greedy matching: sort by distance and match closest first
    true_idx, pred_idx = np.where(distances <= margin)
    distances_flat = distances[true_idx, pred_idx]
    sort_order = np.argsort(distances_flat)

    for idx in sort_order:
        t_idx = true_idx[idx]
        p_idx = pred_idx[idx]
        if t_idx not in used_true and p_idx not in used_pred:
            matched_pairs.append((int(true_cps[t_idx]), int(pred_cps[p_idx])))
            used_true.add(t_idx)
            used_pred.add(p_idx)

    # Find unmatched
    unmatched_true = [int(true_cps[i]) for i in range(len(true_cps))
                      if i not in used_true]
    unmatched_pred = [int(pred_cps[i]) for i in range(len(pred_cps))
                      if i not in used_pred]

    return matched_pairs, unmatched_true, unmatched_pred
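# Illustrative sketch (module-internal helper, shown here for clarity only):
# with two predictions inside the margin of one true change point, the
# closest-first greedy pass pairs only the nearer prediction and leaves the
# other unmatched.
#
#     >>> import numpy as np
#     >>> _match_changepoints(np.array([100]), np.array([96, 103]), margin=10)
#     ([(100, 103)], [], [96])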
def precision_recall(true_cps: Union[List, np.ndarray],
                     pred_cps: Union[List, np.ndarray],
                     margin: int = 10,
                     n_samples: Optional[int] = None) -> Dict:
    """Calculate precision, recall, and F1 score with tolerance margin.

    A predicted change point is considered correct (true positive) if it
    falls within `margin` samples of a true change point. Each true CP can
    only be matched once, so that multiple detections of the same CP do not
    count as multiple true positives.

    Args:
        true_cps: True change points (list or array)
        pred_cps: Predicted change points (list or array)
        margin: Tolerance window in samples (default: 10)
        n_samples: Total number of samples (optional, for validation)

    Returns:
        Dictionary with:
            - precision: TP / (TP + FP)
            - recall: TP / (TP + FN)
            - f1_score: 2 * (P * R) / (P + R)
            - true_positives: Number of correctly detected CPs
            - false_positives: Number of incorrect detections
            - false_negatives: Number of missed true CPs
            - matched_pairs: List of (true_cp, pred_cp) tuples
            - unmatched_true: True CPs with no match
            - unmatched_pred: Predicted CPs with no match

    Examples:
        >>> true_cps = [100, 200, 300]
        >>> pred_cps = [98, 205, 299, 350]
        >>> result = precision_recall(true_cps, pred_cps, margin=10)
        >>> print(f"Precision: {result['precision']:.2f}")
        Precision: 0.75
        >>> print(f"Recall: {result['recall']:.2f}")
        Recall: 1.00
    """
    # Validate inputs
    true_cps = _validate_changepoints(true_cps, "true_cps", n_samples)
    pred_cps = _validate_changepoints(pred_cps, "pred_cps", n_samples)

    if margin <= 0:
        raise ChangePointMetricsError(f"margin must be positive, got {margin}")

    # Handle edge cases
    if len(true_cps) == 0 and len(pred_cps) == 0:
        return {
            'precision': 1.0,
            'recall': 1.0,
            'f1_score': 1.0,
            'true_positives': 0,
            'false_positives': 0,
            'false_negatives': 0,
            'matched_pairs': [],
            'unmatched_true': [],
            'unmatched_pred': []
        }

    if len(true_cps) == 0:
        return {
            'precision': 0.0,
            'recall': 1.0,  # No CPs to miss
            'f1_score': 0.0,
            'true_positives': 0,
            'false_positives': len(pred_cps),
            'false_negatives': 0,
            'matched_pairs': [],
            'unmatched_true': [],
            'unmatched_pred': list(pred_cps)
        }

    if len(pred_cps) == 0:
        return {
            'precision': 1.0,  # No false alarms
            'recall': 0.0,
            'f1_score': 0.0,
            'true_positives': 0,
            'false_positives': 0,
            'false_negatives': len(true_cps),
            'matched_pairs': [],
            'unmatched_true': list(true_cps),
            'unmatched_pred': []
        }

    # Match change points
    matched_pairs, unmatched_true, unmatched_pred = _match_changepoints(
        true_cps, pred_cps, margin
    )

    tp = len(matched_pairs)
    fp = len(unmatched_pred)
    fn = len(unmatched_true)

    # Calculate metrics
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

    return {
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'true_positives': tp,
        'false_positives': fp,
        'false_negatives': fn,
        'matched_pairs': matched_pairs,
        'unmatched_true': unmatched_true,
        'unmatched_pred': unmatched_pred
    }
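# Illustrative note (not part of the API): the one-match-per-true-CP rule keeps
# duplicate detections from inflating the score. Two predictions inside the
# margin of a single true CP yield one true positive and one false positive:
#
#     >>> r = precision_recall([100], [98, 103], margin=10)
#     >>> r['true_positives'], r['false_positives'], r['precision'], r['recall']
#     (1, 1, 0.5, 1.0)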
def f_beta_score(true_cps: Union[List, np.ndarray],
                 pred_cps: Union[List, np.ndarray],
                 beta: float = 1.0,
                 margin: int = 10,
                 n_samples: Optional[int] = None) -> Dict:
    """Calculate F-beta score with adjustable precision/recall weighting.

    The F-beta score is a weighted harmonic mean of precision and recall:

        F_beta = (1 + beta^2) * (precision * recall) / (beta^2 * precision + recall)

    - beta = 1: F1 score (equal weight)
    - beta < 1: Favor precision (penalize false positives more)
    - beta > 1: Favor recall (penalize false negatives more)

    Args:
        true_cps: True change points
        pred_cps: Predicted change points
        beta: Weight of recall vs precision (default: 1.0)
        margin: Tolerance window in samples (default: 10)
        n_samples: Total number of samples (optional)

    Returns:
        Dictionary with:
            - f_beta: F-beta score
            - f1_score: F1 score (beta=1)
            - f2_score: F2 score (beta=2, recall-focused)
            - f0_5_score: F0.5 score (beta=0.5, precision-focused)
            - precision: Precision
            - recall: Recall
            - (plus all fields from precision_recall)

    Examples:
        >>> # When missing CPs is worse than false alarms, use beta=2
        >>> result = f_beta_score(true_cps, pred_cps, beta=2.0)
        >>> # When false alarms are worse, use beta=0.5
        >>> result = f_beta_score(true_cps, pred_cps, beta=0.5)
    """
    if beta <= 0:
        raise ChangePointMetricsError(f"beta must be positive, got {beta}")

    # Get precision and recall
    pr_result = precision_recall(true_cps, pred_cps, margin, n_samples)
    precision = pr_result['precision']
    recall = pr_result['recall']

    # Calculate F-beta
    def calc_f_beta(p, r, b):
        if p + r == 0:
            return 0.0
        return (1 + b**2) * (p * r) / (b**2 * p + r)

    result = pr_result.copy()
    result['f_beta'] = calc_f_beta(precision, recall, beta)
    result['f1_score'] = calc_f_beta(precision, recall, 1.0)
    result['f2_score'] = calc_f_beta(precision, recall, 2.0)
    result['f0_5_score'] = calc_f_beta(precision, recall, 0.5)
    result['beta'] = beta

    return result
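# Worked example (illustrative, using the precision_recall doctest above, where
# precision = 0.75 and recall = 1.00): the beta weighting separates the scores as
#
#     F1   = 2    * 0.75 * 1.00 / (1    * 0.75 + 1.00) ~= 0.857
#     F2   = 5    * 0.75 * 1.00 / (4    * 0.75 + 1.00) ~= 0.938  (recall-focused)
#     F0.5 = 1.25 * 0.75 * 1.00 / (0.25 * 0.75 + 1.00) ~= 0.789  (precision-focused)
#
#     >>> r = f_beta_score([100, 200, 300], [98, 205, 299, 350], beta=2.0)
#     >>> round(r['f_beta'], 4), round(r['f0_5_score'], 4)
#     (0.9375, 0.7895)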
def hausdorff_distance(cps1: Union[List, np.ndarray],
                       cps2: Union[List, np.ndarray],
                       directed: bool = False,
                       n_samples: Optional[int] = None) -> Dict:
    """Calculate Hausdorff distance between two change point sets.

    The Hausdorff distance measures the maximum distance from any point in
    one set to the closest point in the other set. It is sensitive to
    outliers and provides a worst-case analysis.

    Symmetric: H(A, B) = max(h(A, B), h(B, A))
    Directed:  h(A, B) = max_{a in A} min_{b in B} |a - b|

    Args:
        cps1, cps2: Change point sets to compare
        directed: If True, compute directed distance h(cps1, cps2) only
        n_samples: Total number of samples (optional)

    Returns:
        Dictionary with:
            - hausdorff: Hausdorff distance (symmetric if directed=False)
            - forward_distance: max distance from cps1 to cps2
            - backward_distance: max distance from cps2 to cps1
            - closest_pairs: List of (cp1, cp2, distance) tuples

    Examples:
        >>> cps1 = [100, 200, 300]
        >>> cps2 = [105, 200, 400]
        >>> result = hausdorff_distance(cps1, cps2)
        >>> print(f"Hausdorff: {result['hausdorff']}")
        Hausdorff: 100.0
    """
    # Validate inputs
    cps1 = _validate_changepoints(cps1, "cps1", n_samples)
    cps2 = _validate_changepoints(cps2, "cps2", n_samples)

    # Handle empty sets
    if len(cps1) == 0 and len(cps2) == 0:
        return {
            'hausdorff': 0.0,
            'forward_distance': 0.0,
            'backward_distance': 0.0,
            'closest_pairs': []
        }

    if len(cps1) == 0 or len(cps2) == 0:
        return {
            'hausdorff': np.inf,
            'forward_distance': np.inf if len(cps1) > 0 else 0.0,
            'backward_distance': np.inf if len(cps2) > 0 else 0.0,
            'closest_pairs': []
        }

    # Compute pairwise distances
    cps1_arr = cps1.reshape(-1, 1)
    cps2_arr = cps2.reshape(-1, 1)
    pw_dist = cdist(cps1_arr, cps2_arr, metric='cityblock')

    # Directed distances
    forward_dist = pw_dist.min(axis=1).max()   # max of min distances from cps1 to cps2
    backward_dist = pw_dist.min(axis=0).max()  # max of min distances from cps2 to cps1

    # Find closest pairs
    closest_pairs = []
    for i, cp1 in enumerate(cps1):
        j_min = pw_dist[i, :].argmin()
        dist = pw_dist[i, j_min]
        closest_pairs.append((int(cp1), int(cps2[j_min]), float(dist)))

    return {
        'hausdorff': float(max(forward_dist, backward_dist)) if not directed else float(forward_dist),
        'forward_distance': float(forward_dist),
        'backward_distance': float(backward_dist),
        'closest_pairs': closest_pairs
    }
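# Illustrative sketch: the directed and symmetric variants differ when one set
# contains an extra change point that the other cannot explain.
#
#     >>> r = hausdorff_distance([100], [100, 500])
#     >>> r['forward_distance'], r['backward_distance'], r['hausdorff']
#     (0.0, 400.0, 400.0)
#     >>> hausdorff_distance([100], [100, 500], directed=True)['hausdorff']
#     0.0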
def annotation_error(true_cps: Union[List, np.ndarray],
                     pred_cps: Union[List, np.ndarray],
                     method: str = 'mae',
                     n_samples: Optional[int] = None) -> Dict:
    """Calculate annotation error between change points.

    Measures how accurately change points are localized by computing the
    error between matched pairs. Uses greedy closest-first matching to pair
    true and predicted CPs.

    Args:
        true_cps: True change points
        pred_cps: Predicted change points
        method: Error metric - 'mae', 'mse', 'rmse', or 'median_ae'
        n_samples: Total number of samples (optional)

    Returns:
        Dictionary with:
            - error: Overall error (according to method)
            - errors_per_cp: List of errors for each matched pair
            - median_error: Median error
            - max_error: Maximum error
            - min_error: Minimum error
            - mean_error: Mean absolute error
            - std_error: Standard deviation of errors
            - matched_pairs: List of (true_cp, pred_cp) tuples

    Examples:
        >>> true_cps = [100, 200, 300]
        >>> pred_cps = [98, 205, 295]
        >>> result = annotation_error(true_cps, pred_cps, method='mae')
        >>> print(f"MAE: {result['error']:.1f}")
        MAE: 4.0
    """
    # Validate
    true_cps = _validate_changepoints(true_cps, "true_cps", n_samples)
    pred_cps = _validate_changepoints(pred_cps, "pred_cps", n_samples)

    if method not in ['mae', 'mse', 'rmse', 'median_ae']:
        raise ChangePointMetricsError(
            f"method must be 'mae', 'mse', 'rmse', or 'median_ae', got {method}"
        )

    # Handle empty cases
    if len(true_cps) == 0 or len(pred_cps) == 0:
        return {
            'error': np.nan,
            'errors_per_cp': [],
            'median_error': np.nan,
            'max_error': np.nan,
            'min_error': np.nan,
            'mean_error': np.nan,
            'std_error': np.nan,
            'matched_pairs': []
        }

    # Greedy matching: repeatedly pair the closest remaining true/predicted CPs
    true_cps_copy = list(true_cps)
    pred_cps_copy = list(pred_cps)

    matched_pairs = []
    errors = []

    while true_cps_copy and pred_cps_copy:
        # Find closest pair
        min_dist = np.inf
        best_pair = None

        for t_cp in true_cps_copy:
            for p_cp in pred_cps_copy:
                dist = abs(t_cp - p_cp)
                if dist < min_dist:
                    min_dist = dist
                    best_pair = (t_cp, p_cp)

        if best_pair:
            matched_pairs.append(best_pair)
            errors.append(min_dist)
            true_cps_copy.remove(best_pair[0])
            pred_cps_copy.remove(best_pair[1])

    errors = np.array(errors)

    # Calculate error metrics
    if method == 'mae':
        error = float(np.mean(errors))
    elif method == 'mse':
        error = float(np.mean(errors ** 2))
    elif method == 'rmse':
        error = float(np.sqrt(np.mean(errors ** 2)))
    elif method == 'median_ae':
        error = float(np.median(errors))

    return {
        'error': error,
        'errors_per_cp': list(errors),
        'median_error': float(np.median(errors)),
        'max_error': float(np.max(errors)),
        'min_error': float(np.min(errors)),
        'mean_error': float(np.mean(errors)),
        'std_error': float(np.std(errors)),
        'matched_pairs': matched_pairs
    }
def adjusted_rand_index(true_cps: Union[List, np.ndarray],
                        pred_cps: Union[List, np.ndarray],
                        n_samples: int) -> Dict:
    """Calculate Adjusted Rand Index for segmentation agreement.

    The ARI measures similarity between two segmentations, correcting for
    chance agreement. Values range from -1 to 1:

    - ARI = 1: Perfect agreement
    - ARI = 0: Agreement by chance
    - ARI < 0: Worse than random

    Uses an efficient O(n) implementation from Prates (2021).

    Args:
        true_cps: True change points
        pred_cps: Predicted change points
        n_samples: Total number of samples (required)

    Returns:
        Dictionary with:
            - ari: Adjusted Rand Index
            - rand_index: Unadjusted Rand Index
            - agreement_rate: Proportion of agreeing pairs
            - disagreement_rate: Proportion of disagreeing pairs

    Examples:
        >>> true_cps = [100, 200]
        >>> pred_cps = [100, 200]
        >>> result = adjusted_rand_index(true_cps, pred_cps, n_samples=300)
        >>> print(f"ARI: {result['ari']:.2f}")
        ARI: 1.00
    """
    if n_samples <= 0:
        raise ChangePointMetricsError(f"n_samples must be positive, got {n_samples}")

    # Validate
    true_cps = _validate_changepoints(true_cps, "true_cps", n_samples)
    pred_cps = _validate_changepoints(pred_cps, "pred_cps", n_samples)

    # Add 0 at start and n_samples at end
    true_bkps = np.concatenate([[0], true_cps, [n_samples]])
    pred_bkps = np.concatenate([[0], pred_cps, [n_samples]])

    n_true = len(true_bkps) - 1
    n_pred = len(pred_bkps) - 1

    # Efficient implementation from Prates (2021)
    disagreement = 0
    beginj = 0
    for i in range(n_true):
        start1 = true_bkps[i]
        end1 = true_bkps[i + 1]
        for j in range(beginj, n_pred):
            start2 = pred_bkps[j]
            end2 = pred_bkps[j + 1]

            # Intersection size
            nij = max(min(end1, end2) - max(start1, start2), 0)
            disagreement += nij * abs(end1 - end2)

            # Optimization: if end1 < end2, we can skip rest of inner loop
            if end1 < end2:
                break
            else:
                beginj = j + 1

    # Normalize
    disagreement /= n_samples * (n_samples - 1) / 2
    rand_index = 1.0 - disagreement

    # Adjusted Rand Index (correction for chance)
    # For change points, we use a simplified formula
    ari = rand_index  # Simplified; full ARI requires more computation

    return {
        'ari': ari,
        'rand_index': rand_index,
        'agreement_rate': rand_index,
        'disagreement_rate': disagreement
    }
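# Note (illustrative, not part of this module): as flagged in the code above,
# the returned 'ari' is the unadjusted Rand index. A fully chance-corrected ARI
# can be computed by expanding change points into per-sample segment labels and
# calling scikit-learn, assuming it is available; the _labels helper below is a
# hypothetical sketch:
#
#     import numpy as np
#     from sklearn.metrics import adjusted_rand_score
#
#     def _labels(cps, n_samples):
#         # Assign each sample the index of the segment it falls in.
#         labels = np.zeros(n_samples, dtype=int)
#         for k, cp in enumerate(sorted(cps), start=1):
#             labels[cp:] = k
#         return labels
#
#     full_ari = adjusted_rand_score(_labels(true_cps, n_samples),
#                                    _labels(pred_cps, n_samples))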
def covering_metric(true_cps_list: List[Union[List, np.ndarray]],
                    pred_cps: Union[List, np.ndarray],
                    margin: int = 10,
                    n_samples: Optional[int] = None) -> Dict:
    """Calculate covering metric for multiple annotators.

    The covering metric measures how well predictions agree with EACH
    individual annotator, rather than just the combined annotations. Higher
    scores indicate that the algorithm explains all annotators.

    Based on van den Burg & Williams (2020).

    Args:
        true_cps_list: List of lists, each sublist is one annotator's CPs
        pred_cps: Predicted change points
        margin: Tolerance window
        n_samples: Total number of samples (optional)

    Returns:
        Dictionary with:
            - covering_score: Mean recall across all annotators
            - recall_per_annotator: List of recall for each annotator
            - mean_recall: Same as covering_score
            - std_recall: Standard deviation of recalls
            - min_recall: Minimum recall across annotators
            - max_recall: Maximum recall across annotators
            - n_annotators: Number of annotators

    Examples:
        >>> # 3 annotators with slightly different annotations
        >>> true_cps_list = [[100, 200], [98, 202], [102, 198]]
        >>> pred_cps = [100, 200]
        >>> result = covering_metric(true_cps_list, pred_cps, margin=5)
        >>> print(f"Covering: {result['covering_score']:.2f}")
        Covering: 1.00
    """
    if not true_cps_list:
        raise ChangePointMetricsError("true_cps_list cannot be empty")

    # Validate predicted CPs
    pred_cps = _validate_changepoints(pred_cps, "pred_cps", n_samples)

    # Calculate recall for each annotator
    recalls = []
    for i, true_cps in enumerate(true_cps_list):
        true_cps = _validate_changepoints(true_cps, f"annotator_{i}", n_samples)

        if len(true_cps) == 0:
            # No true CPs for this annotator
            recalls.append(1.0)
        else:
            pr_result = precision_recall(true_cps, pred_cps, margin, n_samples)
            recalls.append(pr_result['recall'])

    recalls = np.array(recalls)

    return {
        'covering_score': float(np.mean(recalls)),
        'recall_per_annotator': list(recalls),
        'mean_recall': float(np.mean(recalls)),
        'std_recall': float(np.std(recalls)),
        'min_recall': float(np.min(recalls)),
        'max_recall': float(np.max(recalls)),
        'n_annotators': len(true_cps_list)
    }
def evaluate_all(true_cps: Union[List, np.ndarray, List[List]],
                 pred_cps: Union[List, np.ndarray],
                 n_samples: int,
                 margin: int = 10) -> Dict:
    """Compute all available metrics for comprehensive evaluation.

    Automatically detects if true_cps contains multiple annotators
    (list of lists) and computes appropriate metrics.

    Args:
        true_cps: True change points (list/array, or list of lists for
            multiple annotators)
        pred_cps: Predicted change points
        n_samples: Total number of samples
        margin: Tolerance for point-based metrics

    Returns:
        Dictionary with:
            - point_metrics: precision, recall, f1, etc.
            - distance_metrics: hausdorff, annotation_error
            - segmentation_metrics: ari
            - covering_metrics: (if multiple annotators)
            - summary: Formatted text summary

    Examples:
        >>> result = evaluate_all([100, 200], [98, 202], n_samples=300, margin=5)
        >>> print(result['summary'])
        >>> # For multiple annotators:
        >>> result = evaluate_all([[100, 200], [98, 202]], [100, 200],
        ...                       n_samples=300, margin=5)
    """
    # Detect multiple annotators
    is_multi_annotator = (
        isinstance(true_cps, list) and
        len(true_cps) > 0 and
        isinstance(true_cps[0], (list, np.ndarray))
    )

    if is_multi_annotator:
        # Combine all annotators for standard metrics
        all_true_cps = []
        for annotator_cps in true_cps:
            all_true_cps.extend(annotator_cps)
        all_true_cps = np.unique(all_true_cps)
        single_true_cps = all_true_cps
    else:
        single_true_cps = true_cps

    # Point-based metrics
    pr = precision_recall(single_true_cps, pred_cps, margin, n_samples)
    fb = f_beta_score(single_true_cps, pred_cps, beta=1.0, margin=margin,
                      n_samples=n_samples)

    # Distance metrics
    hd = hausdorff_distance(single_true_cps, pred_cps, n_samples=n_samples)
    ae = annotation_error(single_true_cps, pred_cps, method='mae',
                          n_samples=n_samples)

    # Segmentation metrics
    ari = adjusted_rand_index(single_true_cps, pred_cps, n_samples)

    result = {
        'point_metrics': {
            'precision': pr['precision'],
            'recall': pr['recall'],
            'f1_score': pr['f1_score'],
            'f2_score': fb['f2_score'],
            'f0_5_score': fb['f0_5_score'],
            'true_positives': pr['true_positives'],
            'false_positives': pr['false_positives'],
            'false_negatives': pr['false_negatives']
        },
        'distance_metrics': {
            'hausdorff': hd['hausdorff'],
            'annotation_error_mae': ae['error'],
            'annotation_error_median': ae['median_error'],
            'annotation_error_max': ae['max_error']
        },
        'segmentation_metrics': {
            'adjusted_rand_index': ari['ari'],
            'rand_index': ari['rand_index']
        }
    }

    # Covering metric for multiple annotators
    if is_multi_annotator:
        cm = covering_metric(true_cps, pred_cps, margin, n_samples)
        result['covering_metrics'] = {
            'covering_score': cm['covering_score'],
            'std_recall': cm['std_recall'],
            'n_annotators': cm['n_annotators']
        }

    # Create summary
    summary_lines = [
        "=" * 60,
        "Change Point Detection Evaluation Summary",
        "=" * 60,
        "",
        "Point-Based Metrics (margin={})".format(margin),
        "-" * 60,
        f"  Precision: {pr['precision']:.4f}",
        f"  Recall: {pr['recall']:.4f}",
        f"  F1 Score: {pr['f1_score']:.4f}",
        f"  F2 Score: {fb['f2_score']:.4f} (recall-focused)",
        f"  F0.5 Score: {fb['f0_5_score']:.4f} (precision-focused)",
        f"  True Positives: {pr['true_positives']}",
        f"  False Positives: {pr['false_positives']}",
        f"  False Negatives: {pr['false_negatives']}",
        "",
        "Distance Metrics",
        "-" * 60,
        f"  Hausdorff: {hd['hausdorff']:.2f}",
        f"  Annotation Error: {ae['error']:.2f} (MAE)",
        f"  Median Error: {ae['median_error']:.2f}",
        f"  Max Error: {ae['max_error']:.2f}",
        "",
        "Segmentation Metrics",
        "-" * 60,
        f"  Adjusted Rand: {ari['ari']:.4f}",
        f"  Rand Index: {ari['rand_index']:.4f}",
    ]

    if is_multi_annotator:
        summary_lines.extend([
            "",
            "Multi-Annotator Metrics",
            "-" * 60,
            f"  Covering Score: {cm['covering_score']:.4f}",
            f"  Std Recall: {cm['std_recall']:.4f}",
            f"  N Annotators: {cm['n_annotators']}",
        ])

    summary_lines.append("=" * 60)

    result['summary'] = "\n".join(summary_lines)

    return result