Scaling tasks

In any larger project, you quickly reach the point where you stack multiple repetitions of tasks on top of each other.

For example, you have one dataset, four different ways to prepare it, and three statistical models to analyze the data. The Cartesian product of all steps comprises twelve differently fitted models.

Here are some tips on how to set up your tasks so that you can easily modify the Cartesian product of steps.

Scalability

Let us dive right into the aforementioned example. We start with one dataset data.csv. Then, we will create four different specifications of the data and, finally, fit three different models to each specification.

This is the structure of the project.

my_project
├───pyproject.toml
│
├───src
│   └───my_project
│       ├────config.py
│       │
│       ├───data
│       │   └────data.csv
│       │
│       ├───data_preparation
│       │   ├────__init__.py
│       │   ├────config.py
│       │   └────task_prepare_data.py
│       │
│       └───estimation
│           ├────__init__.py
│           ├────config.py
│           └────task_estimate_models.py
│
├───setup.py
│
├───.pytask
│   └────...
│
└───bld

The folder structure, the main config.py which holds SRC and BLD, and the tasks follow the same structure advocated throughout the tutorials.
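
For reference, the top-level config.py could look like the following minimal sketch, with SRC and BLD defined as in the tutorials.

# Content of my_project/config.py (a minimal sketch following the tutorials)
from pathlib import Path

SRC = Path(__file__).parent.resolve()
BLD = SRC.joinpath("..", "..", "bld").resolve()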

New are the local configuration files in each subfolder of my_project, which contain objects shared across tasks. For example, the config.py in data_preparation holds the paths to the processed data and the names of the data sets.

# Content of data_preparation/config.py
from pathlib import Path

from my_project.config import BLD
from my_project.config import SRC

DATA = {
    "data_0": {"subset": "subset_1"},
    "data_1": {"subset": "subset_2"},
    "data_2": {"subset": "subset_3"},
    "data_3": {"subset": "subset_4"},
}


def path_to_input_data(name: str) -> Path:
    # All data sets are created from the same raw file, so the name is not used here.
    return SRC / "data" / "data.csv"


def path_to_processed_data(name: str) -> Path:
    return BLD / "data" / f"processed_{name}.pkl"

The task file task_prepare_data.py uses these objects to build the repetitions.

# Content of task_prepare_data.py
from pathlib import Path

from my_project.data_preparation.config import DATA
from my_project.data_preparation.config import path_to_input_data
from my_project.data_preparation.config import path_to_processed_data
import pandas as pd
from pytask import Product
from pytask import task
from typing_extensions import Annotated


def _create_parametrization(
    data: dict[str, dict[str, str]],
) -> dict[str, dict[str, str | Path]]:
    id_to_kwargs = {}
    for data_name, kwargs in data.items():
        id_to_kwargs[data_name] = {
            "path_to_input_data": path_to_input_data(data_name),
            "path_to_processed_data": path_to_processed_data(data_name),
            **kwargs,
        }

    return id_to_kwargs


_ID_TO_KWARGS = _create_parametrization(DATA)


for id_, kwargs in _ID_TO_KWARGS.items():

    @task(id=id_, kwargs=kwargs)
    def task_prepare_data(
        path_to_input_data: Path,
        subset: str,
        path_to_processed_data: Annotated[Path, Product],
    ) -> None:
        df = pd.read_csv(path_to_input_data)
        # ... transform the data.
        subset_df = df.loc[df["subset"].eq(subset)]
        subset_df.to_pickle(path_to_processed_data)

All arguments for the loop and the @task decorator are built within a function to keep the logic in one place and the module’s namespace clean.
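
For the DATA dictionary above, _ID_TO_KWARGS evaluates to a mapping like the following.

# What _ID_TO_KWARGS contains for the configuration above (first two entries shown)
{
    "data_0": {
        "path_to_input_data": SRC / "data" / "data.csv",
        "path_to_processed_data": BLD / "data" / "processed_data_0.pkl",
        "subset": "subset_1",
    },
    "data_1": {
        "path_to_input_data": SRC / "data" / "data.csv",
        "path_to_processed_data": BLD / "data" / "processed_data_1.pkl",
        "subset": "subset_2",
    },
    # ... and analogously for data_2 and data_3.
}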

Explicit ids make the task ids more descriptive and simplify their selection with expressions. Here is an example of a task id with an explicit id.

# With id
.../my_project/data_preparation/task_prepare_data.py::task_prepare_data[data_0]
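
Because the explicit id is part of the task id, you can select single repetitions with an expression. For example, the following command collects every task whose id contains data_0.

pytask -k data_0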

Next, we move to the estimation to see how we can build another repetition on top.

# Content of estimation/config.py
from pathlib import Path

from my_project.config import BLD
from my_project.data_preparation.config import DATA

_MODELS = ["linear_probability", "logistic_model", "decision_tree"]


ESTIMATIONS = {
    f"{data_name}_{model_name}": {"model": model_name, "data": data_name}
    for model_name in _MODELS
    for data_name in DATA
}


def path_to_estimation_result(name: str) -> Path:
    return BLD / "estimation" / f"estimation_{name}.pkl"

In the local configuration, we define ESTIMATIONS, which combines the information on data and model. The dictionary's keys can serve as task ids whenever the estimation is involved. This allows you to trigger all tasks related to one estimation (the estimation itself, figures, tables) with one command.

pytask -k data_0_linear_probability
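
For the four data sets and three models above, ESTIMATIONS contains twelve entries with keys such as

data_0_linear_probability
data_0_logistic_model
data_0_decision_tree
...
data_3_decision_tree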

And here is the task file.

# Content of task_estimate_models.py
from pathlib import Path

from my_project.data_preparation.config import path_to_processed_data
from my_project.estimation.config import ESTIMATIONS
from my_project.estimation.config import path_to_estimation_result
from pytask import Product
from pytask import task
from typing_extensions import Annotated


def _create_parametrization(
    estimations: dict[str, dict[str, str]],
) -> dict[str, dict[str, str | Path]]:
    id_to_kwargs = {}
    for name, config in estimations.items():
        id_to_kwargs[name] = {
            "path_to_data": path_to_processed_data(config["data"]),
            "model": config["model"],
            "path_to_estimation": path_to_estimation_result(name),
        }

    return id_to_kwargs


_ID_TO_KWARGS = _create_parametrization(ESTIMATIONS)


for id_, kwargs in _ID_TO_KWARGS.items():

    @task(id=id_, kwargs=kwargs)
    def task_estimate_models(
        path_to_data: Path, model: str, path_to_estimation: Annotated[Path, Product]
    ) -> None:
        if model == "linear_probability":
            ...

Replicating this pattern across a project gives you a clean way to define repetitions.
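
For example, a task that creates one figure per estimation can reuse ESTIMATIONS in the same way. The following sketch uses hypothetical names (task_plot_results.py, path_to_plot, and a bld/figures folder), which are not part of the project above.

# Content of a hypothetical task_plot_results.py
from pathlib import Path

from my_project.config import BLD
from my_project.estimation.config import ESTIMATIONS
from my_project.estimation.config import path_to_estimation_result
from pytask import Product
from pytask import task
from typing_extensions import Annotated


def path_to_plot(name: str) -> Path:
    # Hypothetical helper; in a real project it would live in a local config.py.
    return BLD / "figures" / f"plot_{name}.png"


_ID_TO_KWARGS = {
    name: {
        "path_to_estimation": path_to_estimation_result(name),
        "path_to_plot": path_to_plot(name),
    }
    for name in ESTIMATIONS
}


for id_, kwargs in _ID_TO_KWARGS.items():

    @task(id=id_, kwargs=kwargs)
    def task_plot_results(
        path_to_estimation: Path, path_to_plot: Annotated[Path, Product]
    ) -> None:
        ...  # Load the estimation result and save a figure to path_to_plot.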

Extending repetitions

Some parametrized tasks are costly to run, be it in terms of computing power, memory, or time. Users often extend repetitions, which triggers all repetitions to be rerun. Thus, use the @pytask.mark.persist decorator, which is explained in more detail in this tutorial.
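
As a sketch, the marker could be combined with the repetitions above, assuming @pytask.mark.persist can be stacked on top of the @task decorator; the other names continue the estimation task file.

# Persisting the expensive estimation tasks (a sketch)
import pytask


for id_, kwargs in _ID_TO_KWARGS.items():

    @pytask.mark.persist
    @task(id=id_, kwargs=kwargs)
    def task_estimate_models(
        path_to_data: Path, model: str, path_to_estimation: Annotated[Path, Product]
    ) -> None:
        if model == "linear_probability":
            ...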