Python Task API¶
DV Flow Manager provides two task-oriented API extension mechanisms: leaf tasks may be implemented as Python methods, and the bodies of compound tasks may be generated programmatically.
Leaf-Task Implementation API¶
The core implementation for tasks is provided by a Python async method. This method is passed two parameters:
runner - Services that the task runner provides for the use of tasks
input - The input data for the task
The method must return a TaskDataResult object with the execution status of the task, result data, markers, and memento data.
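For example, a minimal leaf-task implementation might look like the following sketch; the command and logfile name are illustrative:

from dv_flow.mgr import TaskDataResult

async def run(runner, input) -> TaskDataResult:
    # Invoke a command via the runner-provided exec service
    status = await runner.exec(['echo', 'hello'], logfile='hello.log')
    # All TaskDataResult fields other than status use their defaults here
    return TaskDataResult(status=status)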
TaskDataInput¶
An object of type TaskDataInput is passed as the input parameter of the task's implementation method.
- class dv_flow.mgr.TaskDataInput(*, name: str, changed: bool, srcdir: str, rundir: str, params: Any, inputs: List[Any], memento: Any)¶
Input data to a task:
name - name of the task
changed - indicates whether any of this task’s dependencies have changed
rundir - directory in which the task is to be run
params - parameters to the task
inputs - list of TaskDataItem objects that are consumed by this task
memento - memento data previously returned by this task. None if no memento is available
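As a sketch of how these fields are typically used, a task might consult changed and memento to skip redundant work; the memento payload here is illustrative:

from dv_flow.mgr import TaskDataResult

async def run(runner, input) -> TaskDataResult:
    # Skip the work when no dependency changed and prior memento data exists
    if not input.changed and input.memento is not None:
        return TaskDataResult(status=0, changed=False, memento=input.memento)
    # Otherwise perform the real work (elided) and save a fresh memento
    memento = {"generated": True}  # illustrative memento payload
    return TaskDataResult(status=0, changed=True, memento=memento)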
TaskDataItem¶
Data is passed between tasks via TaskDataItem-derived objects. Each task may produce 0 or more TaskDataItem objects as output. A task receives all the TaskDataItem objects produced by its dependencies.
- class dv_flow.mgr.TaskDataItem(*, type: str, src: str = None, seq: int = -1)¶
Base class for task data items
type - Name of the data item type
src - Name of the task that produced this item
seq - Sequence number of the item within the task
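A task typically filters its inputs by type. A brief sketch, assuming the std.FileSet item type used elsewhere in this documentation:

# Inside a task implementation: select the file-set items among the inputs
filesets = [item for item in input.inputs if item.type == "std.FileSet"]
for fs in filesets:
    print(f"{fs.src} produced item #{fs.seq}")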
TaskRunCtxt¶
The task implementation is passed a task-run context object that provides utilities for the task.
- class dv_flow.mgr.TaskRunCtxt(runner: 'TaskRunner', ctxt: dv_flow.mgr.task_node_ctxt.TaskNodeCtxt, rundir: str, _markers: List[dv_flow.mgr.task_data.TaskMarker] = <factory>, _exec_info: List[dv_flow.mgr.task_run_ctxt.ExecInfo] = <factory>, _log: logging.Logger = <factory>, _exec_start_callback: <built-in function callable> = None, _exec_end_callback: <built-in function callable> = None)¶
- create(path, content)¶
Create a file in the task’s rundir
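Example (a minimal sketch; the file name and content are illustrative):
runner.create("run.sh", "#!/bin/sh\necho hello\n")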
- error(msg: str, loc: TaskMarkerLoc = None)¶
Add an error marker related to the task’s execution
- async exec(cmd: List[str], logfile=None, logfilter=None, cwd=None, env=None)¶
Executes a command as part of the task’s implementation. Output from the command will be saved to the specified logfile, or to a default logfile if not specified. If the command fails, an error marker will be added.
Example:
status |= await runner.exec(['ls', '-l'], logfile='ls.log')
- async exec_parallel(cmds: List[ExecCmd], logfilters: Dict[int, callable] = None) List[int]¶
Execute multiple commands in parallel, subject to nproc limits.
Each command runs independently, respecting the exec_semaphore concurrency limits. Returns an array of status codes corresponding to each command in the input array.
- Args:
cmds: List of ExecCmd objects describing commands to run
logfilters: Optional dict mapping command index to logfilter callable
- Returns:
List of integer status codes, one per command in same order as input
Example:
from dv_flow.mgr.task_run_ctxt import ExecCmd

cmds = [
    ExecCmd(cmd=['gcc', '-c', 'file1.c'], logfile='compile1.log'),
    ExecCmd(cmd=['gcc', '-c', 'file2.c'], logfile='compile2.log'),
    ExecCmd(cmd=['gcc', '-c', 'file3.c'], logfile='compile3.log'),
]
statuses = await runner.exec_parallel(cmds)

# statuses[0] = exit code for file1.c compile
# statuses[1] = exit code for file2.c compile
# statuses[2] = exit code for file3.c compile
- info(msg: str, loc: TaskMarkerLoc = None)¶
Add an informational marker related to the task's execution
- marker(msg: str, severity: SeverityE, loc: TaskMarkerLoc = None)¶
Add a marker related to the task’s execution
- mkDataItem(type, **kwargs)¶
Create a data item in the task's rundir. The returned item is available to the task's implementation and can be included in its output.
- async run_subgraph(tasks: TaskNode | List[TaskNode], name: str = None, timeout: float = None) TaskDataOutput | List[TaskDataOutput]¶
Execute a sub-graph of tasks dynamically during task execution.
This schedules the tasks into the current execution schedule, respecting dependencies and resource constraints. The call blocks until all tasks complete.
- Args:
tasks: TaskNode or list of TaskNodes to execute
name: Optional name prefix for sub-tasks (for debugging)
timeout: Optional timeout in seconds (None = no timeout)
- Returns:
TaskDataOutput or list of TaskDataOutput from terminal tasks
- Raises:
asyncio.TimeoutError: If timeout is exceeded
Exception: If sub-graph execution fails
Example:
async def run(ctxt, input):
    # Create dynamic tasks
    task1 = ctxt.mkTaskNode("type1", name="task1")
    task2 = ctxt.mkTaskNode("type2", name="task2", needs=[task1])

    # Execute dynamically with timeout
    results = await ctxt.run_subgraph([task1, task2], timeout=30.0)

    return TaskDataResult(status=0, output=results)
TaskDataResult¶
Task implementation methods must return an object of type TaskDataResult. This object contains key data about task execution.
- class dv_flow.mgr.TaskDataResult(*, changed: bool = True, output: ~typing.List[~typing.Any] = <factory>, memento: ~typing.Any = None, markers: ~typing.List[~dv_flow.mgr.task_data.TaskMarker] = <factory>, status: int = 0, cache_hit: bool = False, cache_stored: bool = False)¶
Result data from a task:
changed - indicates whether the task modified its result data
output - list of output parameter sets
memento - memento data to be passed to the next invocation of the task
markers - list of markers produced by the task
status - status code (0=success, non-zero=failure)
cache_hit - indicates whether the task result came from cache
cache_stored - indicates whether the task result was stored in cache
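A typical return value might be constructed as in the following fragment; the item type and memento payload are illustrative:

item = runner.mkDataItem("std.FileSet")  # item type is illustrative
result = TaskDataResult(
    status=0,             # success
    changed=True,         # result data was regenerated
    output=[item],        # items produced by this invocation
    memento={"dep_mtime": 12345.0}  # arbitrary example memento
)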
TaskMarker¶
Tasks may produce markers to highlight key information to the user. A marker is typically a pointer to a file location with an associated severity level (error, warning, info).
- class dv_flow.mgr.TaskMarker(*, msg: str, severity: SeverityE, loc: TaskMarkerLoc = None)¶
Captures marker data produced by a task.
- class dv_flow.mgr.TaskMarkerLoc(*, path: str, line: int = -1, pos: int = -1)¶
Captures the source location of a marker
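For example, a marker pointing at a source location might be built as in the following sketch; this assumes SeverityE is importable from dv_flow.mgr and exposes an Error member:

from dv_flow.mgr import TaskMarker, TaskMarkerLoc, SeverityE

marker = TaskMarker(
    msg="undeclared identifier 'foo'",
    severity=SeverityE.Error,  # member name assumed
    loc=TaskMarkerLoc(path="src/top.sv", line=42)
)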
Task-Graph Generation API¶
DV Flow Manager supports the generate strategy by calling a Python method that is responsible for building the body of the compound task.
A graph-building method has the following signature:
def build_graph(ctxt: TaskGenCtxt, input: TaskGenInputData):
    pass
ctxt - Provides services for building and registering tasks
input - Input to the generator. Currently, this is the set of task parameters
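A sketch of a generator body that chains three tasks, assuming the std.Message task type (the import path for TaskGenInputData follows the class names shown below):

from dv_flow.mgr import TaskGenCtxt, TaskGenInputData

def build_graph(ctxt: TaskGenCtxt, input: TaskGenInputData):
    prev = None
    for i in range(3):
        # Name each generated task relative to the compound task
        needs = [prev] if prev is not None else None
        t = ctxt.mkTaskNode("std.Message", name=ctxt.mkName(f"step{i}"),
                            needs=needs, msg=f"step {i}")
        ctxt.addTask(t)
        prev = t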
Assuming that this method is defined in a module named my_module, the following YAML specifies that the task will be called to generate the body of the compound task:
tasks:
- name: mytask
  strategy:
    generate:
      run: my_module.build_graph
TaskGenCtxt API¶
- class dv_flow.mgr.TaskGenCtxt(rundir: str, srcdir: str, input: dv_flow.mgr.task_node.TaskNode, basename: str, builder: 'TaskGraphBuilder', body: List[ForwardRef('Task')] = <factory>, tasks: List[ForwardRef('TaskNode')] = <factory>, _idx: int = 1)¶
- addTask(task: TaskNode) TaskNode¶
Add the specified task to the graph
- getInputs() List[object]¶
Gets the input datasets of the containing task
- mkName(name: str) str¶
Create a task name relative to the containing compound task
- mkTaskNode(type_t, name=None, srcdir=None, needs=None, **kwargs)¶
Creates a new task node.
- type_t - type name of the task
- name - (Optional) full name of the task
- srcdir - (Optional)
- needs - (Optional) task nodes required by this task
- kwargs - (Optional) parameter overrides for the task
TaskGenInputData¶
The TaskGenInputData class provides the value of task parameters specified on the containing compound task.
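For example, a generator might branch on a parameter value; the params attribute name and the count parameter here are assumptions for illustration:

def build_graph(ctxt, input):
    # 'input.params' is assumed to hold the compound task's parameter values
    count = getattr(input.params, "count", 1)
    for i in range(count):
        ctxt.addTask(ctxt.mkTaskNode("std.Message",
                                     name=ctxt.mkName(f"t{i}"),
                                     msg=f"task {i}"))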
Custom Up-to-Date Check API¶
Tasks can define custom up-to-date check methods to determine whether a task needs to be re-executed. This is useful for tasks that reference files not explicitly listed in a fileset.
Implementing a Custom Check¶
A custom up-to-date method is an async Python function with the following signature:
async def check_uptodate(ctxt: UpToDateCtxt) -> bool:
    """
    Check if task is up-to-date.

    Returns True if the task is up-to-date and should be skipped,
    False if the task needs to run.
    """
    pass
The method is specified in the task definition YAML:
tasks:
- name: my_task
  uptodate: mymodule.check_uptodate
  run: mymodule.run_task
UpToDateCtxt¶
The context object passed to custom up-to-date methods provides access to the task’s run directory, parameters, inputs, and previous execution data.
- class dv_flow.mgr.uptodate_ctxt.UpToDateCtxt(rundir: str, srcdir: str, params: Any, inputs: List[Any], exec_data: dict, memento: Any | None = None)¶
Context passed to custom up-to-date check methods
- async exec(cmd: List[str], cwd: str = None, env: Dict[str, str] = None) int¶
Run a subprocess for dependency checking. Returns the exit status code.
Example: Checking File Timestamps¶
Here’s an example of a custom up-to-date check that verifies whether an external dependency file has been modified:
import os

from dv_flow.mgr.uptodate_ctxt import UpToDateCtxt

async def check_external_deps(ctxt: UpToDateCtxt) -> bool:
    """Check if external dependency files are unchanged."""
    # Get the recorded timestamp from previous execution
    prev_mtime = ctxt.exec_data.get('dep_mtime')
    if prev_mtime is None:
        return False  # No previous data, must run

    # Check current timestamp
    dep_file = os.path.join(ctxt.rundir, "..", "external_deps.txt")
    if not os.path.exists(dep_file):
        return False  # Dependency missing, must run

    current_mtime = os.path.getmtime(dep_file)

    # Up-to-date if timestamp hasn't changed
    return current_mtime == prev_mtime
Example: Running a Subprocess Check¶
The UpToDateCtxt.exec() method allows running subprocesses for
dependency checking:
from dv_flow.mgr.uptodate_ctxt import UpToDateCtxt

async def check_git_status(ctxt: UpToDateCtxt) -> bool:
    """Check if git working tree is clean."""
    # Returns 0 if no changes, non-zero otherwise
    status = await ctxt.exec(["git", "diff", "--quiet", "HEAD"])
    return status == 0  # Up-to-date if no changes
PyTask Class API Reference¶
The PyTask class provides a class-based interface for implementing tasks
in Python. This approach offers better organization, type safety, and
reusability compared to function-based tasks.
PyTask Base Class¶
- class dv_flow.mgr.PyTask(desc: str = '', doc: str = '', shell: str = 'pytask', _ctxt: dv_flow.mgr.task_run_ctxt.TaskRunCtxt | None = None, _input: dv_flow.mgr.task_data.TaskDataInput[PyTask.Params] | None = None)¶
- class Params¶
Class Attributes¶
desc (str): Short description of the task
doc (str): Full documentation for the task
shell (str): Shell to use for execution (default: “pytask”)
Instance Attributes¶
_ctxt (TaskRunCtxt): Task execution context (set by runtime)
_input (TaskDataInput): Task input data (set by runtime)
Properties¶
params: Typed access to task parameters (returns instance of nested Params class)
Defining a PyTask¶
from typing import Union

from dv_flow.mgr import PyTask
import dataclasses as dc

@dc.dataclass
class MyTask(PyTask):
    desc = "Brief description"
    doc = "Detailed documentation..."

    @dc.dataclass
    class Params:
        input_file: str = ""
        output_file: str = ""
        verbose: bool = False

    async def __call__(self) -> Union[str, None]:
        # Access parameters
        print(f"Processing {self.params.input_file}")

        # Access context
        rundir = self._ctxt.rundir

        # Execute commands
        status = await self._ctxt.exec(
            ["process", self.params.input_file],
            logfile="process.log"
        )

        # Return None for direct execution,
        # or return a command string to execute
        return None
Using PyTask¶
Reference the PyTask class in YAML:
tasks:
- name: my_task
  shell: pytask
  run: my_module.MyTask
  with:
    input_file: data.txt
    output_file: result.txt
    verbose: true
PyPkg Class API Reference¶
The PyPkg class enables defining entire packages in Python, providing
programmatic control over package structure and task registration.
PyPkg Base Class¶
Class Attributes¶
name (str): Package name (optional, defaults to class name)
Methods¶
registerTask(cls, T): Register a task class with this package
Defining a PyPkg¶
from dv_flow.mgr import PyPkg, pypkg, PyTask
import dataclasses as dc

@dc.dataclass
class MyToolPkg(PyPkg):
    name = "mytool"

    @dc.dataclass
    class Params:
        version: str = "1.0"
        debug: bool = False
Registering Tasks¶
Use the @pypkg decorator to register tasks:
@pypkg(MyToolPkg)
@dc.dataclass
class Compile(PyTask):
    @dc.dataclass
    class Params:
        sources: list = dc.field(default_factory=list)

    async def __call__(self):
        # Implementation
        pass
Tasks are automatically registered with the package and become available
as mytool.Compile in flow definitions.
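A flow definition can then reference the registered task. A sketch, assuming the uses key is how flow YAML references a task type:

tasks:
- name: compile_all
  uses: mytool.Compile
  with:
    sources: [main.c, util.c]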
Additional API Details¶
TaskRunCtxt Extended API¶
The TaskRunCtxt class provides additional methods for task implementations:
- mkDataItem(type, **kwargs)
Create a data item of the specified type. Returns a configured data item instance that can be added to task outputs.
fileset = ctxt.mkDataItem("std.FileSet")
fileset.filetype = "verilogSource"
fileset.files = ["file1.v", "file2.v"]
- exec(cmd, logfile=None, logfilter=None, cwd=None, env=None)
Execute a command and capture output. Returns the command’s exit status.
status = await ctxt.exec(
    ["gcc", "-o", "output", "input.c"],
    logfile="compile.log",
    cwd="/tmp/build"
)
- exec_parallel(cmds, logfilters=None)
Execute multiple commands in parallel, subject to nproc limits. Each command runs independently, respecting the exec_semaphore concurrency limits. Returns a list of integer status codes corresponding to each command in the input array.
The cmds parameter takes a list of ExecCmd objects, each describing a command to execute:

from dv_flow.mgr import ExecCmd

cmds = [
    ExecCmd(cmd=['gcc', '-c', 'file1.c'], logfile='compile1.log'),
    ExecCmd(cmd=['gcc', '-c', 'file2.c'], logfile='compile2.log'),
    ExecCmd(cmd=['gcc', '-c', 'file3.c'], logfile='compile3.log'),
]
statuses = await ctxt.exec_parallel(cmds)

# statuses[0] = exit code for file1.c compile
# statuses[1] = exit code for file2.c compile
# statuses[2] = exit code for file3.c compile

The ExecCmd class has the following attributes:

cmd: List of command arguments (e.g., ['ls', '-la'])
logfile: Optional log file name for command output
cwd: Optional working directory (defaults to task rundir)
env: Optional environment variables dict (defaults to task env)
- error(msg), warning(msg), info(msg)
Add markers to the task output for user notification.
ctxt.error("Compilation failed")
ctxt.warning("Deprecated option used")
ctxt.info("Processing 100 files")
- Properties:
rundir: Task’s run directory path
root_pkgdir: Root package directory
root_rundir: Root run directory
env: Environment variables (dict)
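For example, a sketch that combines these properties when invoking a build tool from a task implementation:

from dv_flow.mgr import TaskDataResult

async def run(ctxt, input):
    # Extend the task environment and run make in the task's rundir
    env = dict(ctxt.env)
    env["BUILD_DIR"] = ctxt.rundir
    status = await ctxt.exec(["make", "all"], env=env, cwd=ctxt.rundir)
    return TaskDataResult(status=status)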
TaskGenCtxt Extended API¶
The TaskGenCtxt class provides methods for programmatic task graph generation:
- mkTaskNode(type_t, name=None, srcdir=None, needs=None, **kwargs)
Create a new task node in the graph.
node = ctxt.mkTaskNode(
    "std.Message",
    name="hello",
    msg="Hello, World!"
)
- mkName(name)
Generate a qualified task name relative to the containing compound task.
# Inside compound task "parent"
name = ctxt.mkName("child")  # Returns "parent.child"
- addTask(task)
Add a task node to the generated graph.
for i in range(5):
    task = ctxt.mkTaskNode("std.Message", msg=f"Task {i}")
    ctxt.addTask(task)
- Properties:
rundir: Task’s run directory path
srcdir: Source directory path
basename: Base name for generated tasks
input: Input task node
builder: Task graph builder instance