Provisional nodes and task generators

pytask’s execution model can usually be separated into three phases.

  1. Collection of tasks, dependencies, and products.

  2. Building the DAG.

  3. Executing the tasks.

But, in some situations, pytask needs to be more flexible.

Imagine you want to download a folder of files from online storage. Before the task has finished, you know neither the total number of files nor their filenames. How can you still describe the files as products of the task?

And how would you define another task that depends on these files?

The following sections will explain how you use pytask in these situations.

Producing provisional nodes

As an example of this scenario, let us write a task that downloads all files without a file extension from the root folder of the pytask GitHub repository. The files are downloaded to a folder called downloads. Because downloads is a relative path, it is resolved relative to the folder that contains the task module.

from pathlib import Path

import httpx
from pytask import DirectoryNode
from pytask import Product
from typing_extensions import Annotated


def get_files_without_file_extensions_from_repo() -> list[str]:
    url = "https://api.github.com/repos/pytask-dev/pytask/git/trees/main"
    response = httpx.get(url)
    elements = response.json()["tree"]
    return [
        e["path"]
        for e in elements
        if e["type"] == "blob" and Path(e["path"]).suffix == ""
    ]


def task_download_files(
    download_folder: Annotated[
        Path, DirectoryNode(root_dir=Path("downloads"), pattern="*"), Product
    ],
) -> None:
    """Download files."""
    # Contains names like CITATION or LICENSE.
    files_to_download = get_files_without_file_extensions_from_repo()

    for file_ in files_to_download:
        # httpx requires an explicit scheme in the URL.
        url = "https://raw.githubusercontent.com/pytask-dev/pytask/main"
        response = httpx.get(url=f"{url}/{file_}", timeout=5)
        content = response.text
        download_folder.joinpath(file_).write_text(content)

Since the names of the files are not known when pytask is started, we need to use a DirectoryNode to define the task’s product. With a DirectoryNode we can specify where pytask can find the files. The files are described with a root path (default is the directory of the task module) and a glob pattern (default is *).

When we use the DirectoryNode as a product annotation, we get access to the root_dir as a Path object inside the function, which allows us to store the files.
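To get a feeling for how a root path and a glob pattern together describe a set of files, the following sketch uses only pathlib. It assumes that the pattern follows the glob semantics of pathlib; the temporary directory and the file names are made up for the illustration and are not part of pytask.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical root directory, standing in for the "downloads" folder.
    root_dir = Path(tmp) / "downloads"
    root_dir.mkdir()

    # Create a few example files, two of them without a file extension.
    for name in ("CITATION", "LICENSE", "notes.txt"):
        (root_dir / name).write_text(name)

    # The pattern "*" matches every entry directly inside the root directory.
    everything = sorted(p.name for p in root_dir.glob("*"))

    # A narrower pattern selects only a subset of the files.
    only_txt = sorted(p.name for p in root_dir.glob("*.txt"))

print(everything)
print(only_txt)
```

A narrower pattern is useful when the folder also contains files that should not be treated as products of the task.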

Note

The DirectoryNode is a provisional node that implements PProvisionalNode. A provisional node is not a PNode, but when its collect() method is called, it returns actual nodes. A DirectoryNode, for example, returns PathNode.
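The idea behind a provisional node can be sketched with two toy classes. This is not pytask's implementation: ToyPathNode and ToyDirectoryNode are hypothetical names, and the sketch only shows that a provisional node stores where to look and resolves to concrete nodes when collect() is called.

```python
from __future__ import annotations

import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class ToyPathNode:
    """Hypothetical stand-in for a concrete node that points to one file."""

    path: Path


@dataclass(frozen=True)
class ToyDirectoryNode:
    """Hypothetical stand-in for a provisional node.

    It only knows a root directory and a pattern, not the files themselves.
    """

    root_dir: Path
    pattern: str = "*"

    def collect(self) -> list[ToyPathNode]:
        # The files are looked up at call time, not when the node is created.
        return [ToyPathNode(p) for p in sorted(self.root_dir.glob(self.pattern))]


with tempfile.TemporaryDirectory() as tmp:
    node = ToyDirectoryNode(root_dir=Path(tmp))

    # Nothing has been written yet, so collecting yields no nodes.
    before = node.collect()

    # After a file appears, the same node resolves to a concrete node.
    (Path(tmp) / "LICENSE").write_text("MIT")
    after = node.collect()
    names_after = [n.path.name for n in after]

print(before, names_after)
```

The same node object gives different results before and after the files exist, which is why it cannot be treated like an ordinary node during collection.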

Depending on provisional nodes

In the next step, we want to define a task that consumes and merges all previously downloaded files into one file.

The difficulty is how to reference the files before they have been downloaded.

from pathlib import Path

from pytask import DirectoryNode
from typing_extensions import Annotated


def task_merge_files(
    paths: Annotated[
        list[Path], DirectoryNode(root_dir=Path("downloads"), pattern="*")
    ],
) -> Annotated[str, Path("all_text.txt")]:
    """Merge files."""
    contents = [path.read_text() for path in paths]
    return "\n".join(contents)

To reference the files that will be downloaded, we use the DirectoryNode as a dependency. Before the task is executed, the list of files in the folder defined by the root path and the pattern is automatically collected and passed to the task.

If we use a DirectoryNode with the same root_dir and pattern in both tasks, pytask will automatically recognize that the second task depends on the first. If the two nodes do not match, you might need to make the dependency explicit by using @task(after=...).
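Why matching nodes lead to the right execution order can be illustrated with a small toy DAG. The sketch below is a simplification and not how pytask builds its DAG internally: the dictionary layout and the (root_dir, pattern) tuples used as keys are assumptions made for the example, and the ordering comes from the standard library's graphlib.

```python
from graphlib import TopologicalSorter

# Hypothetical, simplified task descriptions. The directory node is
# identified by its (root_dir, pattern) pair.
tasks = {
    "task_download_files": {
        "depends_on": set(),
        "produces": {("downloads", "*")},
    },
    "task_merge_files": {
        "depends_on": {("downloads", "*")},
        "produces": {"all_text.txt"},
    },
}

# Map every product to the task that creates it.
producers = {
    product: name for name, spec in tasks.items() for product in spec["produces"]
}

# A task must run after every task that produces one of its dependencies.
graph = {
    name: {producers[dep] for dep in spec["depends_on"] if dep in producers}
    for name, spec in tasks.items()
}

order = list(TopologicalSorter(graph).static_order())
print(order)
```

If the dependency used a different root directory or pattern, the lookup in producers would find nothing and no edge would be created, which is the situation where an explicit ordering is needed.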

Task generators

What if we wanted to process each downloaded file separately instead of dealing with them in one task?

For that, we have to write a task generator to define an unknown number of tasks for an unknown number of downloaded files.

A task generator is a task function in which we define more tasks, just as if we were writing functions in a task module.

In the following code snippet, each generated task takes one of the downloaded files and copies its content to a .txt file.

from pathlib import Path

from pytask import DirectoryNode
from pytask import task
from typing_extensions import Annotated


@task(is_generator=True)
def task_copy_files(
    paths: Annotated[
        list[Path], DirectoryNode(root_dir=Path("downloads"), pattern="*")
    ],
) -> None:
    """Create tasks to copy each file to a ``.txt`` file."""
    for path in paths:
        # The path of the copy will be CITATION.txt, for example.
        path_to_copy = path.with_suffix(".txt")

        @task
        def copy_file(path: Annotated[Path, path]) -> Annotated[str, path_to_copy]:
            return path.read_text()

Important

The generated tasks need to be decorated with @task to be collected.
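The role of the decorator can be illustrated with a toy registry. The names toy_task, toy_generator, and collected are hypothetical and do not reflect pytask's internals; the sketch only shows that a function defined inside a loop is invisible to the outside unless something registers it.

```python
from typing import Callable

# Hypothetical registry that stands in for the tasks a runner would collect.
collected: list[Callable[[], str]] = []


def toy_task(func: Callable[[], str]) -> Callable[[], str]:
    """Register the function so that it can be found later."""
    collected.append(func)
    return func


def toy_generator(names: list[str]) -> None:
    """Define one function per name, similar in spirit to a task generator."""
    for name in names:

        @toy_task
        def copy_file(name: str = name) -> str:
            # The default argument binds the current value of the loop variable.
            return f"{name}.txt"

        def forgotten(name: str = name) -> str:
            # Not decorated, so it is never registered and never runs.
            return f"{name}.bak"


toy_generator(["CITATION", "LICENSE"])
results = [func() for func in collected]
print(results)
```

Only the decorated functions end up in the registry, one per file name, while the undecorated ones are discarded when the generator returns.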