Debug pipeline
This document demonstrates how you might use a DebugPipeline. It behaves like a normal scikit-learn Pipeline, but it can also log what happens in between the steps.
We’ll first set up libraries and config.
import logging
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import FeatureUnion
from sklego.pipeline import DebugPipeline
logging.basicConfig(
    format=('[%(funcName)s:%(lineno)d] - %(message)s'),
    level=logging.INFO
)
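The format string above prefixes each message with the name of the function that emitted it and the line number of the call. Here is a minimal sketch of that behaviour using only the standard library. The logger name `debug_pipeline_format_demo` and the function `report_shape` are made up for this illustration and are not part of sklego.

```python
import io
import logging

# Capture the formatted output in memory so it can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter('[%(funcName)s:%(lineno)d] - %(message)s')
)

# Hypothetical logger, kept separate from the root logger configured above.
demo_logger = logging.getLogger('debug_pipeline_format_demo')
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False


def report_shape():
    # funcName in the log line will be 'report_shape'.
    demo_logger.info('shape=(3, 5)')


report_shape()
formatted = stream.getvalue().strip()
print(formatted)
```

The printed line starts with `[report_shape:` followed by the line number of the `info` call, which is the same layout as the pipeline log lines in this document.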
Next up, let’s make a simple transformer.
# DebugPipeline set-up
n_samples, n_features = 3, 5
X = np.zeros((n_samples, n_features))
y = np.arange(n_samples)
class Adder(TransformerMixin, BaseEstimator):
    def __init__(self, value):
        self._value = value
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        return X + self._value
    def __repr__(self):
        return f'Adder(value={self._value})'
steps = [
    ('add_1', Adder(value=1)),
    ('add_10', Adder(value=10)),
    ('add_100', Adder(value=100)),
    ('add_1000', Adder(value=1000)),
]
This pipeline behaves exactly the same as a normal pipeline. So let’s use it.
pipe = DebugPipeline(steps)
pipe.fit(X, y=y)
X_out = pipe.transform(X)
print('Transformed X:\n', X_out)
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
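To see where the value 1111 comes from, here is a small sketch that applies the same four additions by hand, using plain numpy and no pipeline at all. It only mirrors the arithmetic of the `Adder` steps.

```python
import numpy as np

X = np.zeros((3, 5))

# Apply the four additions in the same order as the pipeline steps.
result = X
for value in (1, 10, 100, 1000):
    result = result + value

print(result)
```

Every entry starts at 0 and ends at 0 + 1 + 10 + 100 + 1000 = 1111, which matches the pipeline output.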
Log statements
It is possible to set a log_callback that logs in between the steps.
Note: there are three log statements while there are four steps, because there are three moments in between the steps. The output of the last step is returned by the pipeline, so it can be checked outside of the pipeline.
pipe = DebugPipeline(steps, log_callback='default')
pipe.fit(X, y=y)
X_out = pipe.transform(X)
print('Transformed X:\n', X_out)
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
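The "three log lines for four steps" behaviour can be illustrated with a toy loop. This is a simplified sketch and not sklego's implementation: `run_with_callback` is a hypothetical helper, and the steps are plain functions on an integer instead of transformers.

```python
def run_with_callback(steps, data, callback):
    """Hypothetical helper: run each step and call `callback` between steps."""
    last = len(steps) - 1
    for index, (name, func) in enumerate(steps):
        data = func(data)
        # Only log intermediate results; the final result is returned instead.
        if index < last:
            callback(name, data)
    return data


logged = []
toy_steps = [
    ('add_1', lambda v: v + 1),
    ('add_10', lambda v: v + 10),
    ('add_100', lambda v: v + 100),
    ('add_1000', lambda v: v + 1000),
]

final = run_with_callback(
    toy_steps, 0, lambda name, value: logged.append((name, value))
)
print(logged)  # [('add_1', 1), ('add_10', 11), ('add_100', 111)]
print(final)   # 1111
```

The callback fires three times, once after each of the first three steps, and the result of the fourth step is simply the return value.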
Set the log_callback function later
It is also possible to set the log_callback after initialisation.
pipe = DebugPipeline(steps)
pipe.log_callback = 'default'
pipe.fit(X, y=y)
X_out = pipe.transform(X)
print('Transformed X:\n', X_out)
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
Custom log_callback
A custom log callback function expects two arguments: the output of each step, which is a tuple containing the result of the step and the step itself, and the execution time of the step.
def log_callback(output, execution_time, **kwargs):
    '''My custom `log_callback` function
    Parameters
    ----------
    output : tuple(
            numpy.ndarray or pandas.DataFrame
            :class:estimator or :class:transformer
        )
        The output of the step and a step in the pipeline.
    execution_time : float
        The execution time of the step.
    '''
    logger = logging.getLogger(__name__)
    step_result, step = output
    logger.info(f'[{step}] shape={step_result.shape} '
                f'nbytes={step_result.nbytes} time={execution_time}')
pipe.log_callback = log_callback
pipe.fit(X, y=y)
X_out = pipe.transform(X)
print('Transformed X:\n', X_out)
[log_callback:16] - [Adder(value=1)] shape=(3, 5) nbytes=120 time=4.935264587402344e-05
[log_callback:16] - [Adder(value=10)] shape=(3, 5) nbytes=120 time=3.0040740966796875e-05
[log_callback:16] - [Adder(value=100)] shape=(3, 5) nbytes=120 time=2.5510787963867188e-05
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
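A callback does not have to write to a logger. Below is a sketch of a variation that follows the same `(output, execution_time, **kwargs)` signature but collects the information in a list for later inspection. The names `records`, `collecting_log_callback` and `FakeResult` are made up for this example. It reads `shape` and `nbytes` with `getattr`, on the assumption that not every step result exposes both attributes.

```python
records = []  # hypothetical in-memory store, one dict per logged step


def collecting_log_callback(output, execution_time, **kwargs):
    """Store step information in `records` instead of logging it."""
    step_result, step = output
    records.append({
        'step': str(step),
        # Fall back to None when the result lacks the attribute.
        'shape': getattr(step_result, 'shape', None),
        'nbytes': getattr(step_result, 'nbytes', None),
        'seconds': execution_time,
    })


class FakeResult:
    """Stand-in for a step result, so the callback can be tried in isolation."""
    shape = (3, 5)
    nbytes = 120


# Call the callback directly with the same kind of arguments described above.
collecting_log_callback((FakeResult(), 'Adder(value=1)'), 0.5)
print(records)
```

Assigning `pipe.log_callback = collecting_log_callback` should then fill `records` during `fit`, assuming the callback contract described above.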
Feature union
Feature union also works with the debug pipeline.
pipe_w_default_log_callback = DebugPipeline(steps, log_callback='default')
pipe_w_custom_log_callback = DebugPipeline(steps, log_callback=log_callback)
pipe_union = FeatureUnion([
    ('pipe_w_default_log_callback', pipe_w_default_log_callback),
    ('pipe_w_custom_log_callback', pipe_w_custom_log_callback),
])
pipe_union.fit(X, y=y)
X_out = pipe_union.transform(X)
print('Transformed X:\n', X_out)
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
[log_callback:16] - [Adder(value=1)] shape=(3, 5) nbytes=120 time=2.1219253540039062e-05
[log_callback:16] - [Adder(value=10)] shape=(3, 5) nbytes=120 time=7.05718994140625e-05
[log_callback:16] - [Adder(value=100)] shape=(3, 5) nbytes=120 time=2.8371810913085938e-05
Transformed X:
 [[1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111.]]
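The output now has ten columns instead of five, because a FeatureUnion places the outputs of its transformers side by side. A small numpy sketch of that concatenation, with two stand-in arrays in place of the two pipeline outputs:

```python
import numpy as np

# Stand-ins for the outputs of the two pipelines inside the union.
left = np.full((3, 5), 1111.0)
right = np.full((3, 5), 1111.0)

# Concatenate column-wise, as the union does for dense outputs.
combined = np.hstack([left, right])
print(combined.shape)  # (3, 10)
```

Each pipeline still logs its own three intermediate steps, which is why six log lines appear above.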
Enough logging
Set the log_callback to None when logging is not needed anymore.
pipe.log_callback = None
pipe.fit(X, y=y)
X_out = pipe.transform(X)
print('Transformed X:\n', X_out)
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
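If you only want logging for a single run, switching the attribute on and off by hand can be wrapped in a context manager. This is a sketch and not something sklego provides: `temporary_log_callback` is a hypothetical helper that relies only on `log_callback` being a plain settable attribute, as shown above. A `SimpleNamespace` stands in for the pipeline so the snippet runs on its own.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def temporary_log_callback(pipe, callback='default'):
    """Hypothetical helper: set `log_callback` and restore it afterwards."""
    previous = pipe.log_callback
    pipe.log_callback = callback
    try:
        yield pipe
    finally:
        # Restore the old value, even if an error occurred inside the block.
        pipe.log_callback = previous


# Stand-in object with a `log_callback` attribute, in place of a real pipeline.
stand_in = SimpleNamespace(log_callback=None)

with temporary_log_callback(stand_in) as p:
    inside = p.log_callback  # 'default' while the block is active

after = stand_in.log_callback  # back to None
print(inside, after)
```

With a real pipeline you would call `pipe.fit(X, y=y)` inside the `with` block.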