"""Source code for sklego.mixture.bayesian_gmm_detector."""

import numpy as np
from scipy.optimize import minimize_scalar
from sklearn.base import BaseEstimator, OutlierMixin
from sklearn.mixture import BayesianGaussianMixture
from sklearn.utils.validation import check_is_fitted, check_array, FLOAT_DTYPES

from scipy.stats import gaussian_kde


class BayesianGMMOutlierDetector(OutlierMixin, BaseEstimator):
    """The GMMDetector trains a Bayesian Gaussian Mixture Model on a dataset X.

    Once a density is trained we can evaluate the likelihood scores to see if
    it is deemed likely. By giving a threshold this model might then label
    outliers if their likelihood score is too low.

    :param threshold: the limit at which the model thinks an outlier appears,
        must be between (0, 1)
    :param method: the method that the threshold will be applied to, possible
        values = [stddev, default=quantile]

    If you select method="quantile" then the threshold value represents the
    quantile value to start calling something an outlier.

    If you select method="stddev" then the threshold value represents the
    numbers of standard deviations before calling something an outlier.

    There are other settings too, these are best described in the
    BayesianGaussianMixture documentation found here:
    https://scikit-learn.org/stable/modules/generated/sklearn.mixture.BayesianGaussianMixture.html.
    """

    def __init__(
        self,
        threshold=0.99,
        method="quantile",
        n_components=1,
        covariance_type="full",
        tol=0.001,
        reg_covar=1e-06,
        max_iter=100,
        n_init=1,
        init_params="kmeans",
        weight_concentration_prior_type="dirichlet_process",
        weight_concentration_prior=None,
        mean_precision_prior=None,
        mean_prior=None,
        degrees_of_freedom_prior=None,
        covariance_prior=None,
        random_state=None,
        warm_start=False,
        verbose=0,
        verbose_interval=10,
    ):
        self.threshold = threshold
        self.method = method
        self.allowed_methods = ["quantile", "stddev"]
        # The remaining parameters are forwarded verbatim to
        # sklearn.mixture.BayesianGaussianMixture in fit().
        self.n_components = n_components
        self.covariance_type = covariance_type
        self.tol = tol
        self.reg_covar = reg_covar
        self.max_iter = max_iter
        self.n_init = n_init
        self.init_params = init_params
        self.weight_concentration_prior_type = weight_concentration_prior_type
        self.weight_concentration_prior = weight_concentration_prior
        self.mean_precision_prior = mean_precision_prior
        self.mean_prior = mean_prior
        self.degrees_of_freedom_prior = degrees_of_freedom_prior
        self.covariance_prior = covariance_prior
        self.random_state = random_state
        self.warm_start = warm_start
        self.verbose = verbose
        self.verbose_interval = verbose_interval

    def fit(self, X: np.ndarray, y=None) -> "BayesianGMMOutlierDetector":
        """
        Fit the model using X, y as training data.

        :param X: array-like, shape=(n_samples, n_features) training data.
        :param y: ignored but kept in for pipeline support
        :return: Returns an instance of self.
        :raises ValueError: if ``method`` is unknown or ``threshold`` is out of
            range for the chosen method.
        """
        # GMM sometimes throws an error if you don't do this
        X = check_array(X, estimator=self, dtype=FLOAT_DTYPES)
        if len(X.shape) == 1:
            X = np.expand_dims(X, 1)

        # Validate the method name first so an unknown method gets a clear
        # error instead of a threshold-range complaint.
        if self.method not in self.allowed_methods:
            raise ValueError(
                f"Method not recognised. Method must be in {self.allowed_methods}"
            )
        if (self.method == "quantile") and (
            (self.threshold > 1) or (self.threshold < 0)
        ):
            raise ValueError(
                f"Threshold {self.threshold} with method {self.method} needs to be 0 < threshold < 1"
            )
        if (self.method == "stddev") and (self.threshold < 0):
            raise ValueError(
                f"Threshold {self.threshold} with method {self.method} needs to be 0 < threshold "
            )

        self.gmm_ = BayesianGaussianMixture(
            n_components=self.n_components,
            covariance_type=self.covariance_type,
            tol=self.tol,
            reg_covar=self.reg_covar,
            max_iter=self.max_iter,
            n_init=self.n_init,
            init_params=self.init_params,
            weight_concentration_prior_type=self.weight_concentration_prior_type,
            weight_concentration_prior=self.weight_concentration_prior,
            mean_precision_prior=self.mean_precision_prior,
            mean_prior=self.mean_prior,
            degrees_of_freedom_prior=self.degrees_of_freedom_prior,
            covariance_prior=self.covariance_prior,
            random_state=self.random_state,
            warm_start=self.warm_start,
            verbose=self.verbose,
            verbose_interval=self.verbose_interval,
        )
        self.gmm_.fit(X)
        score_samples = self.gmm_.score_samples(X)

        if self.method == "quantile":
            # Everything below the (1 - threshold) quantile of the training
            # log-likelihoods is later labelled an outlier.
            self.likelihood_threshold_ = np.quantile(score_samples, 1 - self.threshold)

        if self.method == "stddev":
            # Estimate the mode of the likelihood distribution via a KDE, then
            # measure spread using only the samples left of the mode so the
            # threshold reflects the low-likelihood tail.
            density = gaussian_kde(score_samples)
            max_x_value = minimize_scalar(lambda x: -density(x)).x
            mean_likelihood = score_samples.mean()
            new_likelihoods = score_samples[score_samples < max_x_value]
            new_likelihoods_std = np.std(new_likelihoods - mean_likelihood)
            self.likelihood_threshold_ = mean_likelihood - (
                self.threshold * new_likelihoods_std
            )

        return self

    def score_samples(self, X):
        """Return the negated log-likelihood of each sample under the fitted GMM.

        :param X: array-like, shape=(n_samples, n_features) data to score.
        :return: array, shape=(n_samples,) negated log-likelihood scores
            (higher means more outlier-like).
        """
        X = check_array(X, estimator=self, dtype=FLOAT_DTYPES)
        check_is_fitted(self, ["gmm_", "likelihood_threshold_"])
        if len(X.shape) == 1:
            X = np.expand_dims(X, 1)

        return self.gmm_.score_samples(X) * -1

    def decision_function(self, X):
        """Shift scores so that 0 is the outlier decision boundary.

        We add self.likelihood_threshold_ so that a non-negative value marks
        an outlier (score_samples is the negated log-likelihood).
        """
        return self.score_samples(X) + self.likelihood_threshold_

    def predict(self, X):
        """
        Predict if a point is an outlier.

        :param X: array-like, shape=(n_samples, n_features) data to classify.
        :return: array, shape=(n_samples,) the predicted data. 1 for inliers,
            -1 for outliers.
        """
        # np.int was removed in NumPy 1.24; the builtin int is the exact
        # replacement (np.int was always an alias for it).
        predictions = (self.decision_function(X) >= 0).astype(int)
        predictions[predictions == 1] = -1
        predictions[predictions == 0] = 1
        return predictions