Pandas pipelines

Method chaining is a great way to write pandas code, as it allows us to go from:

raw_data = pd.read_parquet(...)
data_with_types = set_dtypes(raw_data)
data_without_outliers = remove_outliers(data_with_types)

to

data = (
    pd.read_parquet(...)
    .pipe(set_dtypes)
    .pipe(remove_outliers)
)

This does come at a cost, mostly in our ability to debug long pipelines. If there’s a mistake somewhere along the way, you can only inspect the end result, not the intermediate results. One mitigation is to add decorators to your pipeline functions that log common attributes of your dataframe at each step.
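To make the idea concrete, here is a minimal hand-rolled sketch of such a decorator. The names log_shape and drop_missing and the toy dataframe are hypothetical and only serve as illustration; this is not how sklego implements its decorator, just one way the pattern could look.

```python
import functools
import time

import pandas as pd


def log_shape(func):
    """Hypothetical decorator: print the runtime and output shape of a pipeline step."""

    @functools.wraps(func)
    def wrapper(df, *args, **kwargs):
        start = time.perf_counter()
        result = func(df, *args, **kwargs)
        elapsed = time.perf_counter() - start
        n_obs, n_col = result.shape
        print(f"[{func.__name__}] time={elapsed:.6f}s n_obs={n_obs}, n_col={n_col}")
        return result

    return wrapper


@log_shape
def drop_missing(df):
    # Toy pipeline step: drop every row that contains a missing value
    return df.dropna()


# Toy data, not the chickweight dataset
toy = pd.DataFrame({"weight": [42.0, None, 59.0], "chick": [1, 1, 2]})
cleaned = toy.pipe(drop_missing)
```

Because the wrapper returns the result unchanged, the decorated function can still be used in a .pipe chain exactly like the undecorated one.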

Logging in method chaining

In order to use the logging capabilities, we first need to ensure we have a proper logger configured. We do this by running logging.basicConfig(level=logging.DEBUG).

[1]:
from sklego.datasets import load_chicken
from sklego.pandas_utils import log_step
chickweight = load_chicken(as_frame=True)
[2]:
import logging

logging.basicConfig(level=logging.DEBUG)

If we now add the log_step decorator to our pipeline function and execute it, we get some logging statements for free.

[10]:
@log_step
def set_dtypes(chickweight):
    return chickweight.assign(
        diet=lambda d: d['diet'].astype('category'),
        chick=lambda d: d['chick'].astype('category'),
    )
[11]:
chickweight.pipe(set_dtypes).head()
[set_dtypes(df)] time=0:00:00.001935 n_obs=578, n_col=4
[11]:
   weight  time  chick  diet
0      42     0      1     1
1      51     2      1     1
2      59     4      1     1
3      64     6      1     1
4      76     8      1     1

We can choose to log at different log levels by changing the print_fn argument of the log_step decorator. For example, if we have a remove_outliers function that calls different outlier removal functions for different types of outliers, we might in general only be interested in the total number of outliers removed. To get that, we log the specific implementations with logging.debug and the overall function with logging.info:

[12]:
@log_step(print_fn=logging.debug)
def remove_dead_chickens(chickweight):
    # Chicks with fewer than 12 measurements; the chick ids are in the index
    dead_chickens = chickweight.groupby('chick').size().loc[lambda s: s < 12]
    return chickweight.loc[lambda d: ~d['chick'].isin(dead_chickens.index)]


@log_step(print_fn=logging.info)
def remove_outliers(chickweight):
    return chickweight.pipe(remove_dead_chickens)
[13]:
chickweight.pipe(set_dtypes).pipe(remove_outliers).head()
DEBUG:root:[remove_dead_chickens(df)] time=0:00:00.002242 n_obs=540, n_col=4
INFO:root:[remove_outliers(df)] time=0:00:00.003335 n_obs=540, n_col=4
[set_dtypes(df)] time=0:00:00.001687 n_obs=578, n_col=4
[13]:
   weight  time  chick  diet
0      42     0      1     1
1      51     2      1     1
2      59     4      1     1
3      64     6      1     1
4      76     8      1     1
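With both levels in place, the threshold of the logger decides which statements are shown. The sketch below illustrates this with the standard library only; the logger name pipeline_demo and the ListHandler class are hypothetical helpers used to capture messages for inspection. In the setup of this page, the equivalent would presumably be to configure the root logger with logging.INFO instead of logging.DEBUG.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that collects messages in a list for inspection."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(f"{record.levelname}:{record.getMessage()}")


logger = logging.getLogger("pipeline_demo")  # hypothetical logger name
logger.propagate = False  # keep the demo independent of the root logger
handler = ListHandler()
logger.addHandler(handler)

# At DEBUG level both the inner and the outer step are reported
logger.setLevel(logging.DEBUG)
logger.debug("inner step")
logger.info("outer step")
seen_at_debug = list(handler.messages)

# At INFO level only the outer step is reported
handler.messages.clear()
logger.setLevel(logging.INFO)
logger.debug("inner step")
logger.info("outer step")
seen_at_info = list(handler.messages)
```

Note that logging.basicConfig does nothing if the root logger already has handlers, unless force=True is passed (Python 3.8 and later), so in a running notebook the level may need to be changed on the logger itself.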

The log_step decorator has some settings that let you tweak what exactly to log:

  • time_taken: log the time it took to execute the function (default True)

  • shape: log the output shape of the function (default True)

  • shape_delta: log the difference in shape between input and output (default False)

  • names: log the column names of the output (default False)

  • dtypes: log the dtypes of the columns of the output (default False)

For example, if we don’t care how long a function takes, but do want to see how many rows are removed when we remove dead chickens:

[14]:
@log_step(time_taken=False, shape=False, shape_delta=True)
def remove_dead_chickens(chickweight):
    # Chicks with fewer than 12 measurements; the chick ids are in the index
    dead_chickens = chickweight.groupby('chick').size().loc[lambda s: s < 12]
    return chickweight.loc[lambda d: ~d['chick'].isin(dead_chickens.index)]

chickweight.pipe(remove_dead_chickens).head()
[remove_dead_chickens(df)] delta=(-38, 0)
[14]:
   weight  time  chick  diet
0      42     0      1     1
1      51     2      1     1
2      59     4      1     1
3      64     6      1     1
4      76     8      1     1
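The logged delta reads as (change in rows, change in columns), so a negative first entry means rows were removed. As a rough sketch of that arithmetic, assuming the delta is simply the output shape minus the input shape, the hypothetical helper shape_delta below computes it on a toy dataframe:

```python
import pandas as pd


def shape_delta(before, after):
    """Hypothetical helper: (row change, column change) from `before` to `after`."""
    return (
        after.shape[0] - before.shape[0],
        after.shape[1] - before.shape[1],
    )


# Toy data, not the chickweight dataset
toy = pd.DataFrame({"chick": [1, 1, 2, 3], "weight": [42, 51, 40, 39]})

# Dropping chick 1 removes two rows and leaves the columns untouched
filtered = toy.loc[toy["chick"] != 1]
delta = shape_delta(toy, filtered)
```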

We can also define custom logging functions by using log_step_extra. This takes one or more functions that accept the output dataframe and return something that can be converted to a string. Any keyword arguments given to log_step_extra are passed on to these functions. For example, if we want to log an arbitrary message and the number of unique chicks in our dataset, we can do:

[15]:
from sklego.pandas_utils import log_step_extra

def count_unique_chicks(df, **kwargs):
    return "nchicks=" + str(df["chick"].nunique())

def display_message(df, msg):
    return msg


@log_step_extra(count_unique_chicks)
def start_pipe(df):
    """Get initial chick count"""
    return df


@log_step_extra(count_unique_chicks, display_message, msg="without diet 1")
def remove_diet_1_chicks(df):
    return df.loc[df["diet"] != 1]

(chickweight
 .pipe(start_pipe)
 .pipe(remove_diet_1_chicks)
 .head())
[start_pipe(df)] nchicks=50
[remove_diet_1_chicks(df)] nchicks=30 without diet 1
[15]:
     weight  time  chick  diet
220      40     0     21     2
221      50     2     21     2
222      62     4     21     2
223      86     6     21     2
224     125     8     21     2
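Any function with the same shape as count_unique_chicks should work here: it receives the dataframe, tolerates extra keyword arguments, and returns something printable. As a sketch, the hypothetical count_missing function below reports the number of missing cells; it is tried on a toy dataframe rather than on chickweight.

```python
import pandas as pd


def count_missing(df, **kwargs):
    """Hypothetical log function: report the number of missing cells in df."""
    # **kwargs absorbs keyword arguments meant for other log functions, such as msg
    return "n_missing=" + str(int(df.isna().sum().sum()))


# Toy data with one missing value in each column
toy = pd.DataFrame({"weight": [42.0, None, 59.0], "diet": [1, 2, None]})
message = count_missing(toy)
```

It could then be passed to the decorator in the same way as above, for example @log_step_extra(count_unique_chicks, count_missing).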