"""Evaluation metrics for change point detection.
This module provides comprehensive metrics for evaluating change point detection
performance, including:
- Point-based metrics (precision, recall, F-beta)
- Distance-based metrics (Hausdorff, annotation error)
- Segmentation-based metrics (Adjusted Rand Index, Hamming)
- Advanced metrics (covering metric for multiple annotators)
Features:
- Detailed return values (dicts with breakdowns)
- Support for multiple ground truth annotators
- Statistical significance testing capabilities
- Enhanced versions of standard metrics
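
Example (a minimal usage sketch; the import path cpd_metrics is a placeholder
for wherever this module is installed):

    from cpd_metrics import precision_recall

    result = precision_recall([100, 200], [102, 198], margin=5)
    # Both change points fall within the margin, so precision and recall are 1.0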
"""
import numpy as np
from typing import Union, List, Dict, Tuple, Optional
from itertools import product
from scipy.spatial.distance import cdist
class ChangePointMetricsError(Exception):
"""Exception raised for errors in metrics computation."""
pass
def _validate_changepoints(cps: Union[List, np.ndarray], name: str = "changepoints",
n_samples: Optional[int] = None) -> np.ndarray:
"""Validate and convert change points to numpy array.
Args:
cps: Change points as list or array
name: Name for error messages
n_samples: If provided, check CPs are within bounds
Returns:
Validated numpy array of change points
Raises:
ChangePointMetricsError: If validation fails
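    Example (output shown uses NumPy's default array repr):
        >>> _validate_changepoints([10, 50, 90], n_samples=100)
        array([10, 50, 90])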
"""
if cps is None:
raise ChangePointMetricsError(f"{name} cannot be None")
cps_array = np.asarray(cps)
if cps_array.size == 0:
return np.array([], dtype=int)
# Check for duplicates
if len(np.unique(cps_array)) != len(cps_array):
raise ChangePointMetricsError(f"{name} contains duplicates: {cps}")
# Check sorted
if not np.all(cps_array[:-1] < cps_array[1:]):
raise ChangePointMetricsError(f"{name} must be sorted: {cps}")
# Check bounds
if n_samples is not None:
if np.any(cps_array < 0) or np.any(cps_array > n_samples):
raise ChangePointMetricsError(
f"{name} must be in range [0, {n_samples}], got {cps}"
)
return cps_array.astype(int)
def _match_changepoints(true_cps: np.ndarray, pred_cps: np.ndarray,
margin: int) -> Tuple[List, List, List]:
"""Find matching change points within tolerance margin.
Args:
true_cps: True change points
pred_cps: Predicted change points
margin: Tolerance window
Returns:
Tuple of (matched_pairs, unmatched_true, unmatched_pred)
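    Example (illustrative):
        >>> _match_changepoints(np.array([100, 200]), np.array([103, 290]), margin=10)
        ([(100, 103)], [200], [290])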
"""
if len(true_cps) == 0:
return [], [], list(pred_cps)
if len(pred_cps) == 0:
return [], list(true_cps), []
# Compute pairwise distances
distances = np.abs(true_cps[:, np.newaxis] - pred_cps[np.newaxis, :])
# Find matches within margin
matched_pairs = []
used_pred = set()
used_true = set()
# Greedy matching: sort by distance and match closest first
true_idx, pred_idx = np.where(distances <= margin)
distances_flat = distances[true_idx, pred_idx]
sort_order = np.argsort(distances_flat)
for idx in sort_order:
t_idx = true_idx[idx]
p_idx = pred_idx[idx]
if t_idx not in used_true and p_idx not in used_pred:
matched_pairs.append((int(true_cps[t_idx]), int(pred_cps[p_idx])))
used_true.add(t_idx)
used_pred.add(p_idx)
# Find unmatched
unmatched_true = [int(true_cps[i]) for i in range(len(true_cps))
if i not in used_true]
unmatched_pred = [int(pred_cps[i]) for i in range(len(pred_cps))
if i not in used_pred]
return matched_pairs, unmatched_true, unmatched_pred
def precision_recall(true_cps: Union[List, np.ndarray],
pred_cps: Union[List, np.ndarray],
margin: int = 10,
n_samples: Optional[int] = None) -> Dict:
"""Calculate precision, recall, and F1 score with tolerance margin.
A predicted change point is considered correct (true positive) if it falls
within `margin` samples of a true change point. Each true CP can only be
matched once to avoid multiple detections counting as multiple TPs.
Args:
true_cps: True change points (list or array)
pred_cps: Predicted change points (list or array)
margin: Tolerance window in samples (default: 10)
n_samples: Total number of samples (optional, for validation)
Returns:
Dictionary with:
- precision: TP / (TP + FP)
- recall: TP / (TP + FN)
- f1_score: 2 * (P * R) / (P + R)
- true_positives: Number of correctly detected CPs
- false_positives: Number of incorrect detections
- false_negatives: Number of missed true CPs
- matched_pairs: List of (true_cp, pred_cp) tuples
- unmatched_true: True CPs with no match
- unmatched_pred: Predicted CPs with no match
Examples:
>>> true_cps = [100, 200, 300]
>>> pred_cps = [98, 205, 299, 350]
>>> result = precision_recall(true_cps, pred_cps, margin=10)
>>> print(f"Precision: {result['precision']:.2f}")
Precision: 0.75
>>> print(f"Recall: {result['recall']:.2f}")
Recall: 1.00
"""
# Validate inputs
true_cps = _validate_changepoints(true_cps, "true_cps", n_samples)
pred_cps = _validate_changepoints(pred_cps, "pred_cps", n_samples)
if margin <= 0:
raise ChangePointMetricsError(f"margin must be positive, got {margin}")
# Handle edge cases
if len(true_cps) == 0 and len(pred_cps) == 0:
return {
'precision': 1.0,
'recall': 1.0,
'f1_score': 1.0,
'true_positives': 0,
'false_positives': 0,
'false_negatives': 0,
'matched_pairs': [],
'unmatched_true': [],
'unmatched_pred': []
}
if len(true_cps) == 0:
return {
'precision': 0.0,
'recall': 1.0, # No CPs to miss
'f1_score': 0.0,
'true_positives': 0,
'false_positives': len(pred_cps),
'false_negatives': 0,
'matched_pairs': [],
'unmatched_true': [],
'unmatched_pred': list(pred_cps)
}
if len(pred_cps) == 0:
return {
'precision': 1.0, # No false alarms
'recall': 0.0,
'f1_score': 0.0,
'true_positives': 0,
'false_positives': 0,
'false_negatives': len(true_cps),
'matched_pairs': [],
'unmatched_true': list(true_cps),
'unmatched_pred': []
}
# Match change points
matched_pairs, unmatched_true, unmatched_pred = _match_changepoints(
true_cps, pred_cps, margin
)
tp = len(matched_pairs)
fp = len(unmatched_pred)
fn = len(unmatched_true)
# Calculate metrics
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
return {
'precision': precision,
'recall': recall,
'f1_score': f1,
'true_positives': tp,
'false_positives': fp,
'false_negatives': fn,
'matched_pairs': matched_pairs,
'unmatched_true': unmatched_true,
'unmatched_pred': unmatched_pred
}
def f_beta_score(true_cps: Union[List, np.ndarray],
pred_cps: Union[List, np.ndarray],
beta: float = 1.0,
margin: int = 10,
n_samples: Optional[int] = None) -> Dict:
"""Calculate F-beta score with adjustable precision/recall weighting.
The F-beta score is a weighted harmonic mean of precision and recall:
F_beta = (1 + beta^2) * (precision * recall) / (beta^2 * precision + recall)
- beta = 1: F1 score (equal weight)
- beta < 1: Favor precision (penalize false positives more)
- beta > 1: Favor recall (penalize false negatives more)
Args:
true_cps: True change points
pred_cps: Predicted change points
beta: Weight of recall vs precision (default: 1.0)
margin: Tolerance window in samples (default: 10)
n_samples: Total number of samples (optional)
Returns:
Dictionary with:
- f_beta: F-beta score
- f1_score: F1 score (beta=1)
- f2_score: F2 score (beta=2, recall-focused)
- f0_5_score: F0.5 score (beta=0.5, precision-focused)
- precision: Precision
- recall: Recall
- (plus all fields from precision_recall)
Examples:
        >>> true_cps = [100, 200, 300]
        >>> pred_cps = [98, 205, 299, 350]
        >>> # When missing CPs is worse than false alarms, use beta=2
        >>> result = f_beta_score(true_cps, pred_cps, beta=2.0)
        >>> # When false alarms are worse, use beta=0.5
        >>> result = f_beta_score(true_cps, pred_cps, beta=0.5)
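        >>> # With precision 0.75 and recall 1.0 on this data, beta=0.5
        >>> # down-weights recall (illustrative check of the formula above):
        >>> print(f"F0.5: {result['f_beta']:.4f}")
        F0.5: 0.7895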
"""
if beta <= 0:
raise ChangePointMetricsError(f"beta must be positive, got {beta}")
# Get precision and recall
pr_result = precision_recall(true_cps, pred_cps, margin, n_samples)
precision = pr_result['precision']
recall = pr_result['recall']
# Calculate F-beta
def calc_f_beta(p, r, b):
if p + r == 0:
return 0.0
return (1 + b**2) * (p * r) / (b**2 * p + r)
result = pr_result.copy()
result['f_beta'] = calc_f_beta(precision, recall, beta)
result['f1_score'] = calc_f_beta(precision, recall, 1.0)
result['f2_score'] = calc_f_beta(precision, recall, 2.0)
result['f0_5_score'] = calc_f_beta(precision, recall, 0.5)
result['beta'] = beta
return result
def hausdorff_distance(cps1: Union[List, np.ndarray],
cps2: Union[List, np.ndarray],
directed: bool = False,
n_samples: Optional[int] = None) -> Dict:
"""Calculate Hausdorff distance between two change point sets.
The Hausdorff distance measures the maximum distance from any point in
one set to the closest point in the other set. It's sensitive to outliers
and provides worst-case analysis.
Symmetric: H(A, B) = max(h(A, B), h(B, A))
Directed: h(A, B) = max_{a in A} min_{b in B} |a - b|
Args:
cps1, cps2: Change point sets to compare
directed: If True, compute directed distance h(cps1, cps2) only
n_samples: Total number of samples (optional)
Returns:
Dictionary with:
- hausdorff: Hausdorff distance (symmetric if directed=False)
- forward_distance: max distance from cps1 to cps2
- backward_distance: max distance from cps2 to cps1
- closest_pairs: List of (cp1, cp2, distance) tuples
Examples:
>>> cps1 = [100, 200, 300]
>>> cps2 = [105, 200, 400]
>>> result = hausdorff_distance(cps1, cps2)
>>> print(f"Hausdorff: {result['hausdorff']}")
        Hausdorff: 100.0
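        >>> # Directed distance from cps1 to cps2 only (illustrative):
        >>> hausdorff_distance(cps1, cps2, directed=True)['hausdorff']
        100.0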
"""
# Validate inputs
cps1 = _validate_changepoints(cps1, "cps1", n_samples)
cps2 = _validate_changepoints(cps2, "cps2", n_samples)
# Handle empty sets
if len(cps1) == 0 and len(cps2) == 0:
return {
'hausdorff': 0.0,
'forward_distance': 0.0,
'backward_distance': 0.0,
'closest_pairs': []
}
if len(cps1) == 0 or len(cps2) == 0:
return {
'hausdorff': np.inf,
'forward_distance': np.inf if len(cps1) > 0 else 0.0,
'backward_distance': np.inf if len(cps2) > 0 else 0.0,
'closest_pairs': []
}
# Compute pairwise distances
cps1_arr = cps1.reshape(-1, 1)
cps2_arr = cps2.reshape(-1, 1)
pw_dist = cdist(cps1_arr, cps2_arr, metric='cityblock')
# Directed distances
forward_dist = pw_dist.min(axis=1).max() # max of min distances from cps1 to cps2
backward_dist = pw_dist.min(axis=0).max() # max of min distances from cps2 to cps1
# Find closest pairs
closest_pairs = []
for i, cp1 in enumerate(cps1):
j_min = pw_dist[i, :].argmin()
dist = pw_dist[i, j_min]
closest_pairs.append((int(cp1), int(cps2[j_min]), float(dist)))
return {
'hausdorff': float(max(forward_dist, backward_dist)) if not directed else float(forward_dist),
'forward_distance': float(forward_dist),
'backward_distance': float(backward_dist),
'closest_pairs': closest_pairs
}
def annotation_error(true_cps: Union[List, np.ndarray],
pred_cps: Union[List, np.ndarray],
method: str = 'mae',
n_samples: Optional[int] = None) -> Dict:
"""Calculate annotation error between change points.
Measures how accurately change points are localized by computing
    the error between matched pairs. Pairs are formed by greedily matching
    the closest remaining true and predicted CPs.
Args:
true_cps: True change points
pred_cps: Predicted change points
method: Error metric - 'mae', 'mse', 'rmse', or 'median_ae'
n_samples: Total number of samples (optional)
Returns:
Dictionary with:
- error: Overall error (according to method)
- errors_per_cp: List of errors for each matched pair
- median_error: Median error
- max_error: Maximum error
- min_error: Minimum error
- mean_error: Mean absolute error
- std_error: Standard deviation of errors
- matched_pairs: List of (true_cp, pred_cp) tuples
Examples:
>>> true_cps = [100, 200, 300]
>>> pred_cps = [98, 205, 295]
>>> result = annotation_error(true_cps, pred_cps, method='mae')
>>> print(f"MAE: {result['error']:.1f}")
        MAE: 4.0
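        >>> # RMSE penalizes larger localization errors more strongly:
        >>> print(f"RMSE: {annotation_error(true_cps, pred_cps, method='rmse')['error']:.2f}")
        RMSE: 4.24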
"""
# Validate
true_cps = _validate_changepoints(true_cps, "true_cps", n_samples)
pred_cps = _validate_changepoints(pred_cps, "pred_cps", n_samples)
if method not in ['mae', 'mse', 'rmse', 'median_ae']:
raise ChangePointMetricsError(
f"method must be 'mae', 'mse', 'rmse', or 'median_ae', got {method}"
)
# Handle empty cases
if len(true_cps) == 0 or len(pred_cps) == 0:
return {
'error': np.nan,
'errors_per_cp': [],
'median_error': np.nan,
'max_error': np.nan,
'min_error': np.nan,
'mean_error': np.nan,
'std_error': np.nan,
'matched_pairs': []
}
    # Greedy matching: repeatedly pair the closest remaining true/predicted CPs
true_cps_copy = list(true_cps)
pred_cps_copy = list(pred_cps)
matched_pairs = []
errors = []
while true_cps_copy and pred_cps_copy:
# Find closest pair
min_dist = np.inf
best_pair = None
for t_cp in true_cps_copy:
for p_cp in pred_cps_copy:
dist = abs(t_cp - p_cp)
if dist < min_dist:
min_dist = dist
best_pair = (t_cp, p_cp)
if best_pair:
matched_pairs.append(best_pair)
errors.append(min_dist)
true_cps_copy.remove(best_pair[0])
pred_cps_copy.remove(best_pair[1])
errors = np.array(errors)
# Calculate error metrics
if method == 'mae':
error = float(np.mean(errors))
elif method == 'mse':
error = float(np.mean(errors ** 2))
elif method == 'rmse':
error = float(np.sqrt(np.mean(errors ** 2)))
elif method == 'median_ae':
error = float(np.median(errors))
return {
'error': error,
'errors_per_cp': list(errors),
'median_error': float(np.median(errors)),
'max_error': float(np.max(errors)),
'min_error': float(np.min(errors)),
'mean_error': float(np.mean(errors)),
'std_error': float(np.std(errors)),
'matched_pairs': matched_pairs
}
def adjusted_rand_index(true_cps: Union[List, np.ndarray],
pred_cps: Union[List, np.ndarray],
n_samples: int) -> Dict:
"""Calculate Adjusted Rand Index for segmentation agreement.
The ARI measures similarity between two segmentations, correcting for
chance agreement. Values range from -1 to 1:
- ARI = 1: Perfect agreement
- ARI = 0: Agreement by chance
- ARI < 0: Worse than random
Uses efficient O(n) implementation from Prates (2021).
Args:
true_cps: True change points
pred_cps: Predicted change points
n_samples: Total number of samples (required)
Returns:
Dictionary with:
- ari: Adjusted Rand Index
- rand_index: Unadjusted Rand Index
- agreement_rate: Proportion of agreeing pairs
- disagreement_rate: Proportion of disagreeing pairs
Examples:
>>> true_cps = [100, 200]
>>> pred_cps = [100, 200]
>>> result = adjusted_rand_index(true_cps, pred_cps, n_samples=300)
>>> print(f"ARI: {result['ari']:.2f}")
ARI: 1.00
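        >>> # A split absent from the reference segmentation is no better than
        >>> # chance (illustrative):
        >>> result = adjusted_rand_index([], [150], n_samples=300)
        >>> print(f"ARI: {result['ari']:.2f}")
        ARI: 0.00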
"""
if n_samples <= 0:
raise ChangePointMetricsError(f"n_samples must be positive, got {n_samples}")
# Validate
true_cps = _validate_changepoints(true_cps, "true_cps", n_samples)
pred_cps = _validate_changepoints(pred_cps, "pred_cps", n_samples)
# Add 0 at start and n_samples at end
true_bkps = np.concatenate([[0], true_cps, [n_samples]])
pred_bkps = np.concatenate([[0], pred_cps, [n_samples]])
n_true = len(true_bkps) - 1
n_pred = len(pred_bkps) - 1
    # Efficient pairwise (dis)agreement counting (Prates, 2021): each pair of
    # overlapping segments contributes nij * |end1 - end2| disagreeing sample
    # pairs and C(nij, 2) pairs that agree in both segmentations.
    disagreement = 0
    sum_nij_pairs = 0.0
    beginj = 0
    for i in range(n_true):
        start1 = true_bkps[i]
        end1 = true_bkps[i + 1]
        for j in range(beginj, n_pred):
            start2 = pred_bkps[j]
            end2 = pred_bkps[j + 1]
            # Intersection size
            nij = max(min(end1, end2) - max(start1, start2), 0)
            disagreement += nij * abs(end1 - end2)
            sum_nij_pairs += nij * (nij - 1) / 2
            # Optimization: once the predicted segment extends past the current
            # true segment, it can only overlap later true segments
            if end1 < end2:
                break
            else:
                beginj = j + 1
    # Normalize
    total_pairs = n_samples * (n_samples - 1) / 2
    disagreement /= total_pairs
    rand_index = 1.0 - disagreement
    # Adjusted Rand Index (Hubert & Arabie correction for chance agreement)
    true_sizes = np.diff(true_bkps)
    pred_sizes = np.diff(pred_bkps)
    sum_true_pairs = float(np.sum(true_sizes * (true_sizes - 1) / 2))
    sum_pred_pairs = float(np.sum(pred_sizes * (pred_sizes - 1) / 2))
    expected_index = sum_true_pairs * sum_pred_pairs / total_pairs
    max_index = 0.5 * (sum_true_pairs + sum_pred_pairs)
    if max_index == expected_index:
        ari = 1.0  # Both segmentations are trivial (single segment each)
    else:
        ari = float((sum_nij_pairs - expected_index) / (max_index - expected_index))
return {
'ari': ari,
'rand_index': rand_index,
'agreement_rate': rand_index,
'disagreement_rate': disagreement
}
def covering_metric(true_cps_list: List[Union[List, np.ndarray]],
pred_cps: Union[List, np.ndarray],
margin: int = 10,
n_samples: Optional[int] = None) -> Dict:
"""Calculate covering metric for multiple annotators.
    The score measures how well the predictions agree with EACH individual
    annotator, rather than only with the pooled annotations. Higher scores
    indicate that the algorithm explains all annotators. Inspired by the
    covering metric of van den Burg & Williams (2020); here it is computed
    as the mean per-annotator recall within the tolerance margin rather than
    the original segment-overlap definition.
Args:
true_cps_list: List of lists, each sublist is one annotator's CPs
pred_cps: Predicted change points
margin: Tolerance window
n_samples: Total number of samples (optional)
Returns:
Dictionary with:
- covering_score: Mean recall across all annotators
- recall_per_annotator: List of recall for each annotator
- mean_recall: Same as covering_score
- std_recall: Standard deviation of recalls
- min_recall: Minimum recall across annotators
- max_recall: Maximum recall across annotators
- n_annotators: Number of annotators
Examples:
>>> # 3 annotators with slightly different annotations
>>> true_cps_list = [[100, 200], [98, 202], [102, 198]]
>>> pred_cps = [100, 200]
>>> result = covering_metric(true_cps_list, pred_cps, margin=5)
>>> print(f"Covering: {result['covering_score']:.2f}")
Covering: 1.00
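        >>> # An extra annotated CP that goes undetected lowers the mean
        >>> # per-annotator recall (illustrative):
        >>> result = covering_metric([[100, 200], [100, 200, 250]], [100, 200], margin=5)
        >>> print(f"Covering: {result['covering_score']:.3f}")
        Covering: 0.833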
"""
if not true_cps_list:
raise ChangePointMetricsError("true_cps_list cannot be empty")
# Validate predicted CPs
pred_cps = _validate_changepoints(pred_cps, "pred_cps", n_samples)
# Calculate recall for each annotator
recalls = []
for i, true_cps in enumerate(true_cps_list):
true_cps = _validate_changepoints(true_cps, f"annotator_{i}", n_samples)
if len(true_cps) == 0:
# No true CPs for this annotator
recalls.append(1.0)
else:
pr_result = precision_recall(true_cps, pred_cps, margin, n_samples)
recalls.append(pr_result['recall'])
recalls = np.array(recalls)
return {
'covering_score': float(np.mean(recalls)),
'recall_per_annotator': list(recalls),
'mean_recall': float(np.mean(recalls)),
'std_recall': float(np.std(recalls)),
'min_recall': float(np.min(recalls)),
'max_recall': float(np.max(recalls)),
'n_annotators': len(true_cps_list)
}
def evaluate_all(true_cps: Union[List, np.ndarray, List[List]],
pred_cps: Union[List, np.ndarray],
n_samples: int,
margin: int = 10) -> Dict:
"""Compute all available metrics for comprehensive evaluation.
Automatically detects if true_cps contains multiple annotators
(list of lists) and computes appropriate metrics.
Args:
true_cps: True change points (list/array or list of lists for multiple annotators)
pred_cps: Predicted change points
n_samples: Total number of samples
margin: Tolerance for point-based metrics
Returns:
Dictionary with:
- point_metrics: precision, recall, f1, etc.
- distance_metrics: hausdorff, annotation_error
- segmentation_metrics: ari
- covering_metrics: (if multiple annotators)
- summary: Formatted text summary
Examples:
>>> result = evaluate_all([100, 200], [98, 202], n_samples=300, margin=5)
>>> print(result['summary'])
>>> # For multiple annotators:
>>> result = evaluate_all([[100, 200], [98, 202]], [100, 200],
... n_samples=300, margin=5)
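        >>> # Point metrics are computed against the pooled annotations, while
        >>> # the covering score tracks agreement with each annotator:
        >>> print(f"Covering: {result['covering_metrics']['covering_score']:.2f}")
        Covering: 1.00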
"""
# Detect multiple annotators
is_multi_annotator = (
isinstance(true_cps, list) and
len(true_cps) > 0 and
isinstance(true_cps[0], (list, np.ndarray))
)
if is_multi_annotator:
# Combine all annotators for standard metrics
all_true_cps = []
for annotator_cps in true_cps:
all_true_cps.extend(annotator_cps)
all_true_cps = np.unique(all_true_cps)
single_true_cps = all_true_cps
else:
single_true_cps = true_cps
# Point-based metrics
pr = precision_recall(single_true_cps, pred_cps, margin, n_samples)
fb = f_beta_score(single_true_cps, pred_cps, beta=1.0, margin=margin, n_samples=n_samples)
# Distance metrics
hd = hausdorff_distance(single_true_cps, pred_cps, n_samples=n_samples)
ae = annotation_error(single_true_cps, pred_cps, method='mae', n_samples=n_samples)
# Segmentation metrics
ari = adjusted_rand_index(single_true_cps, pred_cps, n_samples)
result = {
'point_metrics': {
'precision': pr['precision'],
'recall': pr['recall'],
'f1_score': pr['f1_score'],
'f2_score': fb['f2_score'],
'f0_5_score': fb['f0_5_score'],
'true_positives': pr['true_positives'],
'false_positives': pr['false_positives'],
'false_negatives': pr['false_negatives']
},
'distance_metrics': {
'hausdorff': hd['hausdorff'],
'annotation_error_mae': ae['error'],
'annotation_error_median': ae['median_error'],
'annotation_error_max': ae['max_error']
},
'segmentation_metrics': {
'adjusted_rand_index': ari['ari'],
'rand_index': ari['rand_index']
}
}
# Covering metric for multiple annotators
if is_multi_annotator:
cm = covering_metric(true_cps, pred_cps, margin, n_samples)
result['covering_metrics'] = {
'covering_score': cm['covering_score'],
'std_recall': cm['std_recall'],
'n_annotators': cm['n_annotators']
}
# Create summary
summary_lines = [
"=" * 60,
"Change Point Detection Evaluation Summary",
"=" * 60,
"",
"Point-Based Metrics (margin={})".format(margin),
"-" * 60,
f" Precision: {pr['precision']:.4f}",
f" Recall: {pr['recall']:.4f}",
f" F1 Score: {pr['f1_score']:.4f}",
f" F2 Score: {fb['f2_score']:.4f} (recall-focused)",
f" F0.5 Score: {fb['f0_5_score']:.4f} (precision-focused)",
f" True Positives: {pr['true_positives']}",
f" False Positives: {pr['false_positives']}",
f" False Negatives: {pr['false_negatives']}",
"",
"Distance Metrics",
"-" * 60,
f" Hausdorff: {hd['hausdorff']:.2f}",
f" Annotation Error: {ae['error']:.2f} (MAE)",
f" Median Error: {ae['median_error']:.2f}",
f" Max Error: {ae['max_error']:.2f}",
"",
"Segmentation Metrics",
"-" * 60,
f" Adjusted Rand: {ari['ari']:.4f}",
f" Rand Index: {ari['rand_index']:.4f}",
]
if is_multi_annotator:
summary_lines.extend([
"",
"Multi-Annotator Metrics",
"-" * 60,
f" Covering Score: {cm['covering_score']:.4f}",
f" Std Recall: {cm['std_recall']:.4f}",
f" N Annotators: {cm['n_annotators']}",
])
summary_lines.append("=" * 60)
result['summary'] = "\n".join(summary_lines)
return result
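

# Minimal self-check when the module is run directly (illustrative only: the
# change points below are synthetic).
if __name__ == "__main__":
    demo_true = [100, 200, 300]
    demo_pred = [98, 205, 299, 350]
    demo_result = evaluate_all(demo_true, demo_pred, n_samples=400, margin=10)
    print(demo_result['summary'])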