Pipeline
Pipelines, variants of the sklearn.pipeline.Pipeline object.
- class sklego.pipeline.DebugPipeline(steps, memory=None, verbose=False, *, log_callback=None)[source]
Bases:
sklearn.pipeline.Pipeline
A pipeline that has a log statement in between each step, useful for debugging.
- Variables
log_callback – optional. The callback function that logs information in between each intermediate step. Defaults to None. If set to ‘default’, default_log_callback() is used. See default_log_callback() for an example.
See sklearn.pipeline.Pipeline for all other variables.
Note
This implementation is a hack on the original sklearn Pipeline. It aims to have the same behaviour as the original sklearn Pipeline while changing a minimal amount of code.
The log statement is added by overwriting the cache method of the memory, such that the function called in the cache is wrapped with a function that calls the log callback function (log_callback).
- This hack will break when:
The sklearn Pipeline initialization function is changed.
The memory is used differently in the fit.
The joblib.memory.Memory changes the behaviour of the cache() method.
The joblib.memory.Memory starts using a _cache() method.
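The wrapping idea described in the note can be sketched in plain Python. This is a minimal illustration, not the DebugPipeline source: `TinyMemory`, `with_log_callback` and `patch_cache` are hypothetical names, and the stand-in memory caches nothing. Only the keyword names passed to the callback are taken from this documentation.

```python
import time
from functools import wraps


def with_log_callback(func, log_callback):
    """Wrap `func` so `log_callback` runs after every call (hypothetical helper)."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        output = func(*args, **kwargs)
        execution_time = time.perf_counter() - start
        # Keyword names mirror the callback inputs listed in this documentation.
        log_callback(
            func=func,
            input_args=args,
            input_kwargs=kwargs,
            output=output,
            execution_time=execution_time,
        )
        return output

    return wrapper


class TinyMemory:
    """Hypothetical stand-in for a memory object; it performs no caching."""

    def cache(self, func):
        return func


def patch_cache(memory, log_callback):
    """Overwrite `memory.cache` so every function passed to it gets wrapped."""
    original_cache = memory.cache

    def logging_cache(func):
        return original_cache(with_log_callback(func, log_callback))

    memory.cache = logging_cache
    return memory


# Usage: collect the callback inputs in a list instead of logging them.
records = []
memory = patch_cache(TinyMemory(), lambda **kwargs: records.append(kwargs))
double = memory.cache(lambda x: x * 2)
result = double(21)
```

Because the wrapper is attached through `cache`, any change in how the pipeline or the memory object uses that method would bypass it, which is the fragility the list above describes.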
- Example
>>> # Set-up for example
>>> import logging
>>> import sys
>>>
>>> import numpy as np
>>> import pandas as pd
>>>
>>> from sklearn.base import BaseEstimator, TransformerMixin
>>> from sklego.pipeline import DebugPipeline
>>>
>>> logging.basicConfig(
...     format=('[%(funcName)s:%(lineno)d] - %(message)s'),
...     level=logging.INFO,
...     stream=sys.stdout,
... )
>>>
>>> # DebugPipeline set-up
>>> n_samples, n_features = 3, 5
>>> X = np.zeros((n_samples, n_features))
>>> y = np.arange(n_samples)
>>>
>>> class Adder(TransformerMixin, BaseEstimator):
...     def __init__(self, value):
...         self._value = value
...
...     def fit(self, X, y=None):
...         return self
...
...     def transform(self, X):
...         return X + self._value
...
...     def __repr__(self):
...         return f'Adder(value={self._value})'
>>>
>>> steps = [
...     ('add_1', Adder(value=1)),
...     ('add_10', Adder(value=10)),
...     ('add_100', Adder(value=100)),
...     ('add_1000', Adder(value=1000)),
... ]
>>>
>>> # The DebugPipeline behaves the same as the sklearn Pipeline.
>>> pipe = DebugPipeline(steps)
>>>
>>> _ = pipe.fit(X, y=y)
>>> print(pipe.transform(X))
[[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
>>>
>>> # But it has the option to set a `log_callback`, which logs in between
>>> # each step.
>>> pipe = DebugPipeline(steps, log_callback='default')
>>>
>>> _ = pipe.fit(X, y=y)
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
>>> print(pipe.transform(X))
[[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
>>>
>>> # It is possible to set the `log_callback` later than initialisation.
>>> pipe = DebugPipeline(steps)
>>> pipe.log_callback = 'default'
>>>
>>> _ = pipe.fit(X, y=y)
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
>>> print(pipe.transform(X))
[[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
>>>
>>> # It is possible to define your own `log_callback`
>>> def log_callback(output, execution_time, **kwargs):
...     """My custom `log_callback` function
...
...     Parameters
...     ----------
...     output : tuple(
...         numpy.ndarray or pandas.DataFrame
...         :class:estimator or :class:transformer
...     )
...         The output of the step and a step in the pipeline.
...     execution_time : float
...         The execution time of the step.
...
...     Note
...     ----------
...     The **kwargs are for arguments that are not used in this callback.
...     """
...     logger = logging.getLogger(__name__)
...     step_result, step = output
...     logger.info(
...         f'[{step}] shape={step_result.shape} '
...         f'nbytes={step_result.nbytes} time={int(execution_time)}s')
>>>
>>> pipe.log_callback = log_callback
>>>
>>> _ = pipe.fit(X, y=y)
[log_callback:20] - [Adder(value=1)] shape=(3, 5) nbytes=120 time=0s
[log_callback:20] - [Adder(value=10)] shape=(3, 5) nbytes=120 time=0s
[log_callback:20] - [Adder(value=100)] shape=(3, 5) nbytes=120 time=0s
>>> print(pipe.transform(X))
[[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
>>>
>>> # Remove the `log_callback` when you want to stop logging.
>>> pipe.log_callback = None
>>>
>>> _ = pipe.fit(X, y=y)
>>> print(pipe.transform(X))
[[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
>>>
>>> # Logging also works with FeatureUnion
>>> from sklearn.pipeline import FeatureUnion
>>> pipe_w_default_log_callback = DebugPipeline(steps, log_callback='default')
>>> pipe_w_custom_log_callback = DebugPipeline(steps, log_callback=log_callback)
>>>
>>> pipe_union = DebugPipeline([
...     ('feature_union', FeatureUnion([
...         ('pipe_w_default_log_callback', pipe_w_default_log_callback),
...         ('pipe_w_custom_log_callback', pipe_w_custom_log_callback),
...     ])),
...     ('final_adder', Adder(10000))
... ], log_callback='default')
>>>
>>> _ = pipe_union.fit(X, y=y)
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
[log_callback:20] - [Adder(value=1)] shape=(3, 5) nbytes=120 time=0s
[log_callback:20] - [Adder(value=10)] shape=(3, 5) nbytes=120 time=0s
[log_callback:20] - [Adder(value=100)] shape=(3, 5) nbytes=120 time=0s
[default_log_callback:34] - [FeatureUnion(...)] shape=(3, 10) time=0s
>>> print(pipe_union.transform(X))
[[11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111.]
 [11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111.]
 [11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111.]]
- property log_callback
- property memory
- steps: List[Any]
- sklego.pipeline.default_log_callback(output, execution_time, **kwargs)[source]
The default log callback which logs the step name, shape of the output and the execution time of the step.
- Parameters
output (tuple) – (numpy.ndarray or pandas.DataFrame, estimator or transformer). The output of the step and a step in the pipeline.
execution_time (float) – The execution time of the step.
Note
- If you write your own callback function, it receives the following inputs:
- param function func
The function to be wrapped.
- param tuple input_args
The input arguments.
- param dict input_kwargs
The input key-word arguments.
- param output
The output of the function.
- param float execution_time
The execution time.
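As a rough illustration of a callback that reads the inputs listed above, the sketch below accepts them as keyword arguments and builds one log line. The names `describe_step_callback` and `FakeResult` and the logger name are hypothetical. The returned message is there only so the example can be inspected; this documentation does not say the return value is used.

```python
import logging

logger = logging.getLogger("pipeline_debug")  # hypothetical logger name


def describe_step_callback(output, execution_time, func=None, **kwargs):
    """Log the step, the shape of its result and the wrapped function's name."""
    step_result, step = output
    # Not every result has a `.shape`; fall back to None when it is missing.
    shape = getattr(step_result, "shape", None)
    func_name = getattr(func, "__name__", "unknown")
    message = f"[{step}] shape={shape} func={func_name} time={execution_time:.3f}s"
    logger.info(message)
    # Returned only for illustration and testing (an assumption, see lead-in).
    return message


class FakeResult:
    """Hypothetical stand-in for an array-like step result."""

    shape = (3, 5)


def fit_transform_stub():
    """Hypothetical stand-in for the wrapped function."""


message = describe_step_callback(
    output=(FakeResult(), "Adder(value=1)"),
    execution_time=0.0123,
    func=fit_transform_stub,
    input_args=(),
    input_kwargs={},
)
```

Accepting `**kwargs` keeps the callback working even when it ignores some of the inputs, which is the same pattern the custom callback in the DebugPipeline example uses.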
- sklego.pipeline.make_debug_pipeline(*steps, **kwargs)[source]
Construct a DebugPipeline from the given estimators.
This is a shorthand for the DebugPipeline constructor; it does not require, and does not permit, naming the estimators. Instead, their names will be set to the lowercase of their types automatically.
- Parameters
*steps (list of estimators)
memory (None, str or object with the joblib.Memory interface, optional) – Used to cache the fitted transformers of the pipeline. By default, no caching is performed. If a string is given, it is the path to the caching directory. Enabling caching triggers a clone of the transformers before fitting. Therefore, the transformer instance given to the pipeline cannot be inspected directly. Use the attribute named_steps or steps to inspect estimators within the pipeline. Caching the transformers is advantageous when fitting is time consuming.
verbose (boolean, default=False) – If True, the time elapsed while fitting each step will be printed as it is completed.
log_callback (string or callable, default=None) – The callback function that logs information in between each intermediate step. Defaults to None. If set to ‘default’, default_log_callback() is used. See default_log_callback() for an example.
- Returns
p (DebugPipeline)
- See also
sklego.pipeline.DebugPipeline – Class for creating a pipeline of transforms with a final estimator.
- Example
>>> from sklearn.naive_bayes import GaussianNB
>>> from sklearn.preprocessing import StandardScaler
>>> from sklego.pipeline import make_debug_pipeline
>>> make_debug_pipeline(StandardScaler(), GaussianNB(priors=None))
DebugPipeline(steps=[('standardscaler', StandardScaler()),
                     ('gaussiannb', GaussianNB())])
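The automatic naming rule (lowercase of each estimator's type) can be sketched as follows. This is not the library's code: `name_estimators_sketch`, `Scaler` and `Classifier` are hypothetical. The numeric suffix for repeated types is an assumption modelled on how scikit-learn's make_pipeline disambiguates duplicates, which this documentation does not state.

```python
from collections import Counter


def name_estimators_sketch(estimators):
    """Pair each estimator with a name derived from its lowercased type name."""
    base_names = [type(est).__name__.lower() for est in estimators]
    totals = Counter(base_names)
    seen = Counter()
    named = []
    for base, est in zip(base_names, estimators):
        if totals[base] > 1:
            # Assumption: repeated types get a "-1", "-2", ... suffix.
            seen[base] += 1
            name = f"{base}-{seen[base]}"
        else:
            name = base
        named.append((name, est))
    return named


class Scaler:
    """Hypothetical stand-in for a transformer."""


class Classifier:
    """Hypothetical stand-in for a final estimator."""


names = [name for name, _ in name_estimators_sketch([Scaler(), Scaler(), Classifier()])]
```

Since the names are derived rather than chosen, parameters of a step built this way would be addressed through the generated name, for example `scaler-1` in this sketch.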