Source code for fastcpd.datasets

"""Synthetic data generation for change point detection.

This module provides functions to generate synthetic time series data with
various types of changes, including:

- Multiple model types (mean, variance, regression, GLM, LASSO, ARMA, GARCH)
- Rich metadata (SNR, difficulty scores, true parameters)
- Realistic parameter generation
- Multiple change patterns (jump, drift, coefficient changes)
- Reproducible with seed control

All functions return dictionaries with 'data', 'changepoints', and 'metadata'
for comprehensive analysis and benchmarking.
"""

import numpy as np
from typing import Union, List, Dict, Optional, Tuple
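
# Illustrative sketch (not part of the module API): every generator below
# follows the same return contract described in the module docstring, so a
# typical workflow unpacks 'data', 'changepoints', and 'metadata'.
# (make_mean_change is defined later in this module.)
def _example_return_contract() -> None:
    out = make_mean_change(n_samples=500, n_changepoints=3, seed=42)
    data, cps, meta = out['data'], out['changepoints'], out['metadata']
    print(data.shape, cps, meta['snr_db'])
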


def _draw_changepoints(n_samples: int, n_changepoints: int,
                      min_segment_length: Optional[int] = None,
                      seed: Optional[int] = None) -> np.ndarray:
    """Draw random change point locations using Dirichlet distribution.

    Args:
        n_samples: Total number of samples
        n_changepoints: Number of change points to generate
        min_segment_length: Minimum samples per segment (default: n_samples//(n_changepoints+1)//2)
        seed: Random seed

    Returns:
        Sorted array of change point indices
    """
    rng = np.random.default_rng(seed=seed)

    if min_segment_length is None:
        min_segment_length = max(5, n_samples // (n_changepoints + 1) // 2)

    # Guard against an unsatisfiable request, which would otherwise make the
    # rebalancing loop below spin forever
    if min_segment_length * (n_changepoints + 1) > n_samples:
        raise ValueError(
            f"n_samples={n_samples} is too small for {n_changepoints + 1} "
            f"segments of length >= {min_segment_length}")

    # Draw segment proportions from a Dirichlet with a large concentration
    # parameter, so segment lengths are comparable but not identical
    alpha = np.ones(n_changepoints + 1) / (n_changepoints + 1) * 2000
    segment_proportions = rng.dirichlet(alpha)
    segment_lengths = (segment_proportions * n_samples).astype(int)

    # Ensure minimum segment length
    while np.any(segment_lengths < min_segment_length):
        too_small = segment_lengths < min_segment_length
        deficit = min_segment_length - segment_lengths[too_small]
        segment_lengths[too_small] = min_segment_length

        # Take the combined deficit from the single largest segment
        surplus_idx = np.argmax(segment_lengths)
        segment_lengths[surplus_idx] -= deficit.sum()

    # Adjust last segment to match exactly n_samples
    segment_lengths[-1] = n_samples - segment_lengths[:-1].sum()

    # Convert to change points
    changepoints = np.cumsum(segment_lengths)[:-1]

    return changepoints
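
# Illustrative sketch (not part of the module API): the helper returns sorted
# interior indices, and the implied segment lengths sum to n_samples.
def _example_draw_changepoints() -> None:
    cps = _draw_changepoints(n_samples=500, n_changepoints=4, seed=0)
    lengths = np.diff(np.concatenate([[0], cps, [500]]))
    assert lengths.sum() == 500 and np.all(lengths > 0)
    print(cps, lengths)
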


def make_mean_change(n_samples: int = 500,
                     n_changepoints: int = 3,
                     n_dim: int = 1,
                     mean_deltas: Optional[List[float]] = None,
                     noise_std: float = 1.0,
                     change_type: str = 'jump',
                     seed: Optional[int] = None) -> Dict:
    """Generate data with mean changes.

    Args:
        n_samples: Total number of samples
        n_changepoints: Number of change points
        n_dim: Data dimensionality
        mean_deltas: List of mean shifts for each segment (auto-generated if None)
        noise_std: Noise standard deviation
        change_type: 'jump' (step change) or 'drift' (gradual)
        seed: Random seed

    Returns:
        Dictionary with:

        - data: array of shape (n_samples, n_dim)
        - changepoints: array of CP indices
        - true_means: list of segment means
        - metadata: dict with SNR, deltas, difficulty, etc.

    Examples:
        >>> data_dict = make_mean_change(n_samples=500, n_changepoints=3)
        >>> data = data_dict['data']
        >>> cps = data_dict['changepoints']
        >>> print(f"SNR: {data_dict['metadata']['snr_db']:.2f}")
    """
    rng = np.random.default_rng(seed=seed)

    # Generate change points
    changepoints = _draw_changepoints(n_samples, n_changepoints, seed=seed)
    segment_boundaries = np.concatenate([[0], changepoints, [n_samples]])
    n_segments = n_changepoints + 1

    # Generate mean deltas if not provided
    if mean_deltas is None:
        # Realistic deltas: between 1 and 5 std devs
        mean_deltas = rng.uniform(1.0, 5.0, size=n_dim) * noise_std
        # Random sign
        mean_deltas *= rng.choice([-1, 1], size=n_dim)

    mean_deltas = np.atleast_1d(mean_deltas)
    if len(mean_deltas) == 1 and n_dim > 1:
        mean_deltas = np.tile(mean_deltas, n_dim)

    # Generate segment means
    true_means = []
    current_mean = np.zeros(n_dim)
    true_means.append(current_mean.copy())

    for _ in range(n_segments - 1):
        # Jump or accumulate
        jump = rng.uniform(0.5, 1.5, n_dim) * mean_deltas
        jump *= rng.choice([-1, 1], n_dim)
        current_mean += jump
        true_means.append(current_mean.copy())

    # Generate data
    data = np.zeros((n_samples, n_dim))
    for i, (start, end) in enumerate(zip(segment_boundaries[:-1], segment_boundaries[1:])):
        segment_length = end - start

        if change_type == 'jump':
            # Step change
            segment_mean = true_means[i]
            data[start:end] = segment_mean + rng.normal(0, noise_std, (segment_length, n_dim))
        elif change_type == 'drift':
            # Gradual drift
            if i < n_segments - 1:
                # Drift from current to next mean
                mean_start = true_means[i]
                mean_end = true_means[i + 1]
                drift = np.linspace(mean_start, mean_end, segment_length)
                data[start:end] = drift + rng.normal(0, noise_std, (segment_length, n_dim))
            else:
                # Last segment stays constant
                data[start:end] = true_means[i] + rng.normal(0, noise_std, (segment_length, n_dim))
        else:
            raise ValueError(f"change_type must be 'jump' or 'drift', got {change_type}")

    # Calculate metadata
    signal_power = np.var([m for means in true_means for m in means])
    noise_power = noise_std ** 2
    snr = 10 * np.log10(signal_power / noise_power) if noise_power > 0 else np.inf

    # Difficulty: lower SNR = harder
    difficulty = 1.0 / (1.0 + snr / 10)  # 0 = easy (high SNR), 1 = hard (low SNR)

    metadata = {
        'mean_deltas': mean_deltas.tolist(),
        'segment_lengths': [int(end - start) for start, end in
                            zip(segment_boundaries[:-1], segment_boundaries[1:])],
        'snr_db': float(snr),
        'difficulty': float(difficulty),
        'change_type': change_type,
        'noise_std': noise_std
    }

    return {
        'data': data if n_dim > 1 else data.flatten(),
        'changepoints': changepoints.tolist(),
        'true_means': [m.tolist() if hasattr(m, 'tolist') else m for m in true_means],
        'metadata': metadata
    }
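
# Illustrative sketch (not part of the module API): split the generated series
# at the true change points and compare empirical segment means to 'true_means'.
def _example_mean_change() -> None:
    out = make_mean_change(n_samples=600, n_changepoints=3, change_type='jump', seed=1)
    segments = np.split(out['data'], out['changepoints'])
    for true_mean, seg in zip(out['true_means'], segments):
        print(f"true={true_mean}  empirical={np.round(seg.mean(axis=0), 2)}")
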
def make_variance_change(n_samples: int = 500,
                         n_changepoints: int = 3,
                         n_dim: int = 1,
                         variance_ratios: Optional[List[float]] = None,
                         base_var: float = 1.0,
                         change_type: str = 'multiplicative',
                         seed: Optional[int] = None) -> Dict:
    """Generate data with variance changes.

    Args:
        n_samples: Total number of samples
        n_changepoints: Number of change points
        n_dim: Data dimensionality
        variance_ratios: List of variance multipliers (auto-generated if None)
        base_var: Baseline variance
        change_type: 'multiplicative' or 'additive'
        seed: Random seed

    Returns:
        Dictionary with:

        - data: array of shape (n_samples, n_dim)
        - changepoints: array of CP indices
        - true_variances: list of segment variances
        - metadata: dict with variance_ratios, kurtosis, etc.

    Examples:
        >>> data_dict = make_variance_change(n_samples=500, n_changepoints=2)
        >>> print(data_dict['metadata']['variance_ratios'])
    """
    from scipy import stats  # for per-segment kurtosis

    rng = np.random.default_rng(seed=seed)

    # Generate change points
    changepoints = _draw_changepoints(n_samples, n_changepoints, seed=seed)
    segment_boundaries = np.concatenate([[0], changepoints, [n_samples]])
    n_segments = n_changepoints + 1

    # Generate variance ratios if not provided
    if variance_ratios is None:
        # Ratios between 0.5 and 4.0 for noticeable changes
        variance_ratios = rng.uniform(0.5, 4.0, n_segments)
        variance_ratios[0] = 1.0  # Start with base variance

    variance_ratios = np.atleast_1d(variance_ratios)

    # Generate segment variances
    if change_type == 'multiplicative':
        true_variances = [base_var * ratio for ratio in variance_ratios]
    elif change_type == 'additive':
        true_variances = [base_var + ratio for ratio in variance_ratios]
    else:
        raise ValueError(f"change_type must be 'multiplicative' or 'additive', got {change_type}")

    # Generate data
    data = np.zeros((n_samples, n_dim))
    kurtosis_per_segment = []

    for i, (start, end) in enumerate(zip(segment_boundaries[:-1], segment_boundaries[1:])):
        segment_length = end - start
        std = np.sqrt(true_variances[i])
        segment_data = rng.normal(0, std, (segment_length, n_dim))
        data[start:end] = segment_data

        # Calculate kurtosis (for metadata)
        kurt = stats.kurtosis(segment_data.flatten())
        kurtosis_per_segment.append(float(kurt))

    metadata = {
        'variance_ratios': variance_ratios.tolist(),
        'true_variances': [float(v) for v in true_variances],
        'kurtosis_per_segment': kurtosis_per_segment,
        'segment_lengths': [int(end - start) for start, end in
                            zip(segment_boundaries[:-1], segment_boundaries[1:])],
        'change_type': change_type,
        'base_var': base_var
    }

    return {
        'data': data if n_dim > 1 else data.flatten(),
        'changepoints': changepoints.tolist(),
        'true_variances': [float(v) for v in true_variances],
        'metadata': metadata
    }
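
# Illustrative sketch (not part of the module API): empirical segment variances
# should track the reported 'true_variances'.
def _example_variance_change() -> None:
    out = make_variance_change(n_samples=900, n_changepoints=2, seed=7)
    segments = np.split(out['data'], out['changepoints'])
    for true_var, seg in zip(out['true_variances'], segments):
        print(f"true={true_var:.2f}  empirical={np.var(seg):.2f}")
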
def make_regression_change(n_samples: int = 500,
                           n_changepoints: int = 3,
                           n_features: int = 3,
                           coef_changes: Union[str, List[np.ndarray]] = 'random',
                           noise_std: float = 0.5,
                           correlation: float = 0.0,
                           seed: Optional[int] = None) -> Dict:
    """Generate linear regression data with coefficient changes.

    Args:
        n_samples: Total number of samples
        n_changepoints: Number of change points
        n_features: Number of covariates
        coef_changes: 'random', 'sign_flip', 'magnitude', or list of coefficient arrays
        noise_std: Error term std deviation
        correlation: Covariate correlation (0 to 0.9)
        seed: Random seed

    Returns:
        Dictionary with:

        - data: array (n_samples, n_features+1) where [:, 0] is y and [:, 1:] is X
        - changepoints: array of CP indices
        - true_coefs: array (n_segments, n_features) of coefficients per segment
        - X: covariate matrix (n_samples, n_features)
        - y: response vector (n_samples,)
        - metadata: dict with R², condition number, effect size

    Examples:
        >>> data_dict = make_regression_change(n_samples=300, n_changepoints=2, n_features=3)
        >>> X = data_dict['X']
        >>> y = data_dict['y']
        >>> print(data_dict['metadata']['r_squared_per_segment'])
    """
    rng = np.random.default_rng(seed=seed)

    # Generate change points
    changepoints = _draw_changepoints(n_samples, n_changepoints, seed=seed)
    segment_boundaries = np.concatenate([[0], changepoints, [n_samples]])
    n_segments = n_changepoints + 1

    # Generate covariates with optional correlation
    if correlation > 0:
        # Generate correlated covariates
        mean = np.zeros(n_features)
        cov = np.eye(n_features) * (1 - correlation) + correlation
        X = rng.multivariate_normal(mean, cov, size=n_samples)
    else:
        X = rng.normal(0, 1, (n_samples, n_features))

    # Generate coefficients
    if isinstance(coef_changes, str):
        if coef_changes == 'random':
            # Random coefficients in range [-3, 3]
            true_coefs = rng.uniform(-3, 3, (n_segments, n_features))
        elif coef_changes == 'sign_flip':
            # Start with random, then flip signs
            coef_base = rng.uniform(1, 3, n_features)
            true_coefs = []
            for i in range(n_segments):
                if i % 2 == 0:
                    true_coefs.append(coef_base)
                else:
                    true_coefs.append(-coef_base)
            true_coefs = np.array(true_coefs)
        elif coef_changes == 'magnitude':
            # Same signs, different magnitudes
            signs = rng.choice([-1, 1], n_features)
            true_coefs = []
            for i in range(n_segments):
                magnitudes = rng.uniform(0.5, 3.0, n_features)
                true_coefs.append(signs * magnitudes)
            true_coefs = np.array(true_coefs)
        else:
            raise ValueError("coef_changes must be 'random', 'sign_flip', 'magnitude', "
                             f"or a list of arrays, got {coef_changes!r}")
    else:
        true_coefs = np.array(coef_changes)

    # Generate response
    y = np.zeros(n_samples)
    r_squared_per_segment = []

    for i, (start, end) in enumerate(zip(segment_boundaries[:-1], segment_boundaries[1:])):
        X_seg = X[start:end]
        y_pred = X_seg @ true_coefs[i]
        noise = rng.normal(0, noise_std, end - start)
        y_seg = y_pred + noise
        y[start:end] = y_seg

        # Calculate R²
        ss_res = np.sum(noise ** 2)
        ss_tot = np.sum((y_seg - y_seg.mean()) ** 2)
        r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0.0
        r_squared_per_segment.append(float(r2))

    # Calculate condition number
    condition_number = np.linalg.cond(X)

    # Calculate effect size (how different are coefficients between segments)
    if n_segments > 1:
        coef_diffs = np.linalg.norm(np.diff(true_coefs, axis=0), axis=1)
        effect_size = float(np.mean(coef_diffs))
    else:
        effect_size = 0.0

    metadata = {
        'r_squared_per_segment': r_squared_per_segment,
        'condition_number': float(condition_number),
        'effect_size': effect_size,
        'correlation': correlation,
        'noise_std': noise_std,
        'n_features': n_features
    }

    # Combine y and X as in fastcpd convention
    data = np.column_stack([y, X])

    return {
        'data': data,
        'changepoints': changepoints.tolist(),
        'true_coefs': true_coefs.tolist(),
        'X': X,
        'y': y,
        'metadata': metadata
    }
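
# Illustrative sketch (not part of the module API): per-segment least squares
# on the true segmentation approximately recovers 'true_coefs'.
def _example_regression_change() -> None:
    out = make_regression_change(n_samples=600, n_changepoints=2, n_features=3, seed=3)
    bounds = [0] + list(out['changepoints']) + [len(out['y'])]
    for i, (start, end) in enumerate(zip(bounds[:-1], bounds[1:])):
        coef_hat, *_ = np.linalg.lstsq(out['X'][start:end], out['y'][start:end], rcond=None)
        print(np.round(out['true_coefs'][i], 2), np.round(coef_hat, 2))
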
def make_arma_change(n_samples: int = 500,
                     n_changepoints: int = 3,
                     orders: Optional[List[Tuple[int, int]]] = None,
                     sigma_change: bool = False,
                     innovation: str = 'normal',
                     seed: Optional[int] = None) -> Dict:
    """Generate ARMA time series with parameter changes.

    Args:
        n_samples: Total number of samples
        n_changepoints: Number of change points
        orders: List of (p,q) tuples for each segment (auto-generated if None)
        sigma_change: If True, innovation variance also changes
        innovation: 'normal', 't', or 'skew_normal'
        seed: Random seed

    Returns:
        Dictionary with:

        - data: array (n_samples,)
        - changepoints: array of CP indices
        - true_params: list of dicts with 'ar', 'ma', 'sigma' for each segment
        - metadata: dict with stationarity/invertibility checks, segment lengths

    Examples:
        >>> data_dict = make_arma_change(n_samples=500, orders=[(1, 1), (2, 0)])
        >>> print(data_dict['metadata']['is_stationary'])
    """
    rng = np.random.default_rng(seed=seed)

    # Generate change points
    changepoints = _draw_changepoints(n_samples, n_changepoints, seed=seed)
    segment_boundaries = np.concatenate([[0], changepoints, [n_samples]])
    n_segments = n_changepoints + 1

    # Generate ARMA orders if not provided
    if orders is None:
        max_p, max_q = 2, 2
        orders = [(rng.integers(0, max_p + 1), rng.integers(0, max_q + 1))
                  for _ in range(n_segments)]

    if len(orders) != n_segments:
        orders = list(orders) * (n_segments // len(orders) + 1)
        orders = orders[:n_segments]

    # Generate ARMA parameters
    true_params = []
    data = np.zeros(n_samples)

    from statsmodels.tsa.arima_process import arma_generate_sample

    for i, (start, end) in enumerate(zip(segment_boundaries[:-1], segment_boundaries[1:])):
        p, q = orders[i]

        # Generate stationary AR coefficients
        if p > 0:
            # Small coefficients keep the AR roots outside the unit circle
            ar_coefs = rng.uniform(-0.8, 0.8, p) / (p + 1)
        else:
            ar_coefs = np.array([])

        # Generate invertible MA coefficients
        if q > 0:
            ma_coefs = rng.uniform(-0.8, 0.8, q) / (q + 1)
        else:
            ma_coefs = np.array([])

        # Variance
        if sigma_change:
            sigma = rng.uniform(0.5, 2.0)
        else:
            sigma = 1.0

        # Generate innovations
        segment_length = end - start
        if innovation == 'normal':
            innovations = rng.normal(0, sigma, segment_length)
        elif innovation == 't':
            innovations = rng.standard_t(5, segment_length) * sigma
        elif innovation == 'skew_normal':
            # Approximate skew normal
            innovations = rng.gamma(2, sigma, segment_length) - 2 * sigma
        else:
            raise ValueError(f"innovation must be 'normal', 't', or 'skew_normal', got {innovation}")

        # Generate ARMA series using statsmodels
        # Note: arma_generate_sample expects lag polynomials [1, -ar1, -ar2, ...]
        # and [1, ma1, ma2, ...]
        ar_params = np.r_[1, -ar_coefs] if p > 0 else np.array([1])
        ma_params = np.r_[1, ma_coefs] if q > 0 else np.array([1])

        try:
            # The innovations above are already scaled by sigma, so scale=1 here;
            # statsmodels passes a shape (int or tuple) to distrvs
            segment_data = arma_generate_sample(
                ar_params, ma_params, segment_length, scale=1.0,
                distrvs=lambda size, innov=innovations:
                    innov[:int(np.prod(size))].reshape(size))
            data[start:end] = segment_data
        except Exception:
            # Fallback: just use innovations if generation fails
            data[start:end] = innovations

        true_params.append({
            'ar': ar_coefs.tolist(),
            'ma': ma_coefs.tolist(),
            'sigma': float(sigma)
        })

    # Metadata: check stationarity
    is_stationary = []
    is_invertible = []
    for i, params in enumerate(true_params):
        p, q = orders[i]

        # Check stationarity: roots of the AR lag polynomial must lie outside
        # the unit circle (np.roots expects highest degree first, so reverse)
        if p > 0:
            ar_poly = np.r_[1, -np.array(params['ar'])]
            roots = np.roots(ar_poly[::-1])
            is_stat = np.all(np.abs(roots) > 1.0)
        else:
            is_stat = True
        is_stationary.append(is_stat)

        # Check invertibility: roots of the MA lag polynomial outside the unit circle
        if q > 0:
            ma_poly = np.r_[1, np.array(params['ma'])]
            roots = np.roots(ma_poly[::-1])
            is_inv = np.all(np.abs(roots) > 1.0)
        else:
            is_inv = True
        is_invertible.append(is_inv)

    metadata = {
        'orders': orders,
        'is_stationary': is_stationary,
        'is_invertible': is_invertible,
        'innovation_type': innovation,
        'segment_lengths': [int(end - start) for start, end in
                            zip(segment_boundaries[:-1], segment_boundaries[1:])]
    }

    return {
        'data': data,
        'changepoints': changepoints.tolist(),
        'true_params': true_params,
        'metadata': metadata
    }
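
# Illustrative sketch (not part of the module API): the metadata reports, per
# segment, whether the drawn AR/MA polynomials pass the stationarity and
# invertibility root checks.
def _example_arma_change() -> None:
    out = make_arma_change(n_samples=500, n_changepoints=2,
                           orders=[(1, 1), (2, 0), (0, 1)], seed=1)
    for order, stat, inv in zip(out['metadata']['orders'],
                                out['metadata']['is_stationary'],
                                out['metadata']['is_invertible']):
        print(f"order={order}  stationary={stat}  invertible={inv}")
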
def make_glm_change(n_samples: int = 500,
                    n_changepoints: int = 3,
                    n_features: int = 3,
                    family: str = 'binomial',
                    coef_changes: Union[str, List[np.ndarray]] = 'random',
                    trials: Optional[int] = None,
                    correlation: float = 0.0,
                    seed: Optional[int] = None) -> Dict:
    """Generate GLM data with coefficient changes.

    Args:
        n_samples: Total number of samples
        n_changepoints: Number of change points
        n_features: Number of covariates
        family: 'binomial' or 'poisson'
        coef_changes: 'random', 'sign_flip', or list of coefficient arrays
        trials: Number of trials for binomial (default: 1 for logistic regression)
        correlation: Covariate correlation (0 to 0.9)
        seed: Random seed

    Returns:
        Dictionary with:

        - data: array (n_samples, n_features+1) where [:, 0] is y and [:, 1:] is X
        - changepoints: array of CP indices
        - true_coefs: array (n_segments, n_features) of coefficients per segment
        - X: covariate matrix (n_samples, n_features)
        - y: response vector (n_samples,)
        - metadata: dict with AUC (binomial), overdispersion (poisson), etc.

    Examples:
        >>> data_dict = make_glm_change(n_samples=400, family='binomial', n_features=3)
        >>> X = data_dict['X']
        >>> y = data_dict['y']
        >>> print(data_dict['metadata']['separation_per_segment'])
    """
    rng = np.random.default_rng(seed=seed)

    if family not in ['binomial', 'poisson']:
        raise ValueError(f"family must be 'binomial' or 'poisson', got {family}")

    # Generate change points
    changepoints = _draw_changepoints(n_samples, n_changepoints, seed=seed)
    segment_boundaries = np.concatenate([[0], changepoints, [n_samples]])
    n_segments = n_changepoints + 1

    # Generate covariates
    if correlation > 0:
        mean = np.zeros(n_features)
        cov = np.eye(n_features) * (1 - correlation) + correlation
        X = rng.multivariate_normal(mean, cov, size=n_samples)
    else:
        X = rng.normal(0, 1, (n_samples, n_features))

    # Generate coefficients
    if isinstance(coef_changes, str):
        if coef_changes == 'random':
            true_coefs = rng.uniform(-2, 2, (n_segments, n_features))
        elif coef_changes == 'sign_flip':
            coef_base = rng.uniform(0.5, 2.0, n_features)
            true_coefs = []
            for i in range(n_segments):
                if i % 2 == 0:
                    true_coefs.append(coef_base)
                else:
                    true_coefs.append(-coef_base)
            true_coefs = np.array(true_coefs)
        else:
            raise ValueError("coef_changes must be 'random', 'sign_flip', "
                             f"or a list of arrays, got {coef_changes!r}")
    else:
        true_coefs = np.array(coef_changes)

    # Generate response
    y = np.zeros(n_samples, dtype=int if family == 'binomial' else float)

    if family == 'binomial':
        if trials is None:
            trials = 1  # Logistic regression

        from sklearn.metrics import roc_auc_score

        separation_per_segment = []
        for i, (start, end) in enumerate(zip(segment_boundaries[:-1], segment_boundaries[1:])):
            X_seg = X[start:end]
            eta = X_seg @ true_coefs[i]
            prob = 1 / (1 + np.exp(-eta))

            # Generate binomial outcomes
            if trials == 1:
                y_seg = rng.binomial(1, prob)
            else:
                y_seg = rng.binomial(trials, prob)
            y[start:end] = y_seg

            # Calculate separation (ROC AUC for trials=1)
            if trials == 1:
                try:
                    auc = roc_auc_score(y_seg, prob)
                except ValueError:
                    # Only one class present in the segment
                    auc = 0.5
                separation_per_segment.append(float(auc))
            else:
                separation_per_segment.append(None)

        metadata = {
            'family': 'binomial',
            'trials': trials,
            'separation_per_segment': separation_per_segment,
            'correlation': correlation,
            'n_features': n_features
        }

    elif family == 'poisson':
        overdispersion_per_segment = []
        for i, (start, end) in enumerate(zip(segment_boundaries[:-1], segment_boundaries[1:])):
            X_seg = X[start:end]
            eta = X_seg @ true_coefs[i]
            lambda_mean = np.exp(eta)

            # Generate Poisson outcomes
            y_seg = rng.poisson(lambda_mean)
            y[start:end] = y_seg

            # Calculate overdispersion (variance / mean)
            overdispersion = float(np.var(y_seg) / (np.mean(y_seg) + 1e-10))
            overdispersion_per_segment.append(overdispersion)

        metadata = {
            'family': 'poisson',
            'overdispersion_per_segment': overdispersion_per_segment,
            'correlation': correlation,
            'n_features': n_features
        }

    # Combine y and X
    data = np.column_stack([y, X])

    return {
        'data': data,
        'changepoints': changepoints.tolist(),
        'true_coefs': true_coefs.tolist(),
        'X': X,
        'y': y,
        'metadata': metadata
    }
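
# Illustrative sketch (not part of the module API): with the binomial family
# and a single trial the response is 0/1, and the per-segment ROC AUC in the
# metadata summarizes how separable the classes are.
def _example_glm_change() -> None:
    out = make_glm_change(n_samples=400, n_changepoints=2, family='binomial', seed=5)
    print(np.unique(out['y']))
    print(out['metadata']['separation_per_segment'])
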
def make_garch_change(n_samples: int = 500,
                      n_changepoints: int = 3,
                      orders: Optional[List[Tuple[int, int]]] = None,
                      volatility_regimes: Optional[List[str]] = None,
                      seed: Optional[int] = None) -> Dict:
    """Generate GARCH time series with volatility regime changes.

    Args:
        n_samples: Total number of samples
        n_changepoints: Number of change points
        orders: List of (p,q) tuples for each segment (auto-generated if None)
        volatility_regimes: List of 'low', 'medium', 'high' for each segment
        seed: Random seed

    Returns:
        Dictionary with:

        - data: array (n_samples,) of returns
        - changepoints: array of CP indices
        - true_params: list of dicts with 'omega', 'alpha', 'beta' for each segment
        - volatility: array (n_samples,) of conditional volatility
        - metadata: dict with volatility ratios, kurtosis

    Examples:
        >>> data_dict = make_garch_change(n_samples=600, n_changepoints=2)
        >>> returns = data_dict['data']
        >>> vol = data_dict['volatility']
        >>> print(data_dict['metadata']['avg_volatility_per_segment'])
    """
    rng = np.random.default_rng(seed=seed)

    # Generate change points
    changepoints = _draw_changepoints(n_samples, n_changepoints, seed=seed)
    segment_boundaries = np.concatenate([[0], changepoints, [n_samples]])
    n_segments = n_changepoints + 1

    # Generate GARCH orders if not provided
    if orders is None:
        # Default to GARCH(1,1)
        orders = [(1, 1)] * n_segments

    if len(orders) != n_segments:
        orders = list(orders) * (n_segments // len(orders) + 1)
        orders = orders[:n_segments]

    # Generate volatility regimes
    if volatility_regimes is None:
        regime_choices = ['low', 'medium', 'high']
        volatility_regimes = rng.choice(regime_choices, n_segments, replace=True).tolist()

    regime_params = {
        'low': {'omega': 0.01, 'alpha_scale': 0.05, 'beta_scale': 0.90},
        'medium': {'omega': 0.05, 'alpha_scale': 0.10, 'beta_scale': 0.85},
        'high': {'omega': 0.10, 'alpha_scale': 0.20, 'beta_scale': 0.75}
    }

    # Generate GARCH parameters and data
    data = np.zeros(n_samples)
    volatility = np.zeros(n_samples)
    true_params = []
    avg_vol_per_segment = []

    for i, (start, end) in enumerate(zip(segment_boundaries[:-1], segment_boundaries[1:])):
        p, q = orders[i]
        regime = volatility_regimes[i]
        regime_param = regime_params[regime]

        # Generate GARCH parameters
        omega = regime_param['omega']
        if p > 0:
            alpha = rng.uniform(0.5, 1.5, p) * regime_param['alpha_scale']
        else:
            alpha = np.array([])
        if q > 0:
            beta = rng.uniform(0.5, 1.5, q) * regime_param['beta_scale']
        else:
            beta = np.array([])

        # Ensure persistence < 1 for stationarity
        persistence = np.sum(alpha) + np.sum(beta)
        if persistence >= 0.98:
            scale_factor = 0.95 / (persistence + 1e-10)
            alpha = alpha * scale_factor
            beta = beta * scale_factor
            persistence = np.sum(alpha) + np.sum(beta)

        # Simulate GARCH process
        segment_length = end - start
        seg_vol = np.zeros(segment_length)
        seg_returns = np.zeros(segment_length)

        # Initialize with the unconditional volatility
        seg_vol[0] = np.sqrt(omega / (1 - persistence + 1e-10))

        for t in range(segment_length):
            if t > 0:
                # GARCH variance equation
                variance = omega
                # ARCH terms
                for j in range(min(p, t)):
                    variance += alpha[j] * seg_returns[t - 1 - j] ** 2
                # GARCH terms
                for j in range(min(q, t)):
                    variance += beta[j] * seg_vol[t - 1 - j] ** 2
                seg_vol[t] = np.sqrt(max(variance, 1e-10))

            # Generate return
            seg_returns[t] = seg_vol[t] * rng.standard_normal()

        data[start:end] = seg_returns
        volatility[start:end] = seg_vol

        true_params.append({
            'omega': float(omega),
            'alpha': alpha.tolist(),
            'beta': beta.tolist(),
            'regime': regime
        })
        avg_vol_per_segment.append(float(np.mean(seg_vol)))

    # Calculate metadata
    from scipy import stats

    kurtosis_per_segment = []
    for start, end in zip(segment_boundaries[:-1], segment_boundaries[1:]):
        kurt = stats.kurtosis(data[start:end])
        kurtosis_per_segment.append(float(kurt))

    # Volatility ratios
    vol_ratios = [avg_vol_per_segment[i] / avg_vol_per_segment[0] for i in range(n_segments)]

    metadata = {
        'orders': orders,
        'volatility_regimes': volatility_regimes,
        'avg_volatility_per_segment': avg_vol_per_segment,
        'volatility_ratios': vol_ratios,
        'kurtosis_per_segment': kurtosis_per_segment,
        'segment_lengths': [int(end - start) for start, end in
                            zip(segment_boundaries[:-1], segment_boundaries[1:])]
    }

    return {
        'data': data,
        'changepoints': changepoints.tolist(),
        'true_params': true_params,
        'volatility': volatility,
        'metadata': metadata
    }
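
# Illustrative sketch (not part of the module API): segments simulated under the
# 'high' regime should report larger average conditional volatility than 'low'.
def _example_garch_change() -> None:
    out = make_garch_change(n_samples=900, n_changepoints=2,
                            volatility_regimes=['low', 'high', 'low'], seed=11)
    for regime, vol in zip(out['metadata']['volatility_regimes'],
                           out['metadata']['avg_volatility_per_segment']):
        print(f"{regime}: avg volatility {vol:.3f}")
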
def add_annotation_noise(true_changepoints: Union[List, np.ndarray],
                         n_annotators: int = 5,
                         noise_std: float = 5.0,
                         agreement_rate: float = 0.8,
                         seed: Optional[int] = None) -> List[List[int]]:
    """Simulate multiple human annotators with varying agreement.

    Useful for testing covering metric and multi-annotator scenarios.

    Args:
        true_changepoints: Ground truth change points
        n_annotators: Number of annotators to simulate
        noise_std: Std of Gaussian noise added to CP locations
        agreement_rate: Probability each annotator includes each CP
        seed: Random seed

    Returns:
        List of lists, each sublist is one annotator's change points

    Examples:
        >>> true_cps = [100, 200, 300]
        >>> annotators = add_annotation_noise(true_cps, n_annotators=5)
        >>> print(f"Annotator 1: {annotators[0]}")
        >>> print(f"Annotator 2: {annotators[1]}")
    """
    rng = np.random.default_rng(seed=seed)
    true_cps = np.atleast_1d(true_changepoints)

    annotator_cps = []
    for _ in range(n_annotators):
        cps = []
        for true_cp in true_cps:
            # Decide if annotator includes this CP
            if rng.random() < agreement_rate:
                # Add noise to location
                noisy_cp = true_cp + rng.normal(0, noise_std)
                noisy_cp = int(np.clip(noisy_cp, 0, np.inf))
                cps.append(noisy_cp)

        # Sort and remove duplicates
        cps = sorted(set(cps))
        annotator_cps.append(cps)

    return annotator_cps
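
# Illustrative sketch (not part of the module API): pair a generator with
# simulated annotators to produce multi-annotator ground truth, e.g. for the
# covering metric.
def _example_annotation_noise() -> None:
    out = make_mean_change(n_samples=500, n_changepoints=3, seed=0)
    annotators = add_annotation_noise(out['changepoints'], n_annotators=3, seed=0)
    for k, cps in enumerate(annotators):
        print(f"annotator {k}: {cps}")
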