import numpy as np
import umap
from sklearn.base import BaseEstimator, OutlierMixin
from sklearn.utils.validation import check_is_fitted, check_array, FLOAT_DTYPES
class UMAPOutlierDetection(BaseEstimator, OutlierMixin):
    """
    Does outlier detection based on the reconstruction error from UMAP.

    Every sample is projected into the UMAP embedding and mapped back to the
    original feature space. The summed absolute difference between a sample
    and its reconstruction is compared against ``threshold``; samples above
    it are labelled as outliers.

    :param n_components: forwarded to ``umap.UMAP``; must be at least 2.
    :param threshold: cut-off on the reconstruction difference. Required:
        ``fit`` raises a ``ValueError`` when it is ``None``.
    :param variant: ``"relative"`` divides each row's difference by the sum
        of that row of ``X``; any other value keeps the absolute difference.
    :param n_neighbors: forwarded to ``umap.UMAP``.
    :param min_dist: forwarded to ``umap.UMAP``.
    :param metric: forwarded to ``umap.UMAP``.
    :param random_state: forwarded to ``umap.UMAP``.
    """

    def __init__(
        self,
        n_components=2,
        threshold=None,
        variant="relative",
        n_neighbors=15,
        min_dist=0.1,
        metric="euclidean",
        random_state=None,
    ):
        # Only store the hyperparameters; validation happens in ``fit`` so
        # that the estimator can be cloned and re-parameterised freely.
        self.n_components = n_components
        self.threshold = threshold
        self.variant = variant
        self.n_neighbors = n_neighbors
        self.min_dist = min_dist
        self.metric = metric
        self.random_state = random_state

    def fit(self, X, y=None):
        """
        Fit the model using X as training data.

        :param X: array-like, shape=(n_samples, n_features) training data.
        :param y: optional; forwarded unchanged to the underlying UMAP's ``fit``.
        :return: Returns an instance of self.
        :raises ValueError: if ``n_components`` is smaller than 2 or
            ``threshold`` is ``None``.
        """
        X = check_array(X, estimator=self, dtype=FLOAT_DTYPES)
        if self.n_components < 2:
            raise ValueError("Number of components must be at least two.")
        # Compare against None explicitly: a threshold of 0 is a valid
        # (if strict) cut-off and must not be rejected by a truthiness test.
        if self.threshold is None:
            raise ValueError("The `threshold` value cannot be `None`.")

        self.umap_ = umap.UMAP(
            n_components=self.n_components,
            n_neighbors=self.n_neighbors,
            min_dist=self.min_dist,
            metric=self.metric,
            random_state=self.random_state,
        )
        self.umap_.fit(X, y)
        self.offset_ = -self.threshold
        return self

    def difference(self, X):
        """
        Shows the calculated difference between original and reconstructed data. Row by row.

        With ``variant="relative"`` the per-row difference is divided by the
        sum of that row of ``X``; rows that sum to zero therefore produce
        non-finite values.

        :param X: array-like, shape=(n_samples, n_features) data to score.
        :return: array, shape=(n_samples,) the difference
        """
        check_is_fitted(self, ["umap_", "offset_"])
        # Validate here as well: this method is public and may be called
        # directly with lists or DataFrames, not only through ``predict``.
        X = check_array(X, estimator=self, dtype=FLOAT_DTYPES)

        reduced = self.umap_.transform(X)
        reconstructed = self.umap_.inverse_transform(reduced)
        diff = np.sum(np.abs(reconstructed - X), axis=1)
        if self.variant == "relative":
            diff = diff / X.sum(axis=1)
        return diff

    def predict(self, X):
        """
        Predict if a point is an outlier.

        :param X: array-like, shape=(n_samples, n_features) data to classify.
        :return: array, shape=(n_samples,) the predicted data. 1 for inliers, -1 for outliers.
        """
        X = check_array(X, estimator=self, dtype=FLOAT_DTYPES)
        check_is_fitted(self, ["umap_", "offset_"])
        # ``np.int`` was removed from NumPy; building the labels from Python
        # ints yields the platform default integer dtype directly.
        return np.where(self.difference(X) > self.threshold, -1, 1)