Python Task API
DV Flow Manager provides two task-oriented API extension mechanisms: one allows a leaf task to be implemented in Python, and the other allows the body of a compound task to be generated by Python code.
Leaf-Task Implementation API
The core implementation for tasks is provided by a Python async method. This method is passed two parameters:
runner - Services that the task runner provides for the use of tasks
input - The input data for the task
The method must return a TaskDataResult object with the execution status of the task, result data, markers, and memento data.
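The shape of such a method can be sketched as follows. This is a minimal illustration rather than code taken from DV Flow Manager: the two dataclasses are local stand-ins that mirror the documented fields so the example runs without the library installed, and the task name say_hello, the 'who' parameter, and the use of a dict for params are all assumptions made for the example.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, List


# Local stand-ins mirroring the documented fields. In a real flow these
# classes would be imported from dv_flow.mgr instead of being redefined.
@dataclass
class TaskDataInput:
    name: str
    changed: bool
    srcdir: str
    rundir: str
    params: Any
    inputs: List[Any]
    memento: Any


@dataclass
class TaskDataResult:
    changed: bool = True
    output: List[Any] = field(default_factory=list)
    memento: Any = None
    markers: List[Any] = field(default_factory=list)
    status: int = 0


async def say_hello(runner, input) -> TaskDataResult:
    """Hypothetical leaf task that builds a greeting from its parameters."""
    # 'who' is an illustrative parameter; params is assumed to be a dict here.
    msg = "Hello, %s" % input.params["who"]
    return TaskDataResult(status=0, changed=True, memento={"msg": msg})


# Drive the method directly. The runner is unused in this sketch, so None
# is passed in its place.
task_input = TaskDataInput(
    name="hello", changed=True, srcdir=".", rundir=".",
    params={"who": "world"}, inputs=[], memento=None,
)
result = asyncio.run(say_hello(None, task_input))
```

Calling the method directly like this is only a convenience for experimentation; in normal use the task runner supplies both arguments.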
TaskDataInput
An object of type TaskDataInput is passed as the input parameter of the task implementation method.
- class dv_flow.mgr.TaskDataInput(*, name: str, changed: bool, srcdir: str, rundir: str, params: Any, inputs: List[Any], memento: Any)
Input data to a task:
name - name of the task
changed - indicates whether any of this task’s dependencies have changed
rundir - directory in which the task is to be run
params - parameters to the task
inputs - list of TaskDataItem objects that are consumed by this task
memento - memento data previously returned by this task. None if no memento is available
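One way the changed and memento fields might be combined is to skip work when nothing relevant has changed since the previous run. The sketch below assumes the memento is a plain dict and that params can be serialized to JSON; neither is stated above. SimpleNamespace objects stand in for TaskDataInput and TaskDataResult, and the fingerprint helper is hypothetical.

```python
import asyncio
import hashlib
import json
from types import SimpleNamespace


def fingerprint(params: dict) -> str:
    """Stable digest of the parameter values (illustrative helper)."""
    text = json.dumps(params, sort_keys=True)
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


async def maybe_rebuild(runner, input):
    """Hypothetical task that reports changed=False when it is up to date."""
    current = fingerprint(input.params)
    # memento is None on the first run, so guard before reading from it.
    previous = input.memento.get("fingerprint") if input.memento else None
    up_to_date = (not input.changed) and previous == current
    if not up_to_date:
        pass  # the expensive work would be performed here
    # Stand-in for TaskDataResult with the documented field names.
    return SimpleNamespace(
        status=0, changed=not up_to_date, output=[], markers=[],
        memento={"fingerprint": current},
    )


def make_input(changed, params, memento):
    """Stand-in for TaskDataInput with the documented field names."""
    return SimpleNamespace(
        name="build", changed=changed, srcdir=".", rundir=".",
        params=params, inputs=[], memento=memento,
    )


# First run: no memento yet. Second run: same parameters, no upstream change.
# Third run: a parameter value differs from the saved fingerprint.
first = asyncio.run(maybe_rebuild(None, make_input(True, {"opt": 1}, None)))
second = asyncio.run(maybe_rebuild(None, make_input(False, {"opt": 1}, first.memento)))
third = asyncio.run(maybe_rebuild(None, make_input(False, {"opt": 2}, second.memento)))
```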
TaskDataItem
Data is passed between tasks via TaskDataItem-derived objects. Each task may produce 0 or more TaskDataItem objects as output. A task receives all the TaskDataItem objects produced by its dependencies.
- class dv_flow.mgr.TaskDataItem(*, type: str, src: str = None, seq: int = -1)
Base class for task data items
type - Name of the data item type
src - Name of the task that produced this item
seq - Sequence number of the item within the task
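Because a task receives every item produced by its dependencies, an implementation will often select only the items whose type it understands. The following sketch shows one possible way to do that. The TaskDataItem dataclass is a local stand-in with the documented fields, the helper items_of_type is hypothetical, and the type names demo.Files and demo.Report are invented for the example.

```python
from dataclasses import dataclass
from typing import List, Optional


# Local stand-in mirroring the documented fields of TaskDataItem.
@dataclass
class TaskDataItem:
    type: str
    src: Optional[str] = None
    seq: int = -1


def items_of_type(inputs: List[TaskDataItem], type_name: str) -> List[TaskDataItem]:
    """Select items of one type, ordered by producing task and sequence."""
    selected = [item for item in inputs if item.type == type_name]
    return sorted(selected, key=lambda item: (item.src or "", item.seq))


# Items as they might arrive from two upstream tasks (illustrative values).
inputs = [
    TaskDataItem(type="demo.Files", src="compile", seq=1),
    TaskDataItem(type="demo.Report", src="lint", seq=0),
    TaskDataItem(type="demo.Files", src="compile", seq=0),
]
files = items_of_type(inputs, "demo.Files")
```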
TaskRunCtxt
The task implementation is passed a task-run context object that provides utilities for the task.
- class dv_flow.mgr.TaskRunCtxt(runner: 'TaskRunner', ctxt: dv_flow.mgr.task_node_ctxt.TaskNodeCtxt, rundir: str, _markers: List[dv_flow.mgr.task_data.TaskMarker] = <factory>, _exec_info: List[dv_flow.mgr.task_run_ctxt.ExecInfo] = <factory>)
- create(path, content)
Create a file in the task’s rundir
- error(msg: str, loc: TaskMarkerLoc = None)
Add an error marker related to the task’s execution
- async exec(cmd: List[str], logfile=None, logfilter=None, cwd=None, env=None)
Executes a command as part of the task’s implementation. Output from the command will be saved to the specified logfile, or to a default logfile if not specified. If the command fails, an error marker will be added.
Example:
status |= await runner.exec(['ls', '-l'], logfile='ls.log')
- info(msg: str, loc: TaskMarkerLoc = None)
Add an info marker related to the task’s execution
- marker(msg: str, severity: SeverityE, loc: TaskMarkerLoc = None)
Add a marker related to the task’s execution
- mkDataItem(type, **kwargs)
Create a data item in the task’s rundir. The item is then available to the task’s implementation.
TaskDataResult
Task implementation methods must return an object of type TaskDataResult. This object contains key data about task execution.
- class dv_flow.mgr.TaskDataResult(*, changed: bool = True, output: List[Any] = <factory>, memento: Any = None, markers: List[TaskMarker] = <factory>, status: int = 0)
Result data from a task:
changed - indicates whether the task modified its result data
output - list of output parameter sets
memento - memento data to be passed to the next invocation of the task
markers - list of markers produced by the task
status - status code (0=success, non-zero=failure)
TaskMarker
Tasks may produce markers to highlight key information to the user. A marker is typically a pointer to a file location with an associated severity level (error, warning, info).
- class dv_flow.mgr.TaskMarker(*, msg: str, severity: SeverityE, loc: TaskMarkerLoc = None)
Captures marker data produced by a task.
- class dv_flow.mgr.TaskMarkerLoc(*, path: str, line: int = -1, pos: int = -1)
Captures the source location of a marker
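A common source of markers is a tool log in which each diagnostic names a file and line. The following sketch shows how such lines might be turned into marker objects. Everything here is a local stand-in: the classes mirror the documented fields, the SeverityE member names are assumptions (only the three severity levels are named above), and the log format and the helper markers_from_log are invented for illustration.

```python
import re
from dataclasses import dataclass
from enum import Enum, auto
from typing import List, Optional


class SeverityE(Enum):
    # Member names are assumptions; the real enum may spell them differently.
    Info = auto()
    Warning = auto()
    Error = auto()


@dataclass
class TaskMarkerLoc:
    path: str
    line: int = -1
    pos: int = -1


@dataclass
class TaskMarker:
    msg: str
    severity: SeverityE
    loc: Optional[TaskMarkerLoc] = None


SEVERITIES = {"info": SeverityE.Info, "warning": SeverityE.Warning, "error": SeverityE.Error}

# Matches lines of the (invented) form "path:line: severity: message".
LINE_RE = re.compile(
    r"^(?P<path>[^:\s]+):(?P<line>\d+): (?P<sev>error|warning|info): (?P<msg>.+)$"
)


def markers_from_log(text: str) -> List[TaskMarker]:
    """Convert matching log lines to markers; other lines are ignored."""
    markers = []
    for raw in text.splitlines():
        match = LINE_RE.match(raw.strip())
        if match is None:
            continue
        loc = TaskMarkerLoc(path=match.group("path"), line=int(match.group("line")))
        markers.append(
            TaskMarker(msg=match.group("msg"), severity=SEVERITIES[match.group("sev")], loc=loc)
        )
    return markers


log = """top.sv:12: error: undeclared identifier 'clk'
elaboration finished
alu.sv:3: warning: unused signal 'tmp'"""
markers = markers_from_log(log)
```

A list built this way could be passed as the markers field of the task result.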
Task-Graph Generation API
DV Flow Manager supports the generate strategy by calling a Python method that is responsible for building the body of the compound task.
A graph-building method has the following signature:
def build_graph(ctxt : TaskGenCtxt, input : TaskGenInputData):
    pass
ctxt - Provides services for building and registering tasks
input - Input to the generator. Currently, this holds the task parameters
Assuming that this method is defined in a module named my_module, the following YAML specifies that the method will be called to generate the body of the compound task:
tasks:
- name: mytask
  strategy:
    generate:
      run: my_module.build_graph
TaskGenCtxt API
- class dv_flow.mgr.TaskGenCtxt(rundir: str, srcdir: str, input: dv_flow.mgr.task_node.TaskNode, basename: str, builder: 'TaskGraphBuilder', body: List[ForwardRef('Task')] = <factory>, tasks: List[ForwardRef('TaskNode')] = <factory>, _idx: int = 1)
- addTask(task: TaskNode) -> TaskNode
Add the specified task to the graph
- getInputs() -> List[object]
Gets the input datasets of the containing task
- mkName(name: str) -> str
Create a task name relative to the containing compound task
- mkTaskNode(type_t, name=None, srcdir=None, needs=None, **kwargs)
Creates a new task node.
type_t - type name of the task
name - (Optional) full name of the task
srcdir - (Optional)
needs - (Optional) task nodes required by this task
kwargs - (Optional) parameter overrides for the task
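The methods above might be combined in a graph-building method roughly as follows. This is a sketch under several assumptions, not the library's behavior: FakeGenCtxt is a hypothetical stand-in that only records what is requested, the "." separator used by its mkName is a guess, the task type demo.Step and the count and index parameters are invented, and the generator input is assumed to expose the parameters through a params attribute holding a dict.

```python
from types import SimpleNamespace


class FakeGenCtxt:
    """Hypothetical stand-in for TaskGenCtxt that records added tasks."""

    def __init__(self, basename):
        self.basename = basename
        self.tasks = []

    def mkName(self, name):
        # The separator is an assumption made for this sketch.
        return "%s.%s" % (self.basename, name)

    def mkTaskNode(self, type_t, name=None, srcdir=None, needs=None, **kwargs):
        return SimpleNamespace(type=type_t, name=name, needs=list(needs or []), params=kwargs)

    def addTask(self, task):
        self.tasks.append(task)
        return task


def build_graph(ctxt, input):
    """Generate a chain of tasks in which each step needs the previous one."""
    prev = None
    for i in range(input.params["count"]):
        node = ctxt.mkTaskNode(
            "demo.Step",                       # illustrative task type
            name=ctxt.mkName("step%d" % i),
            needs=[prev] if prev is not None else None,
            index=i,                           # passed through as a parameter override
        )
        prev = ctxt.addTask(node)


# Exercise the generator with a stand-in context and input.
ctxt = FakeGenCtxt("mytask")
build_graph(ctxt, SimpleNamespace(params={"count": 3}))
names = [task.name for task in ctxt.tasks]
```

Deriving each name with mkName keeps the generated tasks scoped to the containing compound task.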
TaskGenInputData
The TaskGenInputData class provides the values of the task parameters specified on the containing compound task.