Provisional nodes and task generators
pytask’s execution model can usually be separated into three phases:
1. Collection of tasks, dependencies, and products.
2. Building the DAG.
3. Executing the tasks.
But, in some situations, pytask needs to be more flexible.
Imagine you want to download a folder with files from online storage. Before the task is completed, you do not know the total number of files or their filenames. How can you still describe the files as products of the task?
And how would you define another task that depends on these files?
The following sections will explain how you use pytask in these situations.
Producing provisional nodes
As an example of the aforementioned scenario, let us write a task that downloads all
files without a file extension from the root folder of the pytask GitHub repository. The
files are downloaded to a folder called downloads. Because downloads is a relative path,
the folder is located in the same folder as the task module.
from pathlib import Path

import httpx
from pytask import DirectoryNode
from pytask import Product
from typing_extensions import Annotated


def get_files_without_file_extensions_from_repo() -> list[str]:
    url = "https://api.github.com/repos/pytask-dev/pytask/git/trees/main"
    response = httpx.get(url)
    elements = response.json()["tree"]
    return [
        e["path"]
        for e in elements
        if e["type"] == "blob" and Path(e["path"]).suffix == ""
    ]


def task_download_files(
    download_folder: Annotated[
        Path, DirectoryNode(root_dir=Path("downloads"), pattern="*"), Product
    ],
) -> None:
    """Download files."""
    # Contains names like CITATION or LICENSE.
    files_to_download = get_files_without_file_extensions_from_repo()

    for file_ in files_to_download:
        # httpx requires a URL scheme, so "https://" is spelled out.
        url = "https://raw.githubusercontent.com/pytask-dev/pytask/main"
        response = httpx.get(url=f"{url}/{file_}", timeout=5)
        content = response.text
        download_folder.joinpath(file_).write_text(content)
Since the names of the files are not known when pytask is started, we need to use a
DirectoryNode to define the task’s product. With a DirectoryNode, we can specify where
pytask can find the files. The files are described with a root path (default is the
directory of the task module) and a glob pattern (default is *).
When we use the DirectoryNode as a product annotation, we get access to the root_dir as
a Path object inside the function, which allows us to store the files.
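The rule for relative root paths can be sketched in plain Python. This is only an illustration of the behavior described above, not pytask's implementation: the helper resolve_root_dir and the project layout are hypothetical, and leaving absolute paths unchanged is an assumption.

```python
from pathlib import PurePosixPath


def resolve_root_dir(
    root_dir: PurePosixPath, task_module: PurePosixPath
) -> PurePosixPath:
    """Interpret a relative ``root_dir`` relative to the task module's folder."""
    # Assumption: an absolute root path is used as given.
    if root_dir.is_absolute():
        return root_dir
    return task_module.parent / root_dir


# Hypothetical project layout, used only for this illustration.
module = PurePosixPath("/project/src/task_download.py")
relative = resolve_root_dir(PurePosixPath("downloads"), module)
absolute = resolve_root_dir(PurePosixPath("/data/downloads"), module)
```

With this layout, the relative path downloads ends up next to the task module, while the absolute path is returned unchanged.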
Note
The DirectoryNode is a provisional node that implements PProvisionalNode. A provisional
node is not a PNode, but when its collect() method is called, it returns actual nodes. A
DirectoryNode, for example, returns PathNode.
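To make the idea of a provisional node more concrete, here is a toy model that uses only the standard library. ToyDirectoryNode and ToyPathNode are hypothetical stand-ins and not pytask classes. The sketch assumes that collecting amounts to globbing the root directory, which may differ from what pytask does internally.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class ToyPathNode:
    """Stand-in for a concrete node that points at one existing file."""

    path: Path


@dataclass(frozen=True)
class ToyDirectoryNode:
    """Stand-in for a provisional node: a recipe for finding files later."""

    root_dir: Path
    pattern: str = "*"

    def collect(self) -> list[ToyPathNode]:
        # Turn the recipe into concrete nodes for the files that exist now.
        files = sorted(p for p in self.root_dir.glob(self.pattern) if p.is_file())
        return [ToyPathNode(path=p) for p in files]


with tempfile.TemporaryDirectory() as tmp:
    node = ToyDirectoryNode(root_dir=Path(tmp))
    before = node.collect()  # Nothing has been downloaded yet.
    Path(tmp, "LICENSE").write_text("license text")
    after = node.collect()  # The same node now yields one concrete node.
    after_names = [n.path.name for n in after]
```

The same node object gives different results before and after the file exists, which is why it can be declared before the files are known.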
Depending on provisional nodes
In the next step, we want to define a task that consumes all previously downloaded files and merges them into one file.
The difficulty is how to reference the downloaded files before they have been downloaded.
from pathlib import Path

from pytask import DirectoryNode
from typing_extensions import Annotated


def task_merge_files(
    paths: Annotated[
        list[Path], DirectoryNode(root_dir=Path("downloads"), pattern="*")
    ],
) -> Annotated[str, Path("all_text.txt")]:
    """Merge files."""
    contents = [path.read_text() for path in paths]
    return "\n".join(contents)
To reference the files that will be downloaded, we use the DirectoryNode as a
dependency. Before the task is executed, the list of files in the folder defined by the
root path and the pattern is automatically collected and passed to the task.
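Done by hand, the steps that pytask takes care of for the merge task look roughly like the following sketch. It uses only the standard library. The function merge_files is a hypothetical copy of the task body without annotations, and sorting the paths is an assumption made so the result is deterministic.

```python
import tempfile
from pathlib import Path


def merge_files(paths: list[Path]) -> str:
    """Same body as the merge task, without the pytask annotations."""
    contents = [path.read_text() for path in paths]
    return "\n".join(contents)


with tempfile.TemporaryDirectory() as tmp:
    downloads = Path(tmp) / "downloads"
    downloads.mkdir()
    downloads.joinpath("CITATION").write_text("cite me")
    downloads.joinpath("LICENSE").write_text("MIT")

    # Step 1: resolve the root path and pattern into concrete paths.
    paths = sorted(p for p in downloads.glob("*") if p.is_file())
    # Step 2: call the function with the collected paths.
    merged = merge_files(paths)
    # Step 3: store the return value at the annotated product path.
    target = Path(tmp) / "all_text.txt"
    target.write_text(merged)
    stored = target.read_text()
```

The point of the provisional node is that step 1 is postponed until the files exist, right before the task runs.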
If we use a DirectoryNode with the same root_dir and pattern in both tasks, pytask will
automatically recognize that the second task depends on the first. If the two nodes do
not match, you might need to make this dependency more explicit by using
@task(after=...), which is described elsewhere in the pytask documentation.
Task generators
What if we wanted to process each downloaded file separately instead of dealing with them in one task?
For that, we have to write a task generator to define an unknown number of tasks for an unknown number of downloaded files.
A task generator is a task function in which we define more tasks, just as if we were writing functions in a task module.
In the following snippet, each generated task takes one of the downloaded files and
copies its content to a .txt file.
from pathlib import Path

from pytask import DirectoryNode
from pytask import task
from typing_extensions import Annotated


@task(is_generator=True)
def task_copy_files(
    paths: Annotated[
        list[Path], DirectoryNode(root_dir=Path("downloads"), pattern="*")
    ],
) -> None:
    """Create tasks to copy each file to a ``.txt`` file."""
    for path in paths:
        # The path of the copy will be CITATION.txt, for example.
        path_to_copy = path.with_suffix(".txt")

        @task
        def copy_file(path: Annotated[Path, path]) -> Annotated[str, path_to_copy]:
            return path.read_text()
Important
The generated tasks need to be decorated with @task to be collected.
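The general shape of a task generator, a function that defines one function per input file, can be tried out without pytask. The following sketch is a plain-Python analogy and not how pytask registers tasks. The name make_copy_jobs is hypothetical, and default arguments are used here to bind the current loop values to each generated function.

```python
import tempfile
from pathlib import Path
from typing import Callable


def make_copy_jobs(paths: list[Path]) -> list[Callable[[], Path]]:
    """Return one zero-argument copy function per input path."""
    jobs = []
    for path in paths:
        target = path.with_suffix(".txt")

        # Default arguments freeze the current loop values for this function.
        def copy_file(source: Path = path, target: Path = target) -> Path:
            target.write_text(source.read_text())
            return target

        jobs.append(copy_file)
    return jobs


with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    for name in ("CITATION", "LICENSE"):
        folder.joinpath(name).write_text(f"content of {name}")

    sources = sorted(p for p in folder.glob("*") if p.is_file())
    jobs = make_copy_jobs(sources)
    copies = [job() for job in jobs]
    copied_names = [p.name for p in copies]
    copied_text = [p.read_text() for p in copies]
```

The number of generated functions depends only on the files found at run time, which mirrors why a task generator has to run after the download task.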