Source code for sklego.common

import collections
import hashlib

import numpy as np
import pandas as pd
from sklearn.base import TransformerMixin
from sklearn.utils.validation import check_array, check_is_fitted, check_X_y


class TrainOnlyTransformerMixin(TransformerMixin):
    """
    Allows using a separate function for transforming train and test data.

    ``fit`` stores a fingerprint (hash) of the training data together with
    its number of columns. ``transform`` then dispatches to
    ``transform_train`` when it receives data with the same fingerprint and
    to ``transform_test`` otherwise.

    Usage:

    >>> from sklearn.base import BaseEstimator
    >>> class TrainOnlyTransformer(TrainOnlyTransformerMixin, BaseEstimator):
    ...     def fit(self, X, y):
    ...         return super().fit(X, y)
    ...
    ...     def transform_train(self, X, y=None):
    ...         return X + np.random.normal(0, 1, size=X.shape)
    ...
    >>> X_train, X_test = np.random.randn(100, 4), np.random.randn(100, 4)
    >>> y_train, y_test = np.random.randn(100), np.random.randn(100)
    >>>
    >>> trf = TrainOnlyTransformer()
    >>> _ = trf.fit(X_train, y_train)
    >>>
    >>> assert np.all(trf.transform(X_train) != X_train)
    >>> assert np.all(trf.transform(X_test) == X_test)

    .. warning::
        Transformers using this class as a mixin should at a minimum:

        - call `super().fit` in their fit method
        - implement `transform_train()`

        They may also implement `transform_test()`. If it is not implemented,
        `transform_test` will simply return the untransformed dataframe
    """

    # Maps the *exact* type of X to a function computing its fingerprint.
    # SHA-256 digests are used for every supported type: unlike the builtin
    # ``hash()`` of a bytes object, which is salted per interpreter process,
    # a digest stays comparable after the fitted transformer has been pickled
    # and loaded in another process.
    _HASHERS = {
        pd.DataFrame: lambda X: hashlib.sha256(
            pd.util.hash_pandas_object(X, index=True).values
        ).hexdigest(),
        np.ndarray: lambda X: hashlib.sha256(X.data.tobytes()).hexdigest(),
        np.memmap: lambda X: hashlib.sha256(X.data.tobytes()).hexdigest(),
    }

    def fit(self, X, y=None):
        """
        Validate the input and remember the fingerprint of the training data.

        :param X: training data; must be of one of the types in ``_HASHERS``
        :param y: optional target; when given, X and y are validated together
        :returns: self, with ``X_hash_`` and ``dim_`` set
        :raises ValueError: if the type of X has no registered hasher
        """
        # Validation only: the checked/converted arrays are deliberately
        # discarded so that the hash is taken from X exactly as passed in.
        if y is None:
            check_array(X, estimator=self)
        else:
            check_X_y(X, y, estimator=self)

        self.X_hash_ = self._hash(X)
        self.dim_ = X.shape[1]
        return self

    @classmethod
    def _hash(cls, X):
        """
        Return a hash of X based on the type of X.

        Hashers are looked up by exact type in ``cls._HASHERS``, so a subclass
        may provide its own mapping to support additional types.

        :raises ValueError: if ``type(X)`` has no registered hasher
        """
        hasher = cls._HASHERS.get(type(X))
        if hasher is None:
            supported = ", ".join(t.__name__ for t in cls._HASHERS)
            raise ValueError(
                f"Unknown datatype {type(X)}, "
                f"TrainOnlyTransformerMixin only supports {supported}"
            )
        return hasher(X)

    def transform(self, X, y=None):
        """
        Dispatcher for transform method.

        It will dispatch to `self.transform_train` if X is the same as X
        passed to `fit`, otherwise, it will dispatch to `self.transform_test`.
        "The same" means: equal fingerprint as computed by ``_hash``.

        Note that ``y`` is accepted for API compatibility but is not
        forwarded to ``transform_train`` / ``transform_test``.

        :raises ValueError: if X has a different number of columns than the
            data seen in ``fit`` or is of an unsupported type
        """
        check_is_fitted(self, ["X_hash_", "dim_"])
        check_array(X, estimator=self)

        if X.shape[1] != self.dim_:
            raise ValueError(
                f"Unexpected input dimension {X.shape[1]}, expected {self.dim_}"
            )

        if self._hash(X) == self.X_hash_:
            return self.transform_train(X)
        return self.transform_test(X)

    def transform_train(self, X, y=None):
        """Transform the training data; must be provided by the subclass."""
        raise NotImplementedError(
            "Subclasses of TrainOnlyMixin should implement `transform_train`"
        )

    def transform_test(self, X, y=None):
        """Transform non-training data; by default returns X unchanged."""
        return X
def as_list(val):
    """
    Helper function that always hands back a list.

    A string is treated as one single value and wrapped as-is. Any other
    object exposing ``__iter__`` is expanded into a list of its items, and
    everything else is wrapped into a one-element list.

    :param val: the input value.
    :returns: the input value as a list.

    :Example:

    >>> as_list('test')
    ['test']

    >>> as_list(['test1', 'test2'])
    ['test1', 'test2']
    """
    expandable = not isinstance(val, str) and hasattr(val, "__iter__")
    return list(val) if expandable else [val]
def flatten(nested_iterable):
    """
    Helper function, returns an iterator of flattened values from an
    arbitrarily nested iterable.

    Strings and bytes are treated as atomic values and are never split into
    their characters. Every other iterable element is descended into
    recursively, so the nesting depth is bounded by the recursion limit.

    :param nested_iterable: an iterable whose elements may be iterables too
    :returns: a generator yielding the leaf values in order

    >>> list(flatten([['test1', 'test2'], ['a', 'b', ['c', 'd']]]))
    ['test1', 'test2', 'a', 'b', 'c', 'd']

    >>> list(flatten(['test1', ['test2']]))
    ['test1', 'test2']
    """
    # Import the ABC explicitly instead of reaching for ``collections.abc``
    # through a bare ``import collections``: that attribute access only works
    # when the submodule happens to have been loaded by some other import.
    from collections.abc import Iterable

    for el in nested_iterable:
        if isinstance(el, Iterable) and not isinstance(el, (str, bytes)):
            yield from flatten(el)
        else:
            yield el
def expanding_list(list_to_extent, return_type=list):
    """
    Make an expanding list of lists by collecting the first element, the
    first 2 elements etc.

    The input is first normalised with ``as_list``. An empty input results in
    a single empty container. Every element of the result, including the one
    produced for empty or single-element input, is built with ``return_type``.

    :param list_to_extent: a single value or an iterable of values
    :param return_type: type of the elements of the list (tuple or list)
    :returns: a list with one ``return_type`` instance per prefix of the input

    :Example:

    >>> expanding_list('test')
    [['test']]

    >>> expanding_list(['test1', 'test2', 'test3'])
    [['test1'], ['test1', 'test2'], ['test1', 'test2', 'test3']]

    >>> expanding_list(['test1', 'test2', 'test3'], tuple)
    [('test1',), ('test1', 'test2'), ('test1', 'test2', 'test3')]

    >>> expanding_list('test', tuple)
    [('test',)]
    """
    listed = as_list(list_to_extent)
    if not listed:
        # No prefixes exist; keep returning one empty container as before,
        # but honour the requested element type.
        return [return_type(listed)]
    return [return_type(listed[:end]) for end in range(1, len(listed) + 1)]
def sliding_window(sequence, window_size, step_size):
    """
    Build a generator of consecutive slices ("windows") of a sequence.

    A window starts at every ``step_size``-th position, beginning at 0, and
    spans at most ``window_size`` items. Windows that start near the end of
    the sequence are shorter than ``window_size``.

    :param sequence: e.g. a list; must support ``len()`` and slicing
    :type sequence: Iterable
    :param window_size: the size of each window
    :type window_size: int
    :param step_size: the amount of steps to the next window
    :type step_size: int
    :return: a sliding window generator object
    :rtype: Generator

    :Example:

    >>> generator = sliding_window([1, 2, 4, 5], 2, 1)
    >>> [i for i in generator]
    [[1, 2], [2, 4], [4, 5], [5]]
    """
    # The start offsets are computed right away, so invalid arguments are
    # reported when the function is called rather than on first iteration.
    window_starts = range(0, len(sequence), step_size)
    return (sequence[start : start + window_size] for start in window_starts)