Pipeline

Pipelines, variances to the sklearn.pipeline.Pipeline object.

class sklego.pipeline.DebugPipeline(steps, memory=None, verbose=False, *, log_callback=None)[source]

Bases: sklearn.pipeline.Pipeline

A pipeline that has a log statement in between each step, useful for debugging.

Variables

log_callback

optional. The callback function that logs information in between each intermediate step. Defaults to None. If set to ‘default’, default_log_callback() is used.

See default_log_callback() for an example.

See sklearn.pipeline.Pipeline for all other variables.

Note

This implementation is a hack on the original sklearn Pipeline. It aims to have the same behaviour as the original sklearn Pipeline, while changing minimal amount of code.

The log statement is added by overwriting the cache method of the memory, such that the function called in the cache is wrapped with a functions that calls the log callback function (log_callback).

This hack breaks will break when:
  • The SKlearn pipeline initialization function is changed.

  • The memory is used differently in the fit.

  • The joblib.memory.Memory changes behaviour of the cache() method..

  • The joblib.memory.Memory starts using a _cache() method.

Example

>>> # Set-up for example
>>> import logging
>>> import sys
>>>
>>> import numpy as np
>>> import pandas as pd
>>>
>>> from sklearn.base import BaseEstimator, TransformerMixin
>>> from sklego.pipeline import DebugPipeline
>>>
>>> logging.basicConfig(
...    format=('[%(funcName)s:%(lineno)d] - %(message)s'),
...    level=logging.INFO,
...    stream=sys.stdout,
... )
>>>
>>> # DebugPipeline set-up
>>> n_samples, n_features = 3, 5
>>> X = np.zeros((n_samples, n_features))
>>> y = np.arange(n_samples)
>>>
>>> class Adder(TransformerMixin, BaseEstimator):
...    def __init__(self, value):
...        self._value = value
...
...    def fit(self, X, y=None):
...        return self
...
...    def transform(self, X):
...        return X + self._value
...
...    def __repr__(self):
...        return f'Adder(value={self._value})'
>>>
>>> steps = [
...     ('add_1', Adder(value=1)),
...     ('add_10', Adder(value=10)),
...     ('add_100', Adder(value=100)),
...     ('add_1000', Adder(value=1000)),
... ]
>>>
>>> # The DebugPipeline behaves the sames as the Sklearn pipeline.
>>> pipe = DebugPipeline(steps)
>>>
>>> _ = pipe.fit(X, y=y)
>>> print(pipe.transform(X))
[[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
>>>
>>> # But it has the option to set a `log_callback`, that logs in between
>>> # each step.
>>> pipe = DebugPipeline(steps, log_callback='default')
>>>
>>> _ = pipe.fit(X, y=y)
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
>>> print(pipe.transform(X))
[[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
>>>
>>> # It is possible to set the `log_callback` later then initialisation.
>>> pipe = DebugPipeline(steps)
>>> pipe.log_callback = 'default'
>>>
>>> _ = pipe.fit(X, y=y)
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
>>> print(pipe.transform(X))
[[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
>>>
>>> # It is possible to define your own `log_callback`
>>> def log_callback(output, execution_time, **kwargs):
...     """My custom `log_callback` function
...
...     Parameters
...     ----------
...     output : tuple(
...             numpy.ndarray or pandas.DataFrame
...             :class:estimator or :class:transformer
...         )
...         The output of the step and a step in the pipeline.
...     execution_time : float
...         The execution time of the step.  ...
...     Note
...     ----------
...     The **kwargs are for arguments that are not used in this callback.
...     """
...     logger = logging.getLogger(__name__)
...     step_result, step = output
...     logger.info(
...         f'[{step}] shape={step_result.shape} '
...         f'nbytes={step_result.nbytes} time={int(execution_time)}s')
>>>
>>> pipe.log_callback = log_callback
>>>
>>> _ = pipe.fit(X, y=y)
[log_callback:20] - [Adder(value=1)] shape=(3, 5) nbytes=120 time=0s
[log_callback:20] - [Adder(value=10)] shape=(3, 5) nbytes=120 time=0s
[log_callback:20] - [Adder(value=100)] shape=(3, 5) nbytes=120 time=0s
>>> print(pipe.transform(X))
[[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
>>>
>>> # Remove the `log_callback` when you want to stop logging.
>>> pipe.log_callback = None
>>>
>>> _ = pipe.fit(X, y=y)
>>> print(pipe.transform(X))
[[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
>>>
>>> # Logging also works with FeatureUnion
>>> from sklearn.pipeline import FeatureUnion
>>> pipe_w_default_log_callback = DebugPipeline(steps, log_callback='default')
>>> pipe_w_custom_log_callback = DebugPipeline(steps, log_callback=log_callback)
>>>
>>> pipe_union = DebugPipeline([
...     ('feature_union', FeatureUnion([
...         ('pipe_w_default_log_callback', pipe_w_default_log_callback),
...         ('pipe_w_custom_log_callback', pipe_w_custom_log_callback),
...     ])),
...     ('final_adder', Adder(10000))
... ], log_callback='default')
>>>
>>> _ = pipe_union.fit(X, y=y)   
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
[log_callback:20] - [Adder(value=1)] shape=(3, 5) nbytes=120 time=0s
[log_callback:20] - [Adder(value=10)] shape=(3, 5) nbytes=120 time=0s
[log_callback:20] - [Adder(value=100)] shape=(3, 5) nbytes=120 time=0s
[default_log_callback:34] - [FeatureUnion(...)] shape=(3, 10) time=0s
>>> print(pipe_union.transform(X))
[[11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111.]
 [11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111.]
 [11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111. 11111.]]
property log_callback
property memory
steps: List[Any]
sklego.pipeline.default_log_callback(output, execution_time, **kwargs)[source]

The default log callback which logs the step name, shape of the output and the execution time of the step.

Parameters
  • output (tuple) – (numpy.ndarray or pandas.DataFrame, estimator or transformer) The output of the step and a step in the pipeline.

  • execution_time (float) – The execution time of the step.

Note

If you write your custom callback function the input is:
param function func

The function to be wrapped.

param tuple input_args

The input arguments.

param dict input_kwargs

The input key-word arguments.

param output

The output of the function.

param execution_time float

The execution time.

sklego.pipeline.make_debug_pipeline(*steps, **kwargs)[source]

Construct a DebugPipeline from the given estimators. This is a shorthand for the DebugPipeline constructor; it does not require, and does not permit, naming the estimators. Instead, their names will be set to the lowercase of their types automatically. Parameters ———- *steps : list of estimators. memory : None, str or object with the joblib.Memory interface, optional

Used to cache the fitted transformers of the pipeline. By default, no caching is performed. If a string is given, it is the path to the caching directory. Enabling caching triggers a clone of the transformers before fitting. Therefore, the transformer instance given to the pipeline cannot be inspected directly. Use the attribute named_steps or steps to inspect estimators within the pipeline. Caching the transformers is advantageous when fitting is time consuming.

verboseboolean, default=False

If True, the time elapsed while fitting each step will be printed as it is completed.

log_callback: string, default=None.

The callback function that logs information in between each intermediate step. Defaults to None. If set to ‘default’, default_log_callback() is used.

See default_log_callback() for an example.

sklego.pipeline.DebugPipelineClass for creating a pipeline of

transforms with a final estimator.

>>> from sklearn.naive_bayes import GaussianNB
>>> from sklearn.preprocessing import StandardScaler
>>> make_debug_pipeline(StandardScaler(), GaussianNB(priors=None))
DebugPipeline(steps=[('standardscaler', StandardScaler()),
                ('gaussiannb', GaussianNB())])
Returns
-------
p : DebugPipeline