Debug pipeline

This document demonstrates how to use a DebugPipeline. It works like a regular scikit-learn Pipeline, with the added option of logging what happens between steps.

We’ll first set up libraries and config.

[2]:
import logging

import numpy as np
import pandas as pd

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import FeatureUnion

from sklego.pipeline import DebugPipeline

logging.basicConfig(
    format=('[%(funcName)s:%(lineno)d] - %(message)s'),
    level=logging.INFO
)
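
The format string above is what produces the `[function:line]` prefix on the log lines later in this document. As a small standalone sketch, the snippet below applies the same format to a separate logger and captures the output in memory. The logger name `format_demo` and the function `my_step` are made up for illustration.

```python
import io
import logging

# Capture log output in memory instead of writing to the console.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter('[%(funcName)s:%(lineno)d] - %(message)s')
)

# 'format_demo' is a hypothetical logger name used only for this sketch.
logger = logging.getLogger('format_demo')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep the demo output away from the root logger


def my_step():
    # funcName resolves to 'my_step', lineno to the line of this call.
    logger.info('hello')


my_step()
line = stream.getvalue().strip()
print(line)
```

The printed line starts with `[my_step:`, followed by the line number of the `logger.info` call, and ends with `] - hello`.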

Next, let’s create some data, a simple transformer, and the pipeline steps.

[3]:
# DebugPipeline set-up
n_samples, n_features = 3, 5
X = np.zeros((n_samples, n_features))
y = np.arange(n_samples)


class Adder(TransformerMixin, BaseEstimator):
    def __init__(self, value):
        # Store the parameter under its own name so that scikit-learn's
        # get_params() and clone() can find it.
        self.value = value

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X + self.value

    def __repr__(self):
        return f'Adder(value={self.value})'


steps = [
    ('add_1', Adder(value=1)),
    ('add_10', Adder(value=10)),
    ('add_100', Adder(value=100)),
    ('add_1000', Adder(value=1000)),
]

Without a log_callback, this pipeline behaves exactly like a normal Pipeline, so let’s use it.

[4]:
pipe = DebugPipeline(steps)

pipe.fit(X, y=y)
X_out = pipe.transform(X)

print('Transformed X:\n', X_out)
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]

Log statements

You can set a log_callback that is called in between the steps and logs information about each one.

Note: there are three log statements while there are four steps, because there are three moments in between the steps. The output can be checked outside of the pipeline.

[5]:
pipe = DebugPipeline(steps, log_callback='default')

pipe.fit(X, y=y)
X_out = pipe.transform(X)

print('Transformed X:\n', X_out)
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
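
To see why four steps yield three log lines, here is a toy sketch in plain Python. It is not sklego's implementation: `run_with_logging` and its arguments are hypothetical names, and the sketch assumes only that the callback fires after every step except the final one, which matches the output above.

```python
import time


def run_with_logging(steps, x, log_callback=None):
    """Toy stand-in for a pipeline run (hypothetical helper, not sklego code).

    Every step except the last is applied and reported to `log_callback`;
    the last step is applied without a log call.
    """
    *intermediate, (final_name, final_func) = steps
    for name, func in intermediate:
        start = time.time()
        x = func(x)
        if log_callback is not None:
            # Pass a (result, step) tuple plus the elapsed time.
            log_callback((x, name), time.time() - start)
    return final_func(x)


logged = []
toy_steps = [
    ('add_1', lambda v: v + 1),
    ('add_10', lambda v: v + 10),
    ('add_100', lambda v: v + 100),
    ('add_1000', lambda v: v + 1000),
]
result = run_with_logging(
    toy_steps,
    0,
    log_callback=lambda output, execution_time: logged.append(output[1]),
)
print(result, logged)
```

Here `logged` ends up with three names (`add_1`, `add_10`, `add_100`) while `result` still includes the contribution of the fourth step.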

Set the log_callback function later

It is possible to set the log_callback later than at initialisation.

[6]:
pipe = DebugPipeline(steps)
pipe.log_callback = 'default'

pipe.fit(X, y=y)
X_out = pipe.transform(X)

print('Transformed X:\n', X_out)
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]

Custom log_callback

The custom log callback function expects two arguments: the output of each step, which is a tuple containing the output of the step and the step itself, and the execution time of the step.

[7]:
def log_callback(output, execution_time, **kwargs):
    '''My custom `log_callback` function

    Parameters
    ----------
    output : tuple(
            numpy.ndarray or pandas.DataFrame
            :class:estimator or :class:transformer
        )
        The output of the step and a step in the pipeline.
    execution_time : float
        The execution time of the step.
    '''
    logger = logging.getLogger(__name__)
    step_result, step = output
    logger.info(f'[{step}] shape={step_result.shape} '
                f'nbytes={step_result.nbytes} time={execution_time}')


pipe.log_callback = log_callback

pipe.fit(X, y=y)
X_out = pipe.transform(X)

print('Transformed X:\n', X_out)
[log_callback:16] - [Adder(value=1)] shape=(3, 5) nbytes=120 time=4.935264587402344e-05
[log_callback:16] - [Adder(value=10)] shape=(3, 5) nbytes=120 time=3.0040740966796875e-05
[log_callback:16] - [Adder(value=100)] shape=(3, 5) nbytes=120 time=2.5510787963867188e-05
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
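
A callback does not have to write to a logger. As a sketch, the hypothetical `RecordingCallback` class below keeps one record per call so the steps can be inspected afterwards. It assumes only the call signature shown above, `(output, execution_time, **kwargs)`. The stand-in result object in the demo replaces a real array so that the snippet runs without a pipeline.

```python
from types import SimpleNamespace


class RecordingCallback:
    """Collect one record per logged step (hypothetical helper)."""

    def __init__(self):
        self.records = []

    def __call__(self, output, execution_time, **kwargs):
        step_result, step = output
        self.records.append({
            'step': str(step),
            # Fall back to None for results without a `shape` attribute.
            'shape': getattr(step_result, 'shape', None),
            'time': execution_time,
        })


recorder = RecordingCallback()

# Called by hand here with a stand-in result that only mimics `.shape`.
fake_result = SimpleNamespace(shape=(3, 5))
recorder((fake_result, 'Adder(value=1)'), 0.001)

print(recorder.records)
```

With a pipeline, the instance would be assigned in the same way as the function above, for example `pipe.log_callback = recorder`, on the assumption that any callable with this signature is accepted.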

Feature union

A DebugPipeline also works inside a FeatureUnion. Each pipeline logs with its own callback, and the outputs are concatenated column-wise.

[8]:
pipe_w_default_log_callback = DebugPipeline(steps, log_callback='default')
pipe_w_custom_log_callback = DebugPipeline(steps, log_callback=log_callback)

pipe_union = FeatureUnion([
    ('pipe_w_default_log_callback', pipe_w_default_log_callback),
    ('pipe_w_custom_log_callback', pipe_w_custom_log_callback),
])

pipe_union.fit(X, y=y)
X_out = pipe_union.transform(X)

print('Transformed X:\n', X_out)
[default_log_callback:34] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:34] - [Adder(value=100)] shape=(3, 5) time=0s
[log_callback:16] - [Adder(value=1)] shape=(3, 5) nbytes=120 time=2.1219253540039062e-05
[log_callback:16] - [Adder(value=10)] shape=(3, 5) nbytes=120 time=7.05718994140625e-05
[log_callback:16] - [Adder(value=100)] shape=(3, 5) nbytes=120 time=2.8371810913085938e-05
Transformed X:
 [[1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111.]]

Enough logging

Set log_callback to None when logging is no longer needed.

[9]:
pipe.log_callback = None

pipe.fit(X, y=y)
X_out = pipe.transform(X)

print('Transformed X:\n', X_out)
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
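
If logging is only needed around a single call, the assign-and-reset pattern can be wrapped in a context manager. This is a sketch: `temporary_log_callback` is a hypothetical helper, not part of sklego, and it assumes only that `log_callback` is an attribute that can be reassigned, as done above. The demo uses a stand-in object instead of a real pipeline.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def temporary_log_callback(pipe, log_callback='default'):
    """Set `pipe.log_callback` inside the block and restore it afterwards."""
    previous = pipe.log_callback
    pipe.log_callback = log_callback
    try:
        yield pipe
    finally:
        # Restore the old value even if the block raises.
        pipe.log_callback = previous


# Stand-in for a DebugPipeline; only the attribute matters for this demo.
fake_pipe = SimpleNamespace(log_callback=None)

with temporary_log_callback(fake_pipe) as p:
    inside = p.log_callback

after = fake_pipe.log_callback
print(inside, after)
```

Inside the `with` block the callback is `'default'`; once the block exits, the attribute is back to `None`.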