{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Pandas pipelines\n", "\n", "[Method chaining](https://tomaugspurger.github.io/method-chaining) is a great way to write pandas code, as it lets us go from:\n", "\n", "```python\n", "raw_data = pd.read_parquet(...)\n", "data_with_types = set_dtypes(raw_data)\n", "data_without_outliers = remove_outliers(data_with_types)\n", "```\n", "\n", "to:\n", "\n", "```python\n", "data = (\n", "    pd.read_parquet(...)\n", "    .pipe(set_dtypes)\n", "    .pipe(remove_outliers)\n", ")\n", "```\n", "\n", "This comes at a cost, mostly in our ability to debug long pipelines. If there's a mistake somewhere along the way, you can only inspect the end result and lose the ability to inspect intermediate results. One mitigation is to add decorators to your pipeline functions that log common attributes of your dataframe at each step.\n", "\n", "\n", "## Logging in method chaining\n", "In order to use the logging capabilities, we first need to ensure we have a proper logger configured. We do this by running `logging.basicConfig(level=logging.DEBUG)`.
\n", "\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from sklego.datasets import load_chicken\n", "from sklego.pandas_utils import log_step\n", "chickweight = load_chicken(as_frame=True)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import logging\n", "\n", "logging.basicConfig(level=logging.DEBUG)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If we now add a `log_step` decorator to our pipeline function and execute the function, we get some logging statements for free." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "@log_step\n", "def set_dtypes(chickweight):\n", "    return chickweight.assign(\n", "        diet=lambda d: d['diet'].astype('category'),\n", "        chick=lambda d: d['chick'].astype('category'),\n", "    )" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[set_dtypes(df)] time=0:00:00.001935 n_obs=578, n_col=4\n" ] }, { "data": { "text/html": [ "
<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\"><th></th><th>weight</th><th>time</th><th>chick</th><th>diet</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><th>0</th><td>42</td><td>0</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>1</th><td>51</td><td>2</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>2</th><td>59</td><td>4</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>3</th><td>64</td><td>6</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>4</th><td>76</td><td>8</td><td>1</td><td>1</td></tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>
" ], "text/plain": [ "   weight  time  chick  diet\n", "0      42     0      1     1\n", "1      51     2      1     1\n", "2      59     4      1     1\n", "3      64     6      1     1\n", "4      76     8      1     1" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "chickweight.pipe(set_dtypes).head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can log at different levels by changing the `print_fn` argument of the `log_step` decorator. For example, if we have a `remove_outliers` function that calls different outlier removal functions for different types of outliers, we might generally only be interested in the total number of outliers removed. To get that, we log the specific implementations with `logging.debug` and the overall function with `logging.info`." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "@log_step(print_fn=logging.debug)\n", "def remove_dead_chickens(chickweight):\n", "    # Chicks with fewer than 12 observations; `.index` holds their ids\n", "    dead_chickens = chickweight.groupby('chick').size().loc[lambda s: s < 12].index\n", "    return chickweight.loc[lambda d: ~d['chick'].isin(dead_chickens)]\n", "\n", "\n", "@log_step(print_fn=logging.info)\n", "def remove_outliers(chickweight):\n", "    return chickweight.pipe(remove_dead_chickens)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "scrolled": true, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "DEBUG:root:[remove_dead_chickens(df)] time=0:00:00.002242 n_obs=540, n_col=4\n", "INFO:root:[remove_outliers(df)] time=0:00:00.003335 n_obs=540, n_col=4\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "[set_dtypes(df)] time=0:00:00.001687 n_obs=578, n_col=4\n" ] }, { "data": { "text/html": [ "
<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\"><th></th><th>weight</th><th>time</th><th>chick</th><th>diet</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><th>0</th><td>42</td><td>0</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>1</th><td>51</td><td>2</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>2</th><td>59</td><td>4</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>3</th><td>64</td><td>6</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>4</th><td>76</td><td>8</td><td>1</td><td>1</td></tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>
" ], "text/plain": [ "   weight  time  chick  diet\n", "0      42     0      1     1\n", "1      51     2      1     1\n", "2      59     4      1     1\n", "3      64     6      1     1\n", "4      76     8      1     1" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "chickweight.pipe(set_dtypes).pipe(remove_outliers).head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `log_step` decorator has some settings that let you tweak what exactly to log:\n", "\n", "- `time_taken`: log the time it took to execute the function (default True)\n", "- `shape`: log the output shape of the function (default True)\n", "- `shape_delta`: log the difference in shape between input and output (default False)\n", "- `names`: log the column names of the output (default False)\n", "- `dtypes`: log the dtypes of the columns of the output (default False)\n", "\n", "For example, if we don't care how long a function takes, but do want to see how many rows are removed when we remove dead chickens:\n" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[remove_dead_chickens(df)] delta=(-38, 0)\n" ] }, { "data": { "text/html": [ "<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\"><th></th><th>weight</th><th>time</th><th>chick</th><th>diet</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><th>0</th><td>42</td><td>0</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>1</th><td>51</td><td>2</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>2</th><td>59</td><td>4</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>3</th><td>64</td><td>6</td><td>1</td><td>1</td></tr>\n",
"    <tr><th>4</th><td>76</td><td>8</td><td>1</td><td>1</td></tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>" ], "text/plain": [ "   weight  time  chick  diet\n", "0      42     0      1     1\n", "1      51     2      1     1\n", "2      59     4      1     1\n", "3      64     6      1     1\n", "4      76     8      1     1" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "@log_step(time_taken=False, shape=False, shape_delta=True)\n", "def remove_dead_chickens(chickweight):\n", "    # Chicks with fewer than 12 observations; `.index` holds their ids\n", "    dead_chickens = chickweight.groupby('chick').size().loc[lambda s: s < 12].index\n", "    return chickweight.loc[lambda d: ~d['chick'].isin(dead_chickens)]\n", "\n", "chickweight.pipe(remove_dead_chickens).head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can also define custom logging functions by using `log_step_extra`. This takes any number of functions (at least one) that take the output dataframe and return some output that can be converted to a string. For example, if we want to log some arbitrary message and the number of unique chicks in our dataset, we can do:" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[start_pipe(df)] nchicks=50\n", "[remove_diet_1_chicks(df)] nchicks=30 without diet 1\n" ] }, { "data": { "text/html": [ "
<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\"><th></th><th>weight</th><th>time</th><th>chick</th><th>diet</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><th>220</th><td>40</td><td>0</td><td>21</td><td>2</td></tr>\n",
"    <tr><th>221</th><td>50</td><td>2</td><td>21</td><td>2</td></tr>\n",
"    <tr><th>222</th><td>62</td><td>4</td><td>21</td><td>2</td></tr>\n",
"    <tr><th>223</th><td>86</td><td>6</td><td>21</td><td>2</td></tr>\n",
"    <tr><th>224</th><td>125</td><td>8</td><td>21</td><td>2</td></tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>
" ], "text/plain": [ "     weight  time  chick  diet\n", "220      40     0     21     2\n", "221      50     2     21     2\n", "222      62     4     21     2\n", "223      86     6     21     2\n", "224     125     8     21     2" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from sklego.pandas_utils import log_step_extra\n", "\n", "def count_unique_chicks(df, **kwargs):\n", "    return \"nchicks=\" + str(df[\"chick\"].nunique())\n", "\n", "def display_message(df, msg):\n", "    return msg\n", "\n", "\n", "@log_step_extra(count_unique_chicks)\n", "def start_pipe(df):\n", "    \"\"\"Get initial chick count\"\"\"\n", "    return df\n", "\n", "\n", "@log_step_extra(count_unique_chicks, display_message, msg=\"without diet 1\")\n", "def remove_diet_1_chicks(df):\n", "    return df.loc[df[\"diet\"] != 1]\n", "\n", "(chickweight\n", " .pipe(start_pipe)\n", " .pipe(remove_diet_1_chicks)\n", " .head())" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.7" } }, "nbformat": 4, "nbformat_minor": 4 }