Python Task API

DV Flow Manager provides two task-oriented Python extension mechanisms: one allows leaf tasks to be implemented in Python, and the other allows the body of a compound task to be generated programmatically.

Leaf-Task Implementation API

The core implementation for tasks is provided by a Python async method. This method is passed two parameters:

  • runner - Services that the task runner provides for the use of tasks

  • input - The input data for the task

The method must return a TaskDataResult object with the execution status of the task, result data, markers, and memento data.
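
As a concrete sketch, a minimal leaf-task implementation and the changed/memento handshake might look like the following. The stub `TaskDataResult` and input object stand in for the real dv_flow.mgr types so the example is self-contained; a real task would import TaskDataResult from dv_flow.mgr.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, List

# Stand-in for dv_flow.mgr.TaskDataResult, reduced to the fields used here.
@dataclass
class TaskDataResult:
    status: int = 0
    output: List[Any] = field(default_factory=list)
    changed: bool = True
    memento: Any = None

# The task's run method: async, receives the runner context and the
# input data, and returns a TaskDataResult.
async def run(ctxt, input) -> TaskDataResult:
    # Skip the work if no dependency changed and a memento is available.
    if not input.changed and input.memento is not None:
        return TaskDataResult(status=0, changed=False, memento=input.memento)
    # ... perform the task's work here ...
    return TaskDataResult(status=0, changed=True, memento={"ran": True})

# Minimal driver with a stub input object (real tasks receive TaskDataInput).
@dataclass
class StubInput:
    changed: bool = True
    memento: Any = None

result = asyncio.run(run(None, StubInput()))
print(result.status, result.changed)
```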

TaskDataInput

An object of type TaskDataInput is passed as the input parameter of the task's run method.

class dv_flow.mgr.TaskDataInput(*, name: str, changed: bool, srcdir: str, rundir: str, params: Any, inputs: List[Any], memento: Any)

Input data to a task:

  • name - name of the task

  • changed - indicates whether any of this task’s dependencies have changed

  • srcdir - source directory of the task

  • rundir - directory in which the task is to be run

  • params - parameters to the task

  • inputs - list of TaskDataItem objects that are consumed by this task

  • memento - memento data previously returned by this task. None if no memento is available

TaskDataItem

Data is passed between tasks via TaskDataItem-derived objects. Each task may produce 0 or more TaskDataItem objects as output. A task receives all the TaskDataItem objects produced by its dependencies.

class dv_flow.mgr.TaskDataItem(*, type: str, src: str = None, seq: int = -1)

Base class for task data items

  • type - Name of the data item type

  • src - Name of the task that produced this item

  • seq - Sequence number of the item within the task
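
For illustration, a task commonly filters its inputs down to the item types it understands. The `TaskDataItem` stub below mirrors the documented base fields; `std.FileSet` is the standard fileset item type referenced later in this document.

```python
from dataclasses import dataclass

# Stand-in for dv_flow.mgr.TaskDataItem showing the documented base fields.
@dataclass
class TaskDataItem:
    type: str
    src: str = None
    seq: int = -1

# Inputs from several upstream tasks, of mixed item types.
inputs = [
    TaskDataItem(type="std.FileSet", src="compile", seq=0),
    TaskDataItem(type="other.Report", src="lint", seq=0),
    TaskDataItem(type="std.FileSet", src="gen", seq=1),
]

# Keep only the fileset items this task knows how to process.
filesets = [it for it in inputs if it.type == "std.FileSet"]
print([it.src for it in filesets])
```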

TaskRunCtxt

The task implementation is passed a task-run context object that provides utilities for the task.

class dv_flow.mgr.TaskRunCtxt(runner: 'TaskRunner', ctxt: dv_flow.mgr.task_node_ctxt.TaskNodeCtxt, rundir: str, _markers: List[dv_flow.mgr.task_data.TaskMarker] = <factory>, _exec_info: List[dv_flow.mgr.task_run_ctxt.ExecInfo] = <factory>, _log: logging.Logger = <factory>, _exec_start_callback: callable = None, _exec_end_callback: callable = None)
create(path, content)

Create a file in the task’s rundir

error(msg: str, loc: TaskMarkerLoc = None)

Add an error marker related to the task’s execution

async exec(cmd: List[str], logfile=None, logfilter=None, cwd=None, env=None)

Executes a command as part of the task’s implementation. Output from the command will be saved to the specified logfile, or to a default logfile if not specified. If the command fails, an error marker will be added.

Example:

status |= await runner.exec(['ls', '-l'], logfile='ls.log')

async exec_parallel(cmds: List[ExecCmd], logfilters: Dict[int, callable] = None) → List[int]

Execute multiple commands in parallel, subject to nproc limits.

Each command runs independently, respecting the exec_semaphore concurrency limits. Returns an array of status codes corresponding to each command in the input array.

Args:

  cmds: List of ExecCmd objects describing commands to run
  logfilters: Optional dict mapping command index to logfilter callable

Returns:

List of integer status codes, one per command in same order as input

Example:

from dv_flow.mgr.task_run_ctxt import ExecCmd

cmds = [
    ExecCmd(cmd=['gcc', '-c', 'file1.c'], logfile='compile1.log'),
    ExecCmd(cmd=['gcc', '-c', 'file2.c'], logfile='compile2.log'),
    ExecCmd(cmd=['gcc', '-c', 'file3.c'], logfile='compile3.log'),
]
statuses = await runner.exec_parallel(cmds)
# statuses[0] = exit code for file1.c compile
# statuses[1] = exit code for file2.c compile
# statuses[2] = exit code for file3.c compile

info(msg: str, loc: TaskMarkerLoc = None)

Add an informational marker related to the task’s execution

marker(msg: str, severity: SeverityE, loc: TaskMarkerLoc = None)

Add a marker related to the task’s execution

mkDataItem(type, **kwargs)

Create a data item of the specified type. The returned item can be added to the task’s output.

async run_subgraph(tasks: TaskNode | List[TaskNode], name: str = None, timeout: float = None) → TaskDataOutput | List[TaskDataOutput]

Execute a sub-graph of tasks dynamically during task execution.

This schedules the tasks into the current execution schedule, respecting dependencies and resource constraints. The call blocks until all tasks complete.

Args:

  tasks: TaskNode or list of TaskNodes to execute
  name: Optional name prefix for sub-tasks (for debugging)
  timeout: Optional timeout in seconds (None = no timeout)

Returns:

TaskDataOutput or list of TaskDataOutput from terminal tasks

Raises:

  asyncio.TimeoutError: If timeout is exceeded
  Exception: If sub-graph execution fails

Example:

async def run(ctxt, input):
    # Create dynamic tasks
    task1 = ctxt.mkTaskNode("type1", name="task1")
    task2 = ctxt.mkTaskNode("type2", name="task2", needs=[task1])

    # Execute dynamically with timeout
    results = await ctxt.run_subgraph([task1, task2], timeout=30.0)

    return TaskDataResult(status=0, output=results)

TaskDataResult

Task implementation methods must return an object of type TaskDataResult. This object contains key data about task execution.

class dv_flow.mgr.TaskDataResult(*, changed: bool = True, output: List[Any] = <factory>, memento: Any = None, markers: List[TaskMarker] = <factory>, status: int = 0, cache_hit: bool = False, cache_stored: bool = False)

Result data from a task:

  • changed - indicates whether the task modified its result data

  • output - list of output parameter sets

  • memento - memento data to be passed to the next invocation of the task

  • markers - list of markers produced by the task

  • status - status code (0=success, non-zero=failure)

  • cache_hit - indicates whether the task result came from cache

  • cache_stored - indicates whether the task result was stored in cache

TaskMarker

Tasks may produce markers to highlight key information to the user. A marker is typically a pointer to a file location with an associated severity level (error, warning, info).

class dv_flow.mgr.TaskMarker(*, msg: str, severity: SeverityE, loc: TaskMarkerLoc = None)

Captures marker data produced by a task.

class dv_flow.mgr.TaskMarkerLoc(*, path: str, line: int = -1, pos: int = -1)

Captures the source location of a marker
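
A sketch of constructing a marker that points at a specific file location. The stub classes mirror the documented fields; the real API uses the SeverityE enum rather than a plain string, and the file path shown is hypothetical.

```python
from dataclasses import dataclass

# Stand-ins for dv_flow.mgr.TaskMarkerLoc / TaskMarker with the documented fields.
@dataclass
class TaskMarkerLoc:
    path: str
    line: int = -1
    pos: int = -1

@dataclass
class TaskMarker:
    msg: str
    severity: str          # SeverityE in the real API; a plain string here
    loc: TaskMarkerLoc = None

# An error marker pointing at line 42 of a (hypothetical) source file.
marker = TaskMarker(
    msg="undeclared identifier 'clk'",
    severity="error",
    loc=TaskMarkerLoc(path="rtl/top.sv", line=42),
)
print(marker.loc.path, marker.loc.line)
```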

Task-Graph Generation API

DV Flow Manager supports the generate strategy by calling a Python method that is responsible for building the body of the compound task.

A graph-building method has the following signature:

def build_graph(ctxt : TaskGenCtxt, input : TaskGenInputData):
    pass

  • ctxt - Provides services for building and registering tasks

  • input - Input to the generator. Currently, the task parameters

Assuming that this method is defined in a module named my_module, the following YAML specifies that the method will be called to generate the body of the compound task:

tasks:
- name: mytask
  strategy:
    generate:
      run: my_module.build_graph
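
A graph-building method typically creates task nodes and registers them with the context. The sketch below uses a stub context that mimics the documented TaskGenCtxt methods (mkName, mkTaskNode, addTask); std.Message is used as an example task type, and the stub's internal representation is illustrative only.

```python
# Stub standing in for TaskGenCtxt so the generator pattern can be
# shown self-contained; the real context builds actual TaskNode objects.
class StubGenCtxt:
    def __init__(self, basename):
        self.basename = basename
        self.tasks = []

    def mkName(self, name):
        # Qualify the name relative to the containing compound task.
        return f"{self.basename}.{name}"

    def mkTaskNode(self, type_t, name=None, **kwargs):
        # Represent a task node as a plain dict for illustration.
        return {"type": type_t, "name": name, "params": kwargs}

    def addTask(self, task):
        self.tasks.append(task)
        return task

def build_graph(ctxt, input):
    # Generate one task per item; "std.Message" is an example task type.
    for i in range(3):
        node = ctxt.mkTaskNode("std.Message",
                               name=ctxt.mkName(f"msg{i}"),
                               msg=f"Task {i}")
        ctxt.addTask(node)

ctxt = StubGenCtxt("mytask")
build_graph(ctxt, None)
print(len(ctxt.tasks), ctxt.tasks[0]["name"])
```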

TaskGenCtxt API

class dv_flow.mgr.TaskGenCtxt(rundir: str, srcdir: str, input: dv_flow.mgr.task_node.TaskNode, basename: str, builder: 'TaskGraphBuilder', body: List[ForwardRef('Task')] = <factory>, tasks: List[ForwardRef('TaskNode')] = <factory>, _idx: int = 1)
addTask(task: TaskNode) → TaskNode

Add the specified task to the graph

getInputs() → List[object]

Gets the input datasets of the containing task

mkName(name: str) → str

Create a task name relative to the containing compound task

mkTaskNode(type_t, name=None, srcdir=None, needs=None, **kwargs)

Creates a new task node.

  • type_t - type name of the task

  • name - (Optional) full name of the task

  • srcdir - (Optional)

  • needs - (Optional) task nodes required by this task

  • kwargs - (Optional) parameter overrides for the task

TaskGenInputData

The TaskGenInputData class provides the value of task parameters specified on the containing compound task.

Custom Up-to-Date Check API

Tasks can define custom up-to-date check methods to determine whether a task needs to be re-executed. This is useful for tasks that reference files not explicitly listed in a fileset.

Implementing a Custom Check

A custom up-to-date method is an async Python function with the following signature:

async def check_uptodate(ctxt: UpToDateCtxt) -> bool:
    """
    Check if task is up-to-date.

    Returns True if the task is up-to-date and should be skipped,
    False if the task needs to run.
    """
    pass

The method is specified in the task definition YAML:

tasks:
- name: my_task
  uptodate: mymodule.check_uptodate
  run: mymodule.run_task

UpToDateCtxt

The context object passed to custom up-to-date methods provides access to the task’s run directory, parameters, inputs, and previous execution data.

class dv_flow.mgr.uptodate_ctxt.UpToDateCtxt(rundir: str, srcdir: str, params: Any, inputs: List[Any], exec_data: dict, memento: Any | None = None)

Context passed to custom up-to-date check methods

async exec(cmd: List[str], cwd: str = None, env: Dict[str, str] = None) → int

Run a subprocess for dependency checking. Returns the exit status code.

Example: Checking File Timestamps

Here’s an example of a custom up-to-date check that verifies whether an external dependency file has been modified:

import os
from dv_flow.mgr.uptodate_ctxt import UpToDateCtxt

async def check_external_deps(ctxt: UpToDateCtxt) -> bool:
    """Check if external dependency files are unchanged."""

    # Get the recorded timestamp from previous execution
    prev_mtime = ctxt.exec_data.get('dep_mtime')
    if prev_mtime is None:
        return False  # No previous data, must run

    # Check current timestamp
    dep_file = os.path.join(ctxt.rundir, "..", "external_deps.txt")
    if not os.path.exists(dep_file):
        return False  # Dependency missing, must run

    current_mtime = os.path.getmtime(dep_file)

    # Up-to-date if timestamp hasn't changed
    return current_mtime == prev_mtime

Example: Running a Subprocess Check

The UpToDateCtxt.exec() method allows running subprocesses for dependency checking:

from dv_flow.mgr.uptodate_ctxt import UpToDateCtxt

async def check_git_status(ctxt: UpToDateCtxt) -> bool:
    """Check if git working tree is clean."""

    # Returns 0 if no changes, non-zero otherwise
    status = await ctxt.exec(["git", "diff", "--quiet", "HEAD"])

    return status == 0  # Up-to-date if no changes

PyTask Class API Reference

The PyTask class provides a class-based interface for implementing tasks in Python. This approach offers better organization, type safety, and reusability compared to function-based tasks.

PyTask Base Class

class dv_flow.mgr.PyTask(desc: str = '', doc: str = '', shell: str = 'pytask', _ctxt: dv_flow.mgr.task_run_ctxt.TaskRunCtxt | None = None, _input: dv_flow.mgr.task_data.TaskDataInput[PyTask.Params] | None = None)
class Params

Class Attributes

  • desc (str): Short description of the task

  • doc (str): Full documentation for the task

  • shell (str): Shell to use for execution (default: “pytask”)

Instance Attributes

  • _ctxt (TaskRunCtxt): Task execution context (set by runtime)

  • _input (TaskDataInput): Task input data (set by runtime)

Properties

  • params: Typed access to task parameters (returns instance of nested Params class)

Defining a PyTask

from dv_flow.mgr import PyTask
from typing import Union
import dataclasses as dc

@dc.dataclass
class MyTask(PyTask):
    desc = "Brief description"
    doc = "Detailed documentation..."

    @dc.dataclass
    class Params:
        input_file: str = ""
        output_file: str = ""
        verbose: bool = False

    async def __call__(self) -> Union[str, None]:
        # Access parameters
        print(f"Processing {self.params.input_file}")

        # Access context
        rundir = self._ctxt.rundir

        # Execute commands
        status = await self._ctxt.exec(
            ["process", self.params.input_file],
            logfile="process.log"
        )

        # Return None for direct execution
        # Or return a command string to execute
        return None

Using PyTask

Reference the PyTask class in YAML:

tasks:
- name: my_task
  shell: pytask
  run: my_module.MyTask
  with:
    input_file: data.txt
    output_file: result.txt
    verbose: true

PyPkg Class API Reference

The PyPkg class enables defining entire packages in Python, providing programmatic control over package structure and task registration.

PyPkg Base Class

class dv_flow.mgr.PyPkg(_tasks: Dict = <factory>)
class Params

Class Attributes

  • name (str): Package name (optional, defaults to class name)

Methods

  • registerTask(cls, T): Register a task class with this package

Defining a PyPkg

from dv_flow.mgr import PyPkg, pypkg, PyTask
import dataclasses as dc

@dc.dataclass
class MyToolPkg(PyPkg):
    name = "mytool"

    @dc.dataclass
    class Params:
        version: str = "1.0"
        debug: bool = False

Registering Tasks

Use the @pypkg decorator to register tasks:

@pypkg(MyToolPkg)
@dc.dataclass
class Compile(PyTask):
    @dc.dataclass
    class Params:
        sources: list = dc.field(default_factory=list)

    async def __call__(self):
        # Implementation
        pass

Tasks are automatically registered with the package and become available as mytool.Compile in flow definitions.

Additional API Details

TaskRunCtxt Extended API

The TaskRunCtxt class provides additional methods for task implementations:

mkDataItem(type, **kwargs)

Create a data item of the specified type. Returns a configured data item instance that can be added to task outputs.

fileset = ctxt.mkDataItem("std.FileSet")
fileset.filetype = "verilogSource"
fileset.files = ["file1.v", "file2.v"]

exec(cmd, logfile=None, logfilter=None, cwd=None, env=None)

Execute a command and capture output. Returns the command’s exit status.

status = await ctxt.exec(
    ["gcc", "-o", "output", "input.c"],
    logfile="compile.log",
    cwd="/tmp/build"
)

exec_parallel(cmds, logfilters=None)

Execute multiple commands in parallel, subject to nproc limits. Each command runs independently, respecting the exec_semaphore concurrency limits. Returns a list of integer status codes corresponding to each command in the input array.

The cmds parameter takes a list of ExecCmd objects, which describe each command to execute:

from dv_flow.mgr import ExecCmd

cmds = [
    ExecCmd(cmd=['gcc', '-c', 'file1.c'], logfile='compile1.log'),
    ExecCmd(cmd=['gcc', '-c', 'file2.c'], logfile='compile2.log'),
    ExecCmd(cmd=['gcc', '-c', 'file3.c'], logfile='compile3.log'),
]
statuses = await ctxt.exec_parallel(cmds)
# statuses[0] = exit code for file1.c compile
# statuses[1] = exit code for file2.c compile
# statuses[2] = exit code for file3.c compile

The ExecCmd class has the following attributes:

  • cmd: List of command arguments (e.g., ['ls', '-la'])

  • logfile: Optional log file name for command output

  • cwd: Optional working directory (defaults to task rundir)

  • env: Optional environment variables dict (defaults to task env)

error(msg), warning(msg), info(msg)

Add markers to the task output for user notification.

ctxt.error("Compilation failed")
ctxt.warning("Deprecated option used")
ctxt.info("Processing 100 files")

Properties:
  • rundir: Task’s run directory path

  • root_pkgdir: Root package directory

  • root_rundir: Root run directory

  • env: Environment variables (dict)

TaskGenCtxt Extended API

The TaskGenCtxt class provides methods for programmatic task graph generation:

mkTaskNode(type_t, name=None, srcdir=None, needs=None, **kwargs)

Create a new task node in the graph.

node = ctxt.mkTaskNode(
    "std.Message",
    name="hello",
    msg="Hello, World!"
)

mkName(name)

Generate a qualified task name relative to the containing compound task.

# Inside compound task "parent"
name = ctxt.mkName("child")  # Returns "parent.child"

addTask(task)

Add a task node to the generated graph.

for i in range(5):
    task = ctxt.mkTaskNode("std.Message", msg=f"Task {i}")
    ctxt.addTask(task)

Properties:
  • rundir: Task’s run directory path

  • srcdir: Source directory path

  • basename: Base name for generated tasks

  • input: Input task node

  • builder: Task graph builder instance