Writing custom nodes

In the previous tutorials and how-to guides, you learned that dependencies and products can be represented as plain Python objects with PythonNode or as paths where every pathlib.Path is converted to a PathNode.

In this how-to guide, you will learn about the general concept of nodes and how to write your own to improve your workflows.

Use-case

A typical task operation is to load data like a DataFrame from a pickle file, transform it, and store it on disk. The usual way would be to use paths to point to inputs and outputs and call read_pickle() and to_pickle().

from pathlib import Path

import pandas as pd


def task_example(in_: Path = Path("in.pkl"), out: Path = Path("out.pkl")) -> None:
    df = pd.read_pickle(in_.read_bytes())
    # Some transformations.
    df.to_pickle(out)

To remove IO operations from the task and delegate them to pytask, we will replicate the PickleNode that automatically loads and stores Python objects.

And we pass the value to df via Annotated to preserve the type hint.

The result will be the following task.

from pathlib import Path
from typing import Annotated

import pandas as pd
from pytask import Product


class PickleNode: ...


in_node = PickleNode.from_path(Path(__file__).parent / "in.pkl")
out_node = PickleNode.from_path(Path(__file__).parent / "out.pkl")


def task_example(
    df: Annotated[pd.DataFrame, in_node], out: Annotated[PickleNode, out_node, Product]
) -> None:
    transformed = df.apply(...)
    out.save(transformed)
from pathlib import Path
from typing import Annotated

import pandas as pd


class PickleNode: ...


in_node = PickleNode.from_path(Path(__file__).parent / "in.pkl")
out_node = PickleNode.from_path(Path(__file__).parent / "out.pkl")


def task_example(
    df: Annotated[pd.DataFrame, in_node],
) -> Annotated[pd.DataFrame, out_node]:
    return df.apply(...)
from pathlib import Path

import pandas as pd
from pytask import Product
from typing_extensions import Annotated


class PickleNode: ...


in_node = PickleNode.from_path(Path(__file__).parent / "in.pkl")
out_node = PickleNode.from_path(Path(__file__).parent / "out.pkl")


def task_example(
    df: Annotated[pd.DataFrame, in_node], out: Annotated[PickleNode, out_node, Product]
) -> None:
    transformed = df.apply(...)
    out.save(transformed)
from pathlib import Path

import pandas as pd
from typing_extensions import Annotated


class PickleNode: ...


in_node = PickleNode.from_path(Path(__file__).parent / "in.pkl")
out_node = PickleNode.from_path(Path(__file__).parent / "out.pkl")


def task_example(
    df: Annotated[pd.DataFrame, in_node],
) -> Annotated[pd.DataFrame, out_node]:
    return df.apply(...)

Nodes

A custom node needs to follow an interface so that pytask can perform several actions:

  • Check whether the node is up-to-date and run the workflow if necessary.

  • Load and save values when tasks are executed.

This interface is defined by protocols [1]. A custom node must follow at least the protocol PNode or, even better, PPathNode if it is based on a path. The common node for paths, PathNode, follows the protocol PPathNode.

PickleNode

Since our PickleNode will only vary slightly from PathNode, we use it as a template, and with some minor modifications, we arrive at the following class.

import hashlib
import pickle
from pathlib import Path
from typing import Any

from pytask import hash_value


class PickleNode:
    """The class for a node that persists values with pickle to files.

    Parameters
    ----------
    name
        Name of the node which makes it identifiable in the DAG.
    path
        The path to the file.

    """

    def __init__(self, name: str = "", path: Path | None = None) -> None:
        self.name = name
        self.path = path

    @property
    def signature(self) -> str:
        """The unique signature of the node."""
        raw_key = str(hash_value(self.path))
        return hashlib.sha256(raw_key.encode()).hexdigest()

    @classmethod
    def from_path(cls, path: Path) -> "PickleNode":
        """Instantiate class from path to file."""
        if not path.is_absolute():
            msg = "Node must be instantiated from absolute path."
            raise ValueError(msg)
        return cls(name=path.as_posix(), path=path)

    def state(self) -> str | None:
        """Return the modification timestamp as the state."""
        if self.path.exists():
            return str(self.path.stat().st_mtime)
        return None

    def load(self, is_product: bool) -> Path:
        """Load the value from the path."""
        if is_product:
            return self
        return pickle.loads(self.path.read_bytes())

    def save(self, value: Any) -> None:
        """Save any value with pickle to the file."""
        self.path.write_bytes(pickle.dumps(value))
import hashlib
import pickle
from pathlib import Path
from typing import Any
from typing import Optional

from pytask import hash_value


class PickleNode:
    """The class for a node that persists values with pickle to files.

    Parameters
    ----------
    name
        Name of the node which makes it identifiable in the DAG.
    path
        The path to the file.

    """

    def __init__(self, name: str = "", path: Optional[Path] = None) -> None:
        self.name = name
        self.path = path

    @property
    def signature(self) -> str:
        """The unique signature of the node."""
        raw_key = str(hash_value(self.path))
        return hashlib.sha256(raw_key.encode()).hexdigest()

    @classmethod
    def from_path(cls, path: Path) -> "PickleNode":
        """Instantiate class from path to file."""
        if not path.is_absolute():
            msg = "Node must be instantiated from absolute path."
            raise ValueError(msg)
        return cls(name=path.as_posix(), path=path)

    def state(self) -> Optional[str]:
        """Return the modification timestamp as the state."""
        if self.path.exists():
            return str(self.path.stat().st_mtime)
        return None

    def load(self, is_product: bool) -> Path:
        """Load the value from the path."""
        if is_product:
            return self
        return pickle.loads(self.path.read_bytes())

    def save(self, value: Any) -> None:
        """Save any value with pickle to the file."""
        self.path.write_bytes(pickle.dumps(value))

Here are some explanations.

  • The node does not need to inherit from the protocol PPathNode, but you can do it to be more explicit.

  • The node has two attributes

    • name identifies the node in the DAG, so the name must be unique.

    • path holds the path to the file and identifies the node as a path node that is handled slightly differently than normal nodes within pytask.

  • The node has an additional property that computes the signature of the node. The signature is a hash and a unique identifier for the node. For most nodes it will be a hash of the path or the name.

  • The classmethod() from_path() is a convenient method to instantiate the class.

  • The method state() yields a value that signals the node’s state. If the value changes, pytask knows it needs to regenerate the workflow. We can use the timestamp of when the node was last modified.

  • pytask calls load() when it collects the values of function arguments to run the function. The argument is_product signals that the node is loaded as a product with a Product annotation or via produces.

    When the node is loaded as a dependency, we want to inject the value of the pickle file. In the other case, the node returns itself so users can call save() themselves.

  • save() is called when a task function returns and allows to save the return values.

Conclusion

Nodes are an important in concept pytask. They allow to pytask to build a DAG and generate a workflow, and they also allow users to extract IO operations from the task function into the nodes.

pytask only implements two node types, PathNode and PythonNode, but many more are possible. In the future, there should probably be a plugin that implements nodes for many other data sources like AWS S3 or databases. [2]

References