Scaling tasks#
In any bigger project you quickly come to the point where you stack multiple repetitions of tasks on top of each other.
For example, you have one dataset, four different ways to prepare it, and three statistical models to analyze the data. The Cartesian product of all steps combined comprises twelve differently fitted models.
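The twelve combinations can be enumerated with a quick sketch; the preparation and model names below are placeholders, not names from the project:

```python
from itertools import product

# Hypothetical labels for four data preparations and three models.
preparations = ["prep_1", "prep_2", "prep_3", "prep_4"]
models = ["model_a", "model_b", "model_c"]

# The Cartesian product yields one entry per fitted model.
combinations = list(product(preparations, models))
print(len(combinations))  # 4 * 3 = 12
```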
Here you find some tips on how to set up your tasks such that you can easily modify the Cartesian product of steps.
Scalability#
Let us dive right into the aforementioned example. We start with one dataset, `data.csv`. Then, we create four different specifications of the data and, finally, fit three different models to each specification.
This is the structure of the project.
```
my_project
├───pyproject.toml
│
├───src
│   └───my_project
│       ├────config.py
│       │
│       ├───data
│       │   └────data.csv
│       │
│       ├───data_preparation
│       │   ├────__init__.py
│       │   ├────config.py
│       │   └────task_prepare_data.py
│       │
│       └───estimation
│           ├────__init__.py
│           ├────config.py
│           └────task_estimate_models.py
│
├───setup.py
│
├───.pytask
│   └────...
│
└───bld
```
The folder structure, the main `config.py` which holds `SRC` and `BLD`, and the tasks follow the same structure advocated throughout the tutorials.

New are the local configuration files in each subfolder of `my_project`, which contain objects shared across tasks. For example, `config.py` holds the paths to the processed data and the names of the data sets.
```python
# Content of config.py
from pathlib import Path

from my_project.config import BLD
from my_project.config import SRC

DATA = {
    "data_0": {"subset": "subset_1"},
    "data_1": {"subset": "subset_2"},
    "data_2": {"subset": "subset_3"},
    "data_3": {"subset": "subset_4"},
}


def path_to_input_data(name: str) -> Path:
    return SRC / "data" / "data.csv"


def path_to_processed_data(name: str) -> Path:
    return BLD / "data" / f"processed_{name}.pkl"
```
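As a sanity check, here is how such a path helper composes output paths. `BLD` is replaced with a hypothetical build directory, since the real one lives in the project's main `config.py`:

```python
from pathlib import Path

# Hypothetical stand-in for the BLD path defined in the main config.py.
BLD = Path("bld")


def path_to_processed_data(name: str) -> Path:
    # Mirrors the helper above: one pickle file per data set name.
    return BLD / "data" / f"processed_{name}.pkl"


# Each data set name maps to its own file under bld/data.
result = path_to_processed_data("data_0")
```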
The task file `task_prepare_data.py` uses these objects to build the repetitions.
```python
# Content of task_prepare_data.py
from pathlib import Path

import pandas as pd
from pytask import Product
from pytask import task
from typing_extensions import Annotated

from my_project.data_preparation.config import DATA
from my_project.data_preparation.config import path_to_input_data
from my_project.data_preparation.config import path_to_processed_data


def _create_parametrization(
    data: dict[str, dict[str, str]],
) -> dict[str, dict[str, Path | str]]:
    id_to_kwargs = {}
    for data_name, kwargs in data.items():
        id_to_kwargs[data_name] = {
            "path_to_input_data": path_to_input_data(data_name),
            "path_to_processed_data": path_to_processed_data(data_name),
            **kwargs,
        }
    return id_to_kwargs


_ID_TO_KWARGS = _create_parametrization(DATA)

for id_, kwargs in _ID_TO_KWARGS.items():

    @task(id=id_, kwargs=kwargs)
    def task_prepare_data(
        path_to_input_data: Path,
        subset: str,
        path_to_processed_data: Annotated[Path, Product],
    ) -> None:
        df = pd.read_csv(path_to_input_data)
        ...
        subset_df = df.loc[df["subset"].eq(subset)]
        subset_df.to_pickle(path_to_processed_data)
```
All arguments for the loop and the `@task` decorator are built within a function to keep the logic in one place and the module's namespace clean.

Explicit ids make the task ids more descriptive and simplify selecting tasks with expressions. Here is an example of a task id with an explicit id.
```
# With id
.../my_project/data_preparation/task_prepare_data.py::task_prepare_data[data_0]
```
Next, we move to the estimation to see how we can build another repetition on top.
```python
# Content of config.py
from pathlib import Path

from my_project.config import BLD
from my_project.data_preparation.config import DATA

_MODELS = ["linear_probability", "logistic_model", "decision_tree"]

ESTIMATIONS = {
    f"{data_name}_{model_name}": {"model": model_name, "data": data_name}
    for model_name in _MODELS
    for data_name in DATA
}


def path_to_estimation_result(name: str) -> Path:
    return BLD / "estimation" / f"estimation_{name}.pkl"
```
In the local configuration, we define `ESTIMATIONS`, which combines the information on data and model. The dictionary's key can be used as a task id whenever the estimation is involved. It allows triggering all tasks related to one estimation - estimation, figures, tables - with one command.
```
pytask -k linear_probability_data_0
```
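With the four data sets from the earlier `DATA` dictionary, the comprehension produces twelve keys of the form `<data_name>_<model_name>`. A self-contained sketch:

```python
# Rebuild the DATA keys from the earlier config for illustration.
DATA = {f"data_{i}": {"subset": f"subset_{i + 1}"} for i in range(4)}
_MODELS = ["linear_probability", "logistic_model", "decision_tree"]

# Same comprehension as in the local config.py above.
ESTIMATIONS = {
    f"{data_name}_{model_name}": {"model": model_name, "data": data_name}
    for model_name in _MODELS
    for data_name in DATA
}

print(len(ESTIMATIONS))  # 4 data sets x 3 models = 12 estimations
```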
And here is the task file.
```python
# Content of task_estimate_models.py
from pathlib import Path

from pytask import Product
from pytask import task
from typing_extensions import Annotated

from my_project.data_preparation.config import path_to_processed_data
from my_project.estimation.config import ESTIMATIONS
from my_project.estimation.config import path_to_estimation_result


def _create_parametrization(
    estimations: dict[str, dict[str, str]],
) -> dict[str, dict[str, str | Path]]:
    id_to_kwargs = {}
    for name, config in estimations.items():
        id_to_kwargs[name] = {
            "path_to_data": path_to_processed_data(config["data"]),
            "model": config["model"],
            "path_to_estimation": path_to_estimation_result(name),
        }
    return id_to_kwargs


_ID_TO_KWARGS = _create_parametrization(ESTIMATIONS)

for id_, kwargs in _ID_TO_KWARGS.items():

    @task(id=id_, kwargs=kwargs)
    def task_estimate_models(
        path_to_data: Path, model: str, path_to_estimation: Annotated[Path, Product]
    ) -> None:
        if model == "linear_probability":
            ...
```
Replicating this pattern across a project provides a clean way to define repetitions.
Extending repetitions#
Some parametrized tasks are costly to run - costly in terms of computing power, memory, or time. Users often extend repetitions in ways that trigger all repetitions to be rerun. In this situation, use the `@pytask.mark.persist` decorator, which is explained in more detail in this tutorial.