import datetime as dt
import inspect
from functools import partial, wraps
import numpy as np
import pandas as pd
from scipy.ndimage.interpolation import shift
from sklego.common import as_list
def _get_shape_delta(old_shape, new_shape):
diffs = [
("+" if new > old else "") + str(new - old)
for new, old in zip(new_shape, old_shape)
]
return f"delta=({', '.join(diffs)})"
def log_step(
    func=None,
    *,
    time_taken=True,
    shape=True,
    shape_delta=False,
    names=False,
    dtypes=False,
    print_fn=print,
    display_args=True,
    log_error=True,
):
    """
    Decorates a function that transforms a pandas dataframe to add automated logging statements

    :param func: callable, function to log, defaults to None
    :param time_taken: bool, log the time it took to run a function, defaults to True
    :param shape: bool, log the shape of the output result, defaults to True
    :param shape_delta: bool, log the difference in shape of input and output, defaults to False
    :param names: bool, log the names of the columns of the result, defaults to False
    :param dtypes: bool, log the dtypes of the results, defaults to False
    :param print_fn: callable, print function (e.g. print or logger.info), defaults to print
    :param display_args: bool, whether or not to print the arguments given to the function, defaults to True
    :param log_error: bool, whether to add the Exception message to the log if the function fails, defaults to True.
    :returns: the result of the function

    :Example:
    >>> @log_step
    ... def remove_outliers(df, min_obs=5):
    ...     pass

    >>> @log_step(print_fn=logging.info, shape_delta=True)
    ... def remove_outliers(df, min_obs=5):
    ...     pass
    """
    # Used as ``@log_step(...)``: bind the options and wait for the function.
    if func is None:
        return partial(
            log_step,
            time_taken=time_taken,
            shape=shape,
            shape_delta=shape_delta,
            names=names,
            dtypes=dtypes,
            print_fn=print_fn,
            display_args=display_args,
            log_error=log_error,
        )

    # ``dtypes`` output already includes the column names, so suppress ``names``.
    names = False if dtypes else names

    @wraps(func)
    def wrapper(*args, **kwargs):
        if shape_delta:
            # NOTE: assumes the dataframe is the first positional argument.
            old_shape = args[0].shape
        tic = dt.datetime.now()
        optional_strings = []
        try:
            result = func(*args, **kwargs)
            optional_strings = [
                f"time={dt.datetime.now() - tic}" if time_taken else None,
                f"n_obs={result.shape[0]}, n_col={result.shape[1]}" if shape else None,
                _get_shape_delta(old_shape, result.shape) if shape_delta else None,
                f"names={result.columns.to_list()}" if names else None,
                f"dtypes={result.dtypes.to_dict()}" if dtypes else None,
            ]
            return result
        except Exception as exc:
            optional_strings = [
                f"time={dt.datetime.now() - tic}" if time_taken else None,
                "FAILED" + (f" with error: {exc}" if log_error else ""),
            ]
            raise
        finally:
            # Emit the log line on both the success and the failure path.
            combined = " ".join([s for s in optional_strings if s])
            if display_args:
                func_args = inspect.signature(func).bind(*args, **kwargs).arguments
                # Skip the first bound argument (the dataframe itself).
                func_args_str = "".join(
                    ", {} = {!r}".format(*item) for item in list(func_args.items())[1:]
                )
                print_fn(f"[{func.__name__}(df{func_args_str})] " + combined)
            else:
                print_fn(f"[{func.__name__}]" + combined)

    return wrapper
def add_lags(X, cols, lags, drop_na=True):
    """
    Appends lag column(s).

    :param X: array-like, shape=(n_samples, n_columns,) training data.
    :param cols: column name(s) or index (indices).
    :param lags: the amount of lag for each col.
    :param drop_na: remove rows that contain NA values.
    :returns: ``pd.DataFrame | np.ndarray`` with only the selected cols.

    :Example:
    >>> import pandas as pd
    >>> df = pd.DataFrame([[1, 2, 3],
    ...                    [4, 5, 6],
    ...                    [7, 8, 9]],
    ...                   columns=['a', 'b', 'c'],
    ...                   index=[1, 2, 3])

    >>> add_lags(df, 'a', [1]) # doctest: +NORMALIZE_WHITESPACE
       a  b  c   a1
    1  1  2  3  4.0
    2  4  5  6  7.0

    >>> add_lags(df, ['a', 'b'], 2) # doctest: +NORMALIZE_WHITESPACE
       a  b  c   a2   b2
    1  1  2  3  7.0  8.0

    >>> import numpy as np
    >>> X = np.array([[1, 2, 3],
    ...               [4, 5, 6],
    ...               [7, 8, 9]])

    >>> add_lags(X, 0, [1])
    array([[1, 2, 3, 4],
           [4, 5, 6, 7]])

    >>> add_lags(X, 1, [-1, 1])
    array([[4, 5, 6, 2, 8]])
    """
    # A single lag will be put in a list
    lags = as_list(lags)

    # Now we can iterate over the list to determine
    # whether it is a list of integers
    if not all(isinstance(x, int) for x in lags):
        raise ValueError("lags must be a list of type: " + str(int))

    # The keys of the allowed_inputs dict contain the allowed
    # types, and the values contain the associated handlers
    allowed_inputs = {
        pd.core.frame.DataFrame: _add_lagged_pandas_columns,
        np.ndarray: _add_lagged_numpy_columns,
    }

    # Choose the correct handler based on the input class
    for allowed_input, handler in allowed_inputs.items():
        if isinstance(X, allowed_input):
            return handler(X, cols, lags, drop_na)

    # Otherwise, raise a ValueError
    allowed_input_names = list(allowed_inputs.keys())
    raise ValueError("X type should be one of:", allowed_input_names)
def _add_lagged_numpy_columns(X, cols, lags, drop_na):
    """
    Append a lag columns.

    :param X: the input ``np.ndarray``.
    :param cols: column index / indices.
    :param lags: the amount of lag for each col.
    :param drop_na: remove rows that contain NA values.
    :returns: ``np.ndarray`` with the concatenated lagged cols.
    """
    cols = as_list(cols)

    if not all(isinstance(col, int) for col in cols):
        raise ValueError("Matrix columns are indexed by integers")

    if not all(col < X.shape[1] for col in cols):
        raise KeyError("The column does not exist")

    # Lazy generator: evaluated inside ``column_stack`` below, after X has
    # been cast to float, so ``cval=np.nan`` pads correctly.
    combos = (shift(X[:, col], -lag, cval=np.nan) for col in cols for lag in lags)

    # In integer-based ndarrays, NaN values are represented as
    # -9223372036854775808, so we convert back and forth from
    # original to float and back to original dtype
    original_type = X.dtype
    X = np.asarray(X, dtype=float)
    answer = np.column_stack((X, *combos))

    # Remove rows that contain NA values when drop_na is truthy
    if drop_na:
        answer = answer[~np.isnan(answer).any(axis=1)]

    # Change dtype back to its original
    answer = np.asarray(answer, dtype=original_type)

    return answer
def _add_lagged_pandas_columns(df, cols, lags, drop_na):
    """
    Append lagged copies of the requested columns to ``df``.

    :param df: the input ``pd.DataFrame``.
    :param cols: column name(s).
    :param drop_na: remove rows that contain NA values.
    :returns: ``pd.DataFrame`` with the concatenated lagged cols.
    """
    cols = as_list(cols)

    # Lookups are by label only: positional indices are not supported since
    # pandas column names may themselves be integers, which would be ambiguous.
    if any(col not in df.columns.values for col in cols):
        raise KeyError("The column does not exist")

    lagged_series = [
        df[col].shift(-lag).rename(col + str(lag))
        for col in cols
        for lag in lags
    ]
    result = pd.concat([df, *lagged_series], axis=1)

    # Remove rows that contain NA values when drop_na is truthy
    return result.dropna() if drop_na else result