Writing custom nodes#
In the previous tutorials and how-to guides, you learned that dependencies and products
can be represented as plain Python objects with PythonNode
or as paths
where every pathlib.Path
is converted to a PathNode
.
In this how-to guide, you will learn about the general concept of nodes and how to write your own to improve your workflows.
Use-case#
A typical task operation is to load data like a pandas.DataFrame
from a pickle
file, transform it, and store it on disk. The usual way would be to use paths to point
to inputs and outputs and call pandas.read_pickle()
and
pandas.DataFrame.to_pickle()
.
from pathlib import Path
import pandas as pd
def task_example(in_: Path = Path("in.pkl"), out: Path = Path("out.pkl")) -> None:
    """Load a pickled DataFrame, transform it, and store it on disk again.

    Parameters
    ----------
    in_
        Path to the pickle file that holds the input DataFrame.
    out
        Path to the pickle file the transformed DataFrame is written to.

    """
    # ``read_pickle`` expects a path or a file-like object, not the raw bytes of
    # the file, so the path itself is passed.
    df = pd.read_pickle(in_)
    # Some transformations.
    df.to_pickle(out)
To remove IO operations from the task and delegate them to pytask, we will write a
PickleNode
that automatically loads and stores Python objects.
And we pass the value to df
via Annotated
to preserve the type hint.
The result will be the following task.
from pathlib import Path
from typing import Annotated
import pandas as pd
from pytask import Product
class PickleNode:
    # Body elided here; the complete class is shown in the PickleNode section below.
    ...


# Both nodes point to pickle files located next to this task module.
in_node = PickleNode.from_path(Path(__file__).parent.joinpath("in.pkl"))
out_node = PickleNode.from_path(Path(__file__).parent.joinpath("out.pkl"))


def task_example(
    df: Annotated[pd.DataFrame, in_node],
    out: Annotated[PickleNode, out_node, Product],
) -> None:
    """Transform the DataFrame loaded from ``in_node`` and save it with ``out``.

    ``df`` receives the unpickled value of ``in_node``. ``out`` is marked as a
    product, so the node itself is passed in and the task stores the result by
    calling its ``save`` method.
    """
    out.save(df.apply(...))
from pathlib import Path
from typing import Annotated
import pandas as pd
class PickleNode:
    # Body elided here; the complete class is shown in the PickleNode section below.
    ...


# Both nodes point to pickle files located next to this task module.
in_node = PickleNode.from_path(Path(__file__).parent.joinpath("in.pkl"))
out_node = PickleNode.from_path(Path(__file__).parent.joinpath("out.pkl"))


def task_example(
    df: Annotated[pd.DataFrame, in_node],
) -> Annotated[pd.DataFrame, out_node]:
    """Transform the DataFrame loaded from ``in_node`` and return the result.

    The return annotation attaches ``out_node`` to the return value, so the
    returned DataFrame is saved through that node instead of inside the task.
    """
    transformed = df.apply(...)
    return transformed
from pathlib import Path
import pandas as pd
from pytask import Product
from typing_extensions import Annotated
class PickleNode:
    # Body elided here; the complete class is shown in the PickleNode section below.
    ...


# Both nodes point to pickle files located next to this task module.
in_node = PickleNode.from_path(Path(__file__).parent.joinpath("in.pkl"))
out_node = PickleNode.from_path(Path(__file__).parent.joinpath("out.pkl"))


def task_example(
    df: Annotated[pd.DataFrame, in_node],
    out: Annotated[PickleNode, out_node, Product],
) -> None:
    """Transform the DataFrame loaded from ``in_node`` and save it with ``out``.

    ``df`` receives the unpickled value of ``in_node``. ``out`` is marked as a
    product, so the node itself is passed in and the task stores the result by
    calling its ``save`` method.
    """
    out.save(df.apply(...))
from pathlib import Path
import pandas as pd
from typing_extensions import Annotated
class PickleNode:
    # Body elided here; the complete class is shown in the PickleNode section below.
    ...


# Both nodes point to pickle files located next to this task module.
in_node = PickleNode.from_path(Path(__file__).parent.joinpath("in.pkl"))
out_node = PickleNode.from_path(Path(__file__).parent.joinpath("out.pkl"))


def task_example(
    df: Annotated[pd.DataFrame, in_node],
) -> Annotated[pd.DataFrame, out_node]:
    """Transform the DataFrame loaded from ``in_node`` and return the result.

    The return annotation attaches ``out_node`` to the return value, so the
    returned DataFrame is saved through that node instead of inside the task.
    """
    transformed = df.apply(...)
    return transformed
Nodes#
A custom node needs to follow an interface so that pytask can perform several actions:
Check whether the node is up-to-date and run the workflow if necessary.
Load and save values when tasks are executed.
This interface is defined by protocols [1]. A custom node must
follow at least the protocol PNode
or, even better,
PPathNode
if it is based on a path. The common node for paths,
PathNode
, follows the protocol PPathNode
.
PickleNode
#
Since our PickleNode
will only vary slightly from PathNode
, we
use it as a template, and with some minor modifications, we arrive at the following
class.
import hashlib
import pickle
from pathlib import Path
from typing import Any
from pytask import hash_value
class PickleNode:
"""The class for a node that persists values with pickle to files.
Parameters
----------
name
Name of the node which makes it identifiable in the DAG.
path
The path to the file.
"""
def __init__(self, name: str = "", path: Path | None = None) -> None:
self.name = name
self.path = path
@property
def signature(self) -> str:
"""The unique signature of the node."""
raw_key = str(hash_value(self.path))
return hashlib.sha256(raw_key.encode()).hexdigest()
@classmethod
def from_path(cls, path: Path) -> "PickleNode":
"""Instantiate class from path to file."""
if not path.is_absolute():
msg = "Node must be instantiated from absolute path."
raise ValueError(msg)
return cls(name=path.as_posix(), path=path)
def state(self) -> str | None:
"""Return the modification timestamp as the state."""
if self.path.exists():
return str(self.path.stat().st_mtime)
return None
def load(self, is_product: bool) -> Path:
"""Load the value from the path."""
if is_product:
return self
return pickle.loads(self.path.read_bytes())
def save(self, value: Any) -> None:
"""Save any value with pickle to the file."""
self.path.write_bytes(pickle.dumps(value))
import hashlib
import pickle
from pathlib import Path
from typing import Any
from typing import Optional
from pytask import hash_value
class PickleNode:
    """The class for a node that persists values with pickle to files.

    Parameters
    ----------
    name
        Name of the node which makes it identifiable in the DAG.
    path
        The path to the file. It must be set before ``state``, ``load`` or ``save``
        are used because these methods access the file directly.

    """

    def __init__(self, name: str = "", path: Optional[Path] = None) -> None:
        self.name = name
        self.path = path

    @property
    def signature(self) -> str:
        """The unique signature of the node, a SHA-256 hex digest based on the path."""
        raw_key = str(hash_value(self.path))
        return hashlib.sha256(raw_key.encode()).hexdigest()

    @classmethod
    def from_path(cls, path: Path) -> "PickleNode":
        """Instantiate class from path to file.

        Raises
        ------
        ValueError
            If ``path`` is not absolute.

        """
        if not path.is_absolute():
            msg = "Node must be instantiated from absolute path."
            raise ValueError(msg)
        # The POSIX form of the path doubles as the node's name.
        return cls(name=path.as_posix(), path=path)

    def state(self) -> Optional[str]:
        """Return the modification timestamp as the state.

        Returns ``None`` when the file does not exist.
        """
        if self.path.exists():
            return str(self.path.stat().st_mtime)
        return None

    def load(self, is_product: bool = False) -> Any:
        """Load the value from the path.

        When the node is loaded as a product, the node itself is returned so that
        the caller can invoke :meth:`save`. Otherwise, the unpickled content of
        the file is returned.
        """
        if is_product:
            return self
        # NOTE: unpickling can execute arbitrary code; only load files you trust.
        return pickle.loads(self.path.read_bytes())

    def save(self, value: Any) -> None:
        """Save any value with pickle to the file."""
        self.path.write_bytes(pickle.dumps(value))
Here are some explanations.
The node does not need to inherit from the protocol
PPathNode
, but you can do it to be more explicit.
The node has two attributes:
name
identifies the node in the DAG, so the name must be unique.
path
holds the path to the file and identifies the node as a path node that is handled slightly differently than normal nodes within pytask.
The node has an additional property that computes the signature of the node. The signature is a hash and a unique identifier for the node. For most nodes it will be a hash of the path or the name.
The
classmethod()
PickleNode.from_path()
is a convenient method to instantiate the class.
The method
PickleNode.state()
yields a value that signals the node’s state. If the value changes, pytask knows it needs to regenerate the workflow. We can use the timestamp of when the node was last modified.
pytask calls
PickleNode.load()
when it collects the values of function arguments to run the function. The argument
is_product
signals that the node is loaded as a product with a
Product
annotation or via
produces
. When the node is loaded as a dependency, we want to inject the value of the pickle file. In the other case, the node returns itself so users can call
PickleNode.save()
themselves.
PickleNode.save()
is called when a task function returns and allows pytask to save the return values.
Conclusion#
Nodes are an important concept in pytask. They allow pytask to build a DAG and generate a workflow, and they also allow users to extract IO operations from the task function into the nodes.
pytask only implements two node types, PathNode
and
PythonNode
, but many more are possible. In the future, there should
probably be a plugin that implements nodes for many other data sources like AWS S3 or
databases. [2]