Source code for fastcpd.segmentation

"""Convenience functions for common change point detection scenarios."""

from typing import Optional, Union
import numpy as np
from fastcpd.fastcpd import fastcpd, FastcpdResult
from typing import Optional, Union, Tuple


def mean(
    data: Union[np.ndarray, list],
    beta: Union[str, float] = "MBIC",
    **kwargs
) -> FastcpdResult:
    """Run change point detection for shifts in the mean.

    Thin convenience wrapper: forwards everything to ``fastcpd()`` with
    ``family="mean"``.

    Parameters:
        data: Input data of shape (n,) for univariate or (n, d) for
            multivariate observations.
        beta: Penalty for the number of change points.
        **kwargs: Extra keyword arguments forwarded unchanged to ``fastcpd()``.

    Returns:
        FastcpdResult: Change point detection results.

    Examples:
        >>> import numpy as np
        >>> from fastcpd.segmentation import mean
        >>>
        >>> # Univariate mean change
        >>> data = np.concatenate([np.random.normal(0, 1, 300),
        ...                        np.random.normal(5, 1, 400)])
        >>> result = mean(data)
        >>> print(result.cp_set)

        >>> # Multivariate mean change
        >>> data = np.concatenate([
        ...     np.random.multivariate_normal([0, 0], np.eye(2), 300),
        ...     np.random.multivariate_normal([5, 5], np.eye(2), 400)
        ... ])
        >>> result = mean(data)
    """
    detection = fastcpd(data, family="mean", beta=beta, **kwargs)
    return detection
def variance(
    data: Union[np.ndarray, list],
    beta: Union[str, float] = "MBIC",
    **kwargs
) -> FastcpdResult:
    """Run change point detection for shifts in the variance.

    Thin convenience wrapper: forwards everything to ``fastcpd()`` with
    ``family="variance"``.

    Parameters:
        data: Input data.
        beta: Penalty for the number of change points.
        **kwargs: Extra keyword arguments forwarded unchanged to ``fastcpd()``.

    Returns:
        FastcpdResult: Change point detection results.
    """
    detection = fastcpd(data, family="variance", beta=beta, **kwargs)
    return detection
def meanvariance(
    data: Union[np.ndarray, list],
    beta: Union[str, float] = "MBIC",
    **kwargs
) -> FastcpdResult:
    """Run change point detection for shifts in mean and/or variance.

    Thin convenience wrapper: forwards everything to ``fastcpd()`` with
    ``family="meanvariance"``.

    Parameters:
        data: Input data.
        beta: Penalty for the number of change points.
        **kwargs: Extra keyword arguments forwarded unchanged to ``fastcpd()``.

    Returns:
        FastcpdResult: Change point detection results.
    """
    detection = fastcpd(data, family="meanvariance", beta=beta, **kwargs)
    return detection
def ar(
    data: Union[np.ndarray, list],
    p: int = 1,
    beta: Union[str, float] = "MBIC",
    **kwargs
) -> FastcpdResult:
    """Run change point detection on the parameters of an AR(p) model.

    Forwards to ``fastcpd()`` with ``family="ar"``; the order is passed both
    as ``p`` and as the one-element list ``order=[p]``.

    Parameters:
        data: Input time series data.
        p: Order of the AR model.
        beta: Penalty for the number of change points.
        **kwargs: Extra keyword arguments forwarded unchanged to ``fastcpd()``.

    Returns:
        FastcpdResult: Change point detection results.

    Examples:
        >>> import numpy as np
        >>> from fastcpd.segmentation import ar
        >>>
        >>> # Simulate AR(2) data with change
        >>> np.random.seed(42)
        >>> n = 500
        >>> data1 = np.zeros(n // 2)
        >>> for i in range(2, n // 2):
        ...     data1[i] = 0.6 * data1[i-1] - 0.3 * data1[i-2] + np.random.normal()
        >>> data2 = np.zeros(n // 2)
        >>> for i in range(2, n // 2):
        ...     data2[i] = -0.4 * data2[i-1] + 0.5 * data2[i-2] + np.random.normal()
        >>> data = np.concatenate([data1, data2])
        >>> result = ar(data, p=2)
        >>> print(result.cp_set)
    """
    # NOTE(review): the order is forwarded twice (``p`` and ``order``), while
    # arma()/garch() only pass ``order`` -- confirm fastcpd() needs both.
    model_order = [p]
    detection = fastcpd(
        data,
        family="ar",
        p=p,
        beta=beta,
        order=model_order,
        **kwargs
    )
    return detection
def arma(
    data: Union[np.ndarray, list],
    p: int = 1,
    q: int = 1,
    beta: Union[str, float] = "MBIC",
    **kwargs
) -> FastcpdResult:
    """Run change point detection on the parameters of an ARMA(p, q) model.

    Forwards to ``fastcpd()`` with ``family="arma"`` and ``order=[p, q]``.

    Parameters:
        data: Input time series data.
        p: AR order.
        q: MA order.
        beta: Penalty for the number of change points.
        **kwargs: Extra keyword arguments forwarded unchanged to ``fastcpd()``.

    Returns:
        FastcpdResult: Change point detection results.
    """
    model_order = [p, q]
    detection = fastcpd(
        data,
        family="arma",
        beta=beta,
        order=model_order,
        **kwargs
    )
    return detection
def garch(
    data: Union[np.ndarray, list],
    p: int = 1,
    q: int = 1,
    beta: Union[str, float] = "MBIC",
    **kwargs
) -> FastcpdResult:
    """Run change point detection on the parameters of a GARCH(p, q) model.

    Forwards to ``fastcpd()`` with ``family="garch"`` and ``order=[p, q]``.

    Parameters:
        data: Input time series data.
        p: GARCH order.
        q: ARCH order.
        beta: Penalty for the number of change points.
        **kwargs: Extra keyword arguments forwarded unchanged to ``fastcpd()``.

    Returns:
        FastcpdResult: Change point detection results.
    """
    model_order = [p, q]
    detection = fastcpd(
        data,
        family="garch",
        beta=beta,
        order=model_order,
        **kwargs
    )
    return detection
def var(
    data: Union[np.ndarray, list],
    p: int = 1,
    beta: Union[str, float] = "MBIC",
    **kwargs
) -> FastcpdResult:
    """Run change point detection on the parameters of a VAR(p) model.

    Forwards to ``fastcpd()`` with ``family="var"``; the order is passed both
    as ``p`` and as the one-element list ``order=[p]``.

    Parameters:
        data: Input multivariate time series data of shape (n, d).
        p: Order of the VAR model.
        beta: Penalty for the number of change points.
        **kwargs: Extra keyword arguments forwarded unchanged to ``fastcpd()``.

    Returns:
        FastcpdResult: Change point detection results.
    """
    # NOTE(review): the order is forwarded twice (``p`` and ``order``), while
    # arma()/garch() only pass ``order`` -- confirm fastcpd() needs both.
    model_order = [p]
    detection = fastcpd(
        data,
        family="var",
        p=p,
        beta=beta,
        order=model_order,
        **kwargs
    )
    return detection
def linear_regression(
    data: Union[np.ndarray, list],
    beta: Union[str, float] = "MBIC",
    **kwargs
) -> FastcpdResult:
    """Run change point detection on linear regression coefficients.

    Forwards to ``fastcpd()`` with ``family="lm"``. The first column of
    ``data`` is treated as the response variable and the remaining columns
    as predictors.

    Parameters:
        data: Input data of shape (n, d+1) where the first column is the
            response.
        beta: Penalty for the number of change points.
        **kwargs: Extra keyword arguments forwarded unchanged to ``fastcpd()``.

    Returns:
        FastcpdResult: Change point detection results.

    Examples:
        >>> import numpy as np
        >>> from fastcpd.segmentation import linear_regression
        >>>
        >>> # Simulate linear regression with change
        >>> n = 500
        >>> X = np.random.randn(n, 2)
        >>> y1 = 2 * X[:n//2, 0] + 3 * X[:n//2, 1] + np.random.randn(n//2)
        >>> y2 = -1 * X[n//2:, 0] + 5 * X[n//2:, 1] + np.random.randn(n//2)
        >>> y = np.concatenate([y1, y2])
        >>> data = np.column_stack([y, X])
        >>> result = linear_regression(data)
        >>> print(result.cp_set)
    """
    detection = fastcpd(data, family="lm", beta=beta, **kwargs)
    return detection
def logistic_regression(
    data: Union[np.ndarray, list],
    beta: Union[str, float] = "MBIC",
    **kwargs
) -> FastcpdResult:
    """Run change point detection on logistic regression coefficients.

    Forwards to ``fastcpd()`` with ``family="binomial"``. The model fit is
    documented as relying on scikit-learn's LogisticRegression (Cython/C
    implementation); that happens inside ``fastcpd()``, not in this wrapper.
    The first column of ``data`` is the binary response (0/1) and the
    remaining columns are predictors.

    Parameters:
        data: Input data of shape (n, d+1) where the first column is the
            binary response.
        beta: Penalty for the number of change points.
        **kwargs: Extra keyword arguments forwarded unchanged to ``fastcpd()``.

    Returns:
        FastcpdResult: Change point detection results.

    Examples:
        >>> import numpy as np
        >>> from fastcpd.segmentation import logistic_regression
        >>>
        >>> # Simulate logistic regression with change
        >>> n = 500
        >>> X = np.random.randn(n, 2)
        >>> # First segment: strong positive effect
        >>> prob1 = 1 / (1 + np.exp(-(2*X[:n//2, 0] + 3*X[:n//2, 1])))
        >>> y1 = (np.random.rand(n//2) < prob1).astype(float)
        >>> # Second segment: negative effect
        >>> prob2 = 1 / (1 + np.exp(-(-1*X[n//2:, 0] + 2*X[n//2:, 1])))
        >>> y2 = (np.random.rand(n//2) < prob2).astype(float)
        >>> y = np.concatenate([y1, y2])
        >>> data = np.column_stack([y, X])
        >>> result = logistic_regression(data)
        >>> print(result.cp_set)
    """
    detection = fastcpd(data, family="binomial", beta=beta, **kwargs)
    return detection
def poisson_regression(
    data: Union[np.ndarray, list],
    beta: Union[str, float] = "MBIC",
    **kwargs
) -> FastcpdResult:
    """Run change point detection on Poisson regression coefficients.

    Forwards to ``fastcpd()`` with ``family="poisson"``. The model fit is
    documented as relying on scikit-learn's PoissonRegressor with an LBFGS
    solver; that happens inside ``fastcpd()``, not in this wrapper.
    The first column of ``data`` is the count response and the remaining
    columns are predictors.

    Parameters:
        data: Input data of shape (n, d+1) where the first column is the
            count response.
        beta: Penalty for the number of change points.
        **kwargs: Extra keyword arguments forwarded unchanged to ``fastcpd()``.

    Returns:
        FastcpdResult: Change point detection results.

    Examples:
        >>> import numpy as np
        >>> from fastcpd.segmentation import poisson_regression
        >>>
        >>> # Simulate Poisson regression with change
        >>> n = 500
        >>> X = np.random.randn(n, 2)
        >>> # First segment
        >>> lambda1 = np.exp(0.5*X[:n//2, 0] + 0.8*X[:n//2, 1])
        >>> y1 = np.random.poisson(lambda1)
        >>> # Second segment
        >>> lambda2 = np.exp(-0.3*X[n//2:, 0] + 1.2*X[n//2:, 1])
        >>> y2 = np.random.poisson(lambda2)
        >>> y = np.concatenate([y1, y2])
        >>> data = np.column_stack([y, X])
        >>> result = poisson_regression(data)
        >>> print(result.cp_set)
    """
    detection = fastcpd(data, family="poisson", beta=beta, **kwargs)
    return detection
def lasso(
    data: Union[np.ndarray, list],
    beta: Union[str, float] = "MBIC",
    alpha: float = 1.0,
    cv: bool = False,
    **kwargs
) -> FastcpdResult:
    """Run change point detection on LASSO regression coefficients.

    Forwards to ``fastcpd()`` with ``family="lasso"``, passing ``alpha`` and
    ``cv`` along as the ``lasso_alpha`` and ``lasso_cv`` keyword arguments.
    The model fit is documented as relying on scikit-learn's coordinate
    descent algorithm; that happens inside ``fastcpd()``, not in this wrapper.
    The first column of ``data`` is the response and the remaining columns
    are predictors.

    Parameters:
        data: Input data of shape (n, d+1) where the first column is the
            response.
        beta: Penalty for the number of change points.
        alpha: L1 regularization strength (lambda). Ignored if cv=True.
        cv: If True, use cross-validation to select alpha (slower but
            automatic).
        **kwargs: Extra keyword arguments forwarded to ``fastcpd()``. Any
            ``lasso_alpha`` / ``lasso_cv`` entries supplied here are
            overwritten by ``alpha`` / ``cv``.

    Returns:
        FastcpdResult: Change point detection results.

    Examples:
        >>> import numpy as np
        >>> from fastcpd.segmentation import lasso
        >>>
        >>> # Simulate sparse regression with change
        >>> n = 500
        >>> p = 20
        >>> X = np.random.randn(n, p)
        >>> # First segment: only first 3 features matter
        >>> y1 = 2*X[:n//2, 0] + 3*X[:n//2, 1] - 1.5*X[:n//2, 2] + np.random.randn(n//2)
        >>> # Second segment: different sparse coefficients
        >>> y2 = -1*X[n//2:, 5] + 2*X[n//2:, 8] + np.random.randn(n//2)
        >>> y = np.concatenate([y1, y2])
        >>> data = np.column_stack([y, X])
        >>> result = lasso(data, alpha=0.1)
        >>> print(result.cp_set)
    """
    # The explicit parameters take precedence over same-named kwargs entries.
    kwargs.update(lasso_alpha=alpha, lasso_cv=cv)
    detection = fastcpd(data, family="lasso", beta=beta, **kwargs)
    return detection
# ---------------- Nonparametric costs ----------------
def rank(
    data: Union[np.ndarray, list],
    beta: float = 50.0,
    trim: float = 0.02,
    min_segment_length: Optional[int] = None,
) -> FastcpdResult:
    """Nonparametric rank-based change detection (distribution-free).

    Converts ``data`` to a float64 array, delegates the search to
    ``fastcpd.pelt_nonparametric.run_rank`` and wraps the returned change
    points in a ``FastcpdResult``.

    Parameters:
        data: (n, d) array or list. Input that is not two-dimensional is
            stored in the result reshaped to a single column.
        beta: Numeric penalty; converted with ``float()`` before use.
            Defaults to 50.0.
        trim: Boundary trim proportion.
        min_segment_length: Optional minimum segment length after
            post-processing.

    Returns:
        FastcpdResult: Result with ``family="rank"`` holding the raw and
        final change point sets. ``cost_values``, ``residuals`` and
        ``thetas`` are empty arrays.

    Raises:
        TypeError: If ``data`` is a scalar (zero-dimensional after
            conversion).
    """
    # Imported lazily, exactly as before, so the nonparametric backend is
    # only loaded when this function is actually used.
    from fastcpd.pelt_nonparametric import run_rank

    y = np.asarray(data, dtype=np.float64)
    # Scalars used to fail with an opaque "len() of unsized object"
    # TypeError from an otherwise unused length computation; keep the
    # exception type but make the reason explicit.
    if y.ndim == 0:
        raise TypeError(
            "data must be array-like with at least one dimension, got a scalar"
        )

    raw_cp, cp = run_rank(
        y,
        beta=float(beta),
        trim=trim,
        min_segment_length=min_segment_length,
    )
    return FastcpdResult(
        raw_cp_set=np.asarray(raw_cp),
        cp_set=np.asarray(cp),
        cost_values=np.array([]),
        residuals=np.array([]),
        thetas=np.array([]),
        data=y if y.ndim == 2 else y.reshape(-1, 1),
        family="rank",
    )
def rbf(
    data: Union[np.ndarray, list],
    beta: float = 50.0,
    trim: float = 0.02,
    min_segment_length: Optional[int] = None,
    gamma: Optional[float] = None,
    feature_dim: int = 256,
    seed: int = 0,
) -> FastcpdResult:
    """Nonparametric Gaussian-kernel (RBF) change detection via RFF.

    Converts ``data`` to a float64 array, delegates the search to
    ``fastcpd.pelt_nonparametric.run_rbf`` and wraps the returned change
    points in a ``FastcpdResult``. The third value returned by ``run_rbf``
    is discarded.

    Parameters:
        data: (n, d) array or list. Input that is not two-dimensional is
            stored in the result reshaped to a single column.
        beta: Numeric penalty; converted with ``float()`` before use.
            Defaults to 50.0.
        trim: Boundary trim proportion.
        min_segment_length: Optional minimum segment length after
            post-processing.
        gamma: RBF bandwidth; when None, ``run_rbf`` chooses it (documented
            default: 1/median^2 of pairwise distances).
        feature_dim: RFF feature dimension (default 256).
        seed: RNG seed for RFF.

    Returns:
        FastcpdResult: Result with ``family="rbf"`` holding the raw and
        final change point sets. ``cost_values``, ``residuals`` and
        ``thetas`` are empty arrays.

    Raises:
        TypeError: If ``data`` is a scalar (zero-dimensional after
            conversion).
    """
    # Imported lazily, exactly as before, so the nonparametric backend is
    # only loaded when this function is actually used.
    from fastcpd.pelt_nonparametric import run_rbf

    y = np.asarray(data, dtype=np.float64)
    # Scalars used to fail with an opaque "len() of unsized object"
    # TypeError from an otherwise unused length computation; keep the
    # exception type but make the reason explicit.
    if y.ndim == 0:
        raise TypeError(
            "data must be array-like with at least one dimension, got a scalar"
        )

    raw_cp, cp, _ = run_rbf(
        y,
        beta=float(beta),
        trim=trim,
        min_segment_length=min_segment_length,
        gamma=gamma,
        feature_dim=feature_dim,
        seed=seed,
    )
    return FastcpdResult(
        raw_cp_set=np.asarray(raw_cp),
        cp_set=np.asarray(cp),
        cost_values=np.array([]),
        residuals=np.array([]),
        thetas=np.array([]),
        data=y if y.ndim == 2 else y.reshape(-1, 1),
        family="rbf",
    )