Source code for sklego.preprocessing.projections

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

from sklego.common import as_list


class OrthogonalTransformer(BaseEstimator, TransformerMixin):
    """
    Transform the columns of a dataframe or numpy array into a column-orthogonal
    (or column-orthonormal) matrix.

    We compute Q, R such that X = Q*R, with Q orthogonal, from which it follows
    that Q = X*inv(R).

    :param normalize: whether the orthogonal matrix should be orthonormal as well
    """

    def __init__(self, normalize=False):
        self.normalize = normalize
    def fit(self, X, y=None):
        """
        Store the inverse of R from the QR decomposition of X, which can be used
        to calculate the orthogonal projection of X. If normalization is
        required, also store a vector of normalization terms.
        """
        X = check_array(X, estimator=self)

        if not X.shape[0] > 1:
            raise ValueError("Orthogonal transformation not valid for one sample")

        # Q, R such that X = Q*R, with Q orthogonal, from which follows Q = X*inv(R)
        Q, R = np.linalg.qr(X)
        self.inv_R_ = np.linalg.inv(R)

        if self.normalize:
            self.normalization_vector_ = np.linalg.norm(Q, ord=2, axis=0)
        else:
            self.normalization_vector_ = np.ones((X.shape[1],))

        return self
    def transform(self, X):
        """Transforms X using the fitted inverse of R. Normalizes the result if required."""
        if self.normalize:
            check_is_fitted(self, ["inv_R_", "normalization_vector_"])
        else:
            check_is_fitted(self, ["inv_R_"])

        X = check_array(X, estimator=self)

        return X @ self.inv_R_ / self.normalization_vector_
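
# A minimal usage sketch (not part of the original module; the random data and
# the helper name `_demo_orthogonal_transformer` are illustrative assumptions):
# after fitting, the transformed training matrix has orthonormal columns, since
# X @ inv(R) recovers the Q factor of the QR decomposition.
def _demo_orthogonal_transformer():
    X = np.random.standard_normal((100, 3))
    Q = OrthogonalTransformer(normalize=True).fit_transform(X)
    # Orthonormal columns: Q.T @ Q should be (close to) the identity matrix.
    assert np.allclose(Q.T @ Q, np.eye(3), atol=1e-6)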

def scalar_projection(vec, unto):
    # Coefficient of the projection of `vec` onto `unto`.
    return vec.dot(unto) / unto.dot(unto)


def vector_projection(vec, unto):
    # Component of `vec` that lies along `unto`.
    return scalar_projection(vec, unto) * unto
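
# Quick illustrative check of the helpers above (an added sketch, not original
# code): projecting (3, 4) onto the x-axis keeps only the first component.
def _demo_vector_projection():
    v = np.array([3.0, 4.0])
    e1 = np.array([1.0, 0.0])
    assert scalar_projection(v, e1) == 3.0
    assert np.allclose(vector_projection(v, e1), np.array([3.0, 0.0]))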

class InformationFilter(BaseEstimator, TransformerMixin):
    """
    The `InformationFilter` uses a variant of the Gram-Schmidt process to filter
    information out of the dataset. This can be useful if you want to filter
    information out of a dataset for fairness reasons.

    To explain how it works: given a training matrix :math:`X` with columns
    :math:`x_1, ..., x_k`, suppose :math:`x_1` and :math:`x_2` are the sensitive
    columns. The information filter then removes information by applying these
    transformations:

    .. math::

        \\begin{split}
        v_1 & = x_1 \\\\
        v_2 & = x_2 - \\frac{x_2 \\cdot v_1}{v_1 \\cdot v_1} v_1 \\\\
        v_3 & = x_3 - \\frac{x_3 \\cdot v_1}{v_1 \\cdot v_1} v_1
                    - \\frac{x_3 \\cdot v_2}{v_2 \\cdot v_2} v_2 \\\\
            & \\vdots \\\\
        v_k & = x_k - \\frac{x_k \\cdot v_1}{v_1 \\cdot v_1} v_1
                    - \\frac{x_k \\cdot v_2}{v_2 \\cdot v_2} v_2
        \\end{split}

    Concatenating our vectors (but removing the sensitive ones) gives us a new
    training matrix :math:`X_{fair} = [v_3, ..., v_k]`.

    :param columns: the columns to filter out; this can be a sequence of either
        int (in the case of numpy) or string (in the case of pandas).
    :param alpha: parameter to control how much to filter: for alpha=1 we filter
        out all information, while for alpha=0 we don't apply any.
    """

    def __init__(self, columns, alpha=1):
        self.columns = columns
        self.alpha = alpha

    def _check_coltype(self, X):
        for col in as_list(self.columns):
            if isinstance(col, str):
                if isinstance(X, np.ndarray):
                    raise ValueError(
                        f"column {col} is a string but datatype received is numpy."
                    )
                if isinstance(X, pd.DataFrame):
                    if col not in X.columns:
                        raise ValueError(f"column {col} is not in {X.columns}")
            if isinstance(col, int):
                if col not in range(np.atleast_2d(np.array(X)).shape[1]):
                    raise ValueError(
                        f"column {col} is out of bounds for input shape {X.shape}"
                    )

    def _col_idx(self, X, name):
        if isinstance(name, str):
            if isinstance(X, np.ndarray):
                raise ValueError(
                    "You cannot have a column of type string on a numpy input matrix."
                )
            return {name: i for i, name in enumerate(X.columns)}[name]
        return name

    def _make_v_vectors(self, X, col_ids):
        vs = np.zeros((X.shape[0], len(col_ids)))
        for i, c in enumerate(col_ids):
            vs[:, i] = X[:, c]
            for j in range(0, i):
                vs[:, i] = vs[:, i] - vector_projection(vs[:, i], vs[:, j])
        return vs
    def fit(self, X, y=None):
        """Learn the projection required to make the dataset orthogonal to sensitive columns."""
        self._check_coltype(X)
        self.col_ids_ = [
            v if isinstance(v, int) else self._col_idx(X, v)
            for v in as_list(self.columns)
        ]
        X = check_array(X, estimator=self)
        X_fair = X.copy()
        v_vectors = self._make_v_vectors(X, self.col_ids_)
        # Gram-Schmidt process, but only against the sensitive attributes
        for i, col in enumerate(X_fair.T):
            for v in v_vectors.T:
                X_fair[:, i] = X_fair[:, i] - vector_projection(X_fair[:, i], v)
        # we want to learn matrix P: X P = X_fair
        # this means we first need to create X_fair in order to learn P
        self.projection_, resid, rank, s = np.linalg.lstsq(X, X_fair, rcond=None)
        return self
    def transform(self, X):
        """Transforms X by applying the information filter."""
        check_is_fitted(self, ["projection_", "col_ids_"])
        self._check_coltype(X)
        X = check_array(X, estimator=self)
        # apply the projection and remove the columns we won't need
        X_fair = X @ self.projection_
        X_removed = np.delete(X_fair, self.col_ids_, axis=1)
        X_orig = np.delete(X, self.col_ids_, axis=1)
        return self.alpha * np.atleast_2d(X_removed) + (1 - self.alpha) * np.atleast_2d(X_orig)
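
# An illustrative end-to-end sketch (the random data, seed, and helper name are
# assumptions, not part of the library): after filtering with alpha=1, every
# remaining column of the transformed training matrix is orthogonal to the
# sensitive column, so it carries no linear trace of it.
def _demo_information_filter():
    rng = np.random.default_rng(42)
    X = rng.standard_normal((200, 4))
    X_fair = InformationFilter(columns=[0], alpha=1).fit_transform(X)
    # Dot products between the sensitive column and the filtered columns
    # should vanish (up to floating point error).
    assert np.allclose(X[:, 0] @ X_fair, 0.0, atol=1e-6)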