# Source code for sklego.preprocessing.intervalencoder

```
try:
import cvxpy as cp
except ImportError:
from sklego.notinstalled import NotInstalledPackage
cp = NotInstalledPackage("cvxpy")
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils import check_array, check_X_y
from sklearn.utils.validation import check_is_fitted
def _mk_monotonic_average(xs, ys, intervals, method="increasing", **kwargs):
"""
Creates smoothed averages of `ys` at the intervals given by `intervals`.
:param xs: all the datapoints of a feature (represents the x-axis)
:param ys: all the datapoints what we'd like to predict (represents the y-axis)
:param intervals: the intervals at which we'd like to get a good average value
:param method: the method that is used for smoothing, can be either `increasing` or `decreasing`.
:return:
An array as long as `intervals` that represents the average `y`-values at those intervals,
keeping the constraint in mind.
"""
x_internal = np.array([xs >= i for i in intervals]).T.astype(np.float)
betas = cp.Variable(x_internal.shape[1])
objective = cp.Minimize(cp.sum_squares(x_internal * betas - ys))
if method == "increasing":
constraints = [betas[i + 1] >= 0 for i in range(betas.shape[0] - 1)]
elif method == "decreasing":
constraints = [betas[i + 1] <= 0 for i in range(betas.shape[0] - 1)]
else:
raise ValueError(
f"method must be either `increasing` or `decreasing`, got: {method}"
)
prob = cp.Problem(objective, constraints)
prob.solve()
return betas.value.cumsum()
def _mk_average(xs, ys, intervals, method="average", span=1, **kwargs):
"""
Creates smoothed averages of `ys` at the intervals given by `intervals`.
:param xs: all the datapoints of a feature (represents the x-axis)
:param ys: all the datapoints what we'd like to predict (represents the y-axis)
:param intervals: the intervals at which we'd like to get a good average value
:param method: the method that is used for smoothing, can be either `average` or `normal`.
:param span: if the method is `average` then this is the span around the interval
that is used to determine the average `y`-value, if the method is `normal` the span
becomes the value of sigma that is used for weighted averaging
:return:
An array as long as `intervals` that represents the average `y`-values at those intervals.
"""
results = np.zeros(intervals.shape)
for idx, interval in enumerate(intervals):
if method == "average":
distances = 1 / (0.01 + np.abs(xs - interval))
predicate = (xs < (interval + span)) | (xs < (interval - span))
elif method == "normal":
distances = np.exp(-((xs - interval) ** 2) / span)
predicate = xs == xs
else:
raise ValueError("method needs to be either `average` or `normal`")
subset = ys[predicate]
dist_subset = distances[predicate]
results[idx] = np.average(subset, weights=dist_subset)
return results
[docs]class IntervalEncoder(TransformerMixin, BaseEstimator):
"""
The interval encoder bends features in `X` with regards to`y`.
We take each column in X separately and smooth it towards `y` using
the strategy that is defined in `method`.
Note that this allows us to make certain features strictly monotonic
in your machine learning model if you follow this with an appropriate
model.
:param n_chunks: the number of cuts that makes the interval
:param method: the interpolation method used, must be in
["average", "normal", "increasing", "decreasing"], default: "normal"
:param span: a hyperparameter for the interpolation method, if the
method is `normal` it resembles the width of the radial basis
function used to weigh the points. It is ignored if if the method is
"increasing" or "decreasing".
"""
def __init__(self, n_chunks=10, span=1, method="normal"):
self.span = span
self.method = method
self.n_chunks = n_chunks
[docs] def fit(self, X, y):
"""Fits the estimator"""
allowed_methods = ["average", "normal", "increasing", "decreasing"]
if self.method not in allowed_methods:
raise ValueError(
f"`method` must be in {allowed_methods}, got `{self.method}`"
)
if self.n_chunks <= 0:
raise ValueError(f"`n_chunks` must be >= 1, received {self.n_chunks}")
if self.span > 1.0:
raise ValueError(
f"Error, we expect 0 <= span <= 1, received span={self.span}"
)
if self.span < 0.0:
raise ValueError(
f"Error, we expect 0 <= span <= 1, received span={self.span}"
)
# these two matrices will have shape (columns, quantiles)
# quantiles indicate where the interval split occurs
X, y = check_X_y(X, y, estimator=self)
self.quantiles_ = np.zeros((X.shape[1], self.n_chunks))
# heights indicate what heights these intervals will have
self.heights_ = np.zeros((X.shape[1], self.n_chunks))
self.num_cols_ = X.shape[1]
average_func = (
_mk_average
if self.method in ["average", "normal"]
else _mk_monotonic_average
)
for col in range(X.shape[1]):
self.quantiles_[col, :] = np.quantile(
X[:, col], q=np.linspace(0, 1, self.n_chunks)
)
self.heights_[col, :] = average_func(
X[:, col],
y,
self.quantiles_[col, :],
span=self.span,
method=self.method,
)
return self
[docs] def transform(self, X):
"""
Transform each column such that it is bends smoothly towards y.
"""
check_is_fitted(self, ["quantiles_", "heights_", "num_cols_"])
X = check_array(X, estimator=self)
if X.shape[1] != self.num_cols_:
raise ValueError(
f"fitted on {self.num_cols_} features but received {X.shape[1]}"
)
transformed = np.zeros(X.shape)
for col in range(transformed.shape[1]):
transformed[:, col] = np.interp(
X[:, col], self.quantiles_[col, :], self.heights_[col, :]
)
return transformed
```