import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_is_fitted
from sklego.common import as_list
class ColumnDropper(BaseEstimator, TransformerMixin):
    """
    Allows dropping specific columns from a pandas DataFrame by name. Can be useful in a sklearn Pipeline.

    :param columns: column name ``str`` or list of column names to be dropped

    .. note::
        Raises a ``TypeError`` if input provided is not a DataFrame

        Raises a ``KeyError`` if columns provided are not in the input DataFrame

        Raises a ``ValueError`` if dropping the columns would leave no columns at all

    :Example:

    >>> # Dropping a single column from a pandas DataFrame
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     'name': ['Swen', 'Victor', 'Alex'],
    ...     'length': [1.82, 1.85, 1.80],
    ...     'shoesize': [42, 44, 45]
    ... })
    >>> ColumnDropper(['name']).fit_transform(df)
       length  shoesize
    0    1.82        42
    1    1.85        44
    2    1.80        45
    >>> # Dropping multiple columns from a pandas DataFrame
    >>> ColumnDropper(['length', 'shoesize']).fit_transform(df)
         name
    0    Swen
    1  Victor
    2    Alex
    >>> # Dropping non-existent columns results in a KeyError
    >>> ColumnDropper(['weight']).fit_transform(df)
    Traceback (most recent call last):
        ...
    KeyError: "['weight'] column(s) not in DataFrame"
    >>> # How to use the ColumnDropper in a sklearn Pipeline
    >>> from sklearn.pipeline import Pipeline
    >>> from sklearn.preprocessing import StandardScaler
    >>> pipe = Pipeline([
    ...     ('select', ColumnDropper(['name', 'shoesize'])),
    ...     ('scale', StandardScaler()),
    ... ])
    >>> pipe.fit_transform(df)
    array([[-0.16222142],
           [ 1.29777137],
           [-1.13554995]])
    """

    def __init__(self, columns: list):
        # Stored untouched; normalisation to a list happens in ``fit``.
        self.columns = columns

    def fit(self, X, y=None):
        """
        Checks 1) if input is a DataFrame, and 2) if column names are in this DataFrame

        :param X: ``pd.DataFrame`` on which we apply the column dropping
        :param y: ``pd.Series`` labels for X. unused for column dropping
        :returns: ``ColumnDropper`` object.
        :raises TypeError: if ``X`` is not a ``pd.DataFrame``
        :raises KeyError: if any of the columns to drop is not in ``X``
        :raises ValueError: if no column would remain after dropping
        """
        self.columns_ = as_list(self.columns)
        self._check_X_for_type(X)
        self._check_column_names(X)
        # Names of the columns that survive the drop, in their original order.
        self.feature_names_ = list(X.drop(columns=self.columns_).columns)
        self._check_column_length()
        return self

    def transform(self, X):
        """
        Returns a new DataFrame without the columns given at initialisation

        :param X: ``pd.DataFrame`` from which the columns are dropped
        :returns: ``pd.DataFrame`` holding only the remaining columns
        :raises TypeError: if ``X`` is not a ``pd.DataFrame``
        :raises KeyError: if any of the columns to drop is not in ``X``
        """
        # Raises sklearn's NotFittedError when ``fit`` has not been called yet.
        check_is_fitted(self, ["columns_", "feature_names_"])
        self._check_X_for_type(X)
        self._check_column_names(X)
        return X.drop(columns=self.columns_)

    def get_feature_names(self):
        """Returns the list of column names that remain after dropping (set during ``fit``)"""
        return self.feature_names_

    def _check_column_length(self):
        """Check if all columns are dropped"""
        if len(self.feature_names_) == 0:
            raise ValueError(
                f"Dropping {self.columns_} would result in an empty output DataFrame"
            )

    def _check_column_names(self, X):
        """Check if one or more of the columns provided doesn't exist in the input DataFrame"""
        existing_columns = set(X.columns)
        # Keep the order in which the columns were requested (and drop duplicates)
        # so that the error message is deterministic.
        non_existent_columns = [
            column
            for column in dict.fromkeys(self.columns_)
            if column not in existing_columns
        ]
        if non_existent_columns:
            raise KeyError(f"{non_existent_columns} column(s) not in DataFrame")

    @staticmethod
    def _check_X_for_type(X):
        """Checks if input of the Dropper is of the required type"""
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Provided variable X is not of type pandas.DataFrame")
class PandasTypeSelector(BaseEstimator, TransformerMixin):
    """
    Select columns in a pandas dataframe based on their dtype

    Both parameters are passed on unchanged to ``pd.DataFrame.select_dtypes``.

    :param include: types to be included in the dataframe
    :param exclude: types to be excluded in the dataframe
    """

    def __init__(self, include=None, exclude=None):
        self.include = include
        self.exclude = exclude

    def fit(self, X, y=None):
        """
        Saves the column names for check during transform

        :param X: pandas dataframe to select dtypes out of
        :param y: not used in this class
        :returns: ``PandasTypeSelector`` object.
        :raises TypeError: if ``X`` is not a ``pd.DataFrame``
        :raises ValueError: if no column matches the requested type(s)
        """
        self._check_X_for_type(X)
        # Remembered so that ``transform`` can verify it receives the same layout.
        self.X_dtypes_ = X.dtypes
        self.feature_names_ = list(
            X.select_dtypes(include=self.include, exclude=self.exclude).columns
        )
        if len(self.feature_names_) == 0:
            raise ValueError("Provided type(s) results in empty dateframe")
        return self

    def transform(self, X):
        """
        Returns a pandas dataframe with only the columns of the requested dtype(s)

        :param X: pandas dataframe to select dtypes out of; must have the same
            columns and dtypes as the dataframe seen during ``fit``
        :returns: ``pd.DataFrame`` holding only the selected columns
        :raises TypeError: if ``X`` is not a ``pd.DataFrame``
        :raises ValueError: if columns or dtypes differ from those seen in ``fit``
        """
        # Raises sklearn's NotFittedError when ``fit`` has not been called yet.
        check_is_fitted(self, ["X_dtypes_", "feature_names_"])
        self._check_X_for_type(X)

        # Compare plain lists rather than the Series themselves: comparing two
        # differently-labelled Series element-wise raises inside pandas.
        same_columns = list(X.columns) == list(self.X_dtypes_.index)
        if not same_columns or list(X.dtypes) != list(self.X_dtypes_):
            raise ValueError(
                "Columns or column dtypes were not equal during fit and transform. "
                f"Fit types: \n{self.X_dtypes_}\ntransform: \n{X.dtypes}"
            )

        return X.select_dtypes(include=self.include, exclude=self.exclude)

    def get_feature_names(self, *args, **kwargs):
        """Returns the list of selected column names (set during ``fit``); arguments are ignored"""
        return self.feature_names_

    @staticmethod
    def _check_X_for_type(X):
        """Checks if input of the Selector is of the required type"""
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Provided variable X is not of type pandas.DataFrame")
class ColumnSelector(BaseEstimator, TransformerMixin):
    """
    Allows selecting specific columns from a pandas DataFrame by name. Can be useful in a sklearn Pipeline.

    :param columns: column name ``str`` or list of column names to be selected

    .. note::
        Raises a ``TypeError`` if input provided is not a DataFrame

        Raises a ``KeyError`` if columns provided are not in the input DataFrame

        Raises a ``ValueError`` if no columns are provided

    :Example:

    >>> # Selecting a single column from a pandas DataFrame
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     'name': ['Swen', 'Victor', 'Alex'],
    ...     'length': [1.82, 1.85, 1.80],
    ...     'shoesize': [42, 44, 45]
    ... })
    >>> ColumnSelector(['length']).fit_transform(df)
       length
    0    1.82
    1    1.85
    2    1.80
    >>> # Selecting multiple columns from a pandas DataFrame
    >>> ColumnSelector(['length', 'shoesize']).fit_transform(df)
       length  shoesize
    0    1.82        42
    1    1.85        44
    2    1.80        45
    >>> # Selecting non-existent columns results in a KeyError
    >>> ColumnSelector(['weight']).fit_transform(df)
    Traceback (most recent call last):
        ...
    KeyError: "['weight'] column(s) not in DataFrame"
    >>> # How to use the ColumnSelector in a sklearn Pipeline
    >>> from sklearn.pipeline import Pipeline
    >>> from sklearn.preprocessing import StandardScaler
    >>> pipe = Pipeline([
    ...     ('select', ColumnSelector(['length'])),
    ...     ('scale', StandardScaler()),
    ... ])
    >>> pipe.fit_transform(df)
    array([[-0.16222142],
           [ 1.29777137],
           [-1.13554995]])
    """

    def __init__(self, columns: list):
        # Stored untouched; a non-list value is turned into a list in ``fit``.
        self.columns = columns

    def fit(self, X, y=None):
        """
        Checks 1) if input is a DataFrame, and 2) if column names are in this DataFrame

        :param X: ``pd.DataFrame`` on which we apply the column selection
        :param y: ``pd.Series`` labels for X. unused for column selection
        :returns: ``ColumnSelector`` object.
        :raises TypeError: if ``X`` is not a ``pd.DataFrame``
        :raises ValueError: if no columns were provided
        :raises KeyError: if any of the columns to select is not in ``X``
        """
        self.columns_ = as_list(self.columns)
        self._check_X_for_type(X)
        self._check_column_length()
        self._check_column_names(X)
        return self

    def transform(self, X):
        """
        Returns a pandas DataFrame with only the columns given at initialisation

        :param X: ``pd.DataFrame`` on which we apply the column selection
        :returns: ``pd.DataFrame`` with only the selected columns
        :raises TypeError: if ``X`` is not a ``pd.DataFrame``
        :raises KeyError: if any of the columns to select is not in ``X``
        """
        # Raises sklearn's NotFittedError when ``fit`` has not been called yet.
        check_is_fitted(self, ["columns_"])
        self._check_X_for_type(X)
        self._check_column_names(X)
        return X[self.columns_]

    def get_feature_names(self):
        """Returns the list of selected column names (set during ``fit``)"""
        return self.columns_

    def _check_column_length(self):
        """Check if no column is selected"""
        if len(self.columns_) == 0:
            raise ValueError(
                "Expected columns to be at least of length 1, found length of 0 instead"
            )

    def _check_column_names(self, X):
        """Check if one or more of the columns provided doesn't exist in the input DataFrame"""
        existing_columns = set(X.columns)
        # Keep the order in which the columns were requested (and drop duplicates)
        # so that the error message is deterministic.
        non_existent_columns = [
            column
            for column in dict.fromkeys(self.columns_)
            if column not in existing_columns
        ]
        if non_existent_columns:
            raise KeyError(f"{non_existent_columns} column(s) not in DataFrame")

    @staticmethod
    def _check_X_for_type(X):
        """Checks if input of the Selector is of the required type"""
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Provided variable X is not of type pandas.DataFrame")