Python Task API
The core implementation for tasks is provided by a Python async method. This method is passed two parameters:
runner - The task-run context (TaskRunCtxt), which exposes the services that the task runner provides to tasks
input - The input data for the task
The method must return a TaskDataResult object with the execution status of the task, result data, markers, and memento data.
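The shape of such a method can be sketched as follows. This is a minimal illustration, not the library's own code: the two dataclasses are stand-ins that mirror the documented fields so the sketch runs without dv_flow.mgr installed, and my_task is a hypothetical name. In a real flow the classes would be imported from dv_flow.mgr.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, List


# Stand-ins mirroring the documented fields (assumption: a real task
# imports TaskDataInput and TaskDataResult from dv_flow.mgr instead).
@dataclass
class TaskDataInput:
    name: str
    changed: bool
    srcdir: str
    rundir: str
    params: Any
    inputs: List[Any]
    memento: Any = None


@dataclass
class TaskDataResult:
    changed: bool = True
    output: List[Any] = field(default_factory=list)
    memento: Any = None
    markers: List[Any] = field(default_factory=list)
    status: int = 0


async def my_task(runner, input: TaskDataInput) -> TaskDataResult:
    """Hypothetical task that does no work and reports success."""
    # 'runner' is unused here; a real task would call its services,
    # for example 'await runner.exec([...])'.
    return TaskDataResult(changed=input.changed, status=0)
```

The task runner awaits the method itself; asyncio.run(my_task(None, some_input)) is only needed when trying the sketch standalone.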
TaskDataInput
An object of type TaskDataInput is passed as the input parameter of the task implementation method.
- class dv_flow.mgr.TaskDataInput(*, name: str, changed: bool, srcdir: str, rundir: str, params: Any, inputs: List[Any], memento: Any)
Input data to a task:
name - name of the task
changed - indicates whether any of this task’s dependencies have changed
srcdir - source directory associated with the task
rundir - directory in which the task is to be run
params - parameters to the task
inputs - list of TaskDataItem objects that are consumed by this task
memento - memento data previously returned by this task. None if no memento is available
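One way the changed and memento fields can work together is an up-to-date check, sketched below under stated assumptions: the memento is taken to be a dict holding a digest of the parameters (the API only types it as Any), SimpleNamespace objects stand in for the real input and result classes, and cached_task and fingerprint are hypothetical names.

```python
import asyncio
import hashlib
import json
from types import SimpleNamespace


def fingerprint(params) -> str:
    """Stable digest of JSON-serializable parameters."""
    text = json.dumps(params, sort_keys=True)
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


async def cached_task(runner, input):
    digest = fingerprint(input.params)
    prev = input.memento  # None when no memento is available

    # Up to date only if no dependency changed AND the parameters
    # match what the previous invocation recorded.
    up_to_date = (
        not input.changed
        and prev is not None
        and prev.get("digest") == digest
    )

    if not up_to_date:
        pass  # the real work of the task would go here

    # Stand-in for TaskDataResult (assumption: same field names).
    return SimpleNamespace(
        changed=not up_to_date,
        output=[],
        memento={"digest": digest},
        markers=[],
        status=0,
    )
```

Returning the digest as the new memento on every run keeps the check valid for the next invocation, whether or not work was done this time.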
TaskDataItem
Data is passed between tasks via TaskDataItem-derived objects. Each task may produce 0 or more TaskDataItem objects as output. A task receives all the TaskDataItem objects produced by its dependencies.
- class dv_flow.mgr.TaskDataItem(*, type: str, src: str = None, seq: int = -1)
Base class for task data items
type - Name of the data item type
src - Name of the task that produced this item
seq - Sequence number of the item within the task
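Because a task receives every item produced by its dependencies, an implementation will often pick out only the item types it understands. The sketch below shows one way to do that. The dataclass is a stand-in for dv_flow.mgr.TaskDataItem, items_of_type is a hypothetical helper, and the type names in the usage note are made up for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


# Stand-in mirroring the documented fields of TaskDataItem.
@dataclass
class TaskDataItem:
    type: str
    src: Optional[str] = None
    seq: int = -1


def items_of_type(inputs: List[TaskDataItem], type_name: str) -> List[TaskDataItem]:
    """Return the inputs whose type matches, in a deterministic order."""
    matches = [item for item in inputs if item.type == type_name]
    # Ordering by producing task and then sequence number is a choice
    # made for this sketch, not something the API is known to require.
    return sorted(matches, key=lambda item: (item.src or "", item.seq))
```

A task body might then call items_of_type(input.inputs, "example.FileSet") and ignore everything else it was handed.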
TaskRunCtxt
The task implementation is passed a task-run context object that provides utilities for the task.
- class dv_flow.mgr.TaskRunCtxt(runner: 'TaskRunner', ctxt: dv_flow.mgr.task_node_ctxt.TaskNodeCtxt, rundir: str, _markers: List[dv_flow.mgr.task_data.TaskMarker] = <factory>, _exec_info: List[dv_flow.mgr.task_run_ctxt.ExecInfo] = <factory>)
- create(path, content)
Create a file in the task’s rundir
- error(msg: str, loc: TaskMarkerLoc = None)
Add an error marker related to the task’s execution
- async exec(cmd: List[str], logfile=None, logfilter=None, cwd=None, env=None)
Executes a command as part of the task’s implementation. Output from the command will be saved to the specified logfile, or to a default logfile if not specified. If the command fails, an error marker will be added.
Example:
status |= await runner.exec(['ls', '-l'], logfile='ls.log')
- info(msg: str, loc: TaskMarkerLoc = None)
Add an info marker related to the task’s execution
- marker(msg: str, severity: SeverityE, loc: TaskMarkerLoc = None)
Add a marker related to the task’s execution
- mkDataItem(type, **kwargs)
Create a data item of the given type in the task’s rundir. The new item is available to the task’s implementation.
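The status |= await runner.exec(...) idiom from the example above extends naturally to a sequence of commands. The sketch below stops at the first failure. StubRunner is a hypothetical stand-in for TaskRunCtxt that records calls instead of launching processes, so the sketch runs anywhere; its exec and error methods copy the documented signatures, and its failure behaviour (add an error marker, return non-zero) is an assumption modelled on the description of exec.

```python
import asyncio
from typing import List


class StubRunner:
    """Hypothetical stand-in for TaskRunCtxt; records calls only."""

    def __init__(self, fail_on=None):
        self.fail_on = fail_on  # program name that should "fail"
        self.commands = []
        self.errors = []

    async def exec(self, cmd: List[str], logfile=None, logfilter=None,
                   cwd=None, env=None):
        self.commands.append(cmd)
        if cmd[0] == self.fail_on:
            # Mirrors the documented behaviour: a failing command
            # results in an error marker.
            self.error("command failed: " + " ".join(cmd))
            return 1
        return 0

    def error(self, msg: str, loc=None):
        self.errors.append(msg)


async def run_steps(runner, steps: List[List[str]]) -> int:
    """Run commands in order, stopping at the first failure."""
    status = 0
    for index, cmd in enumerate(steps):
        status |= await runner.exec(cmd, logfile="step%d.log" % index)
        if status != 0:
            break
    return status
```

Inside a real task, run_steps would receive the runner parameter directly, and the returned value could be passed on as the status of the TaskDataResult.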
TaskDataResult
Task implementation methods must return an object of type TaskDataResult. This object contains key data about task execution.
- class dv_flow.mgr.TaskDataResult(*, changed: bool = True, output: List[Any] = <factory>, memento: Any = None, markers: List[TaskMarker] = <factory>, status: int = 0)
Result data from a task:
changed - indicates whether the task modified its result data
output - list of output parameter sets
memento - memento data to be passed to the next invocation of the task
markers - list of markers produced by the task
status - status code (0=success, non-zero=failure)
TaskMarker
Tasks may produce markers to highlight key information to the user. A marker is typically a pointer to a file location with an associated severity level (error, warning, info).
- class dv_flow.mgr.TaskMarker(*, msg: str, severity: SeverityE, loc: TaskMarkerLoc = None)
Captures marker data produced by a task.
- class dv_flow.mgr.TaskMarkerLoc(*, path: str, line: int = -1, pos: int = -1)
Captures the source location of a marker
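As an illustration of how a marker and its location fit together, the sketch below turns a tool message of the form path:line: severity: message into a marker. Everything here is a stand-in: the three classes mirror the documented fields, the SeverityE member names are assumptions based on the severity levels mentioned above, and both the message format and parse_marker are hypothetical.

```python
import re
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class SeverityE(Enum):
    # Member names are assumptions; check the real enum in dv_flow.mgr.
    Info = "info"
    Warning = "warning"
    Error = "error"


@dataclass
class TaskMarkerLoc:
    path: str
    line: int = -1
    pos: int = -1


@dataclass
class TaskMarker:
    msg: str
    severity: SeverityE
    loc: Optional[TaskMarkerLoc] = None


# Hypothetical log format: "<path>:<line>: <severity>: <message>"
_PATTERN = re.compile(
    r"^(?P<path>[^:]+):(?P<line>\d+): "
    r"(?P<sev>error|warning|info): (?P<msg>.*)$"
)


def parse_marker(text: str) -> Optional[TaskMarker]:
    """Build a marker from one log line, or None if it does not match."""
    match = _PATTERN.match(text)
    if match is None:
        return None
    loc = TaskMarkerLoc(path=match.group("path"), line=int(match.group("line")))
    return TaskMarker(
        msg=match.group("msg"),
        severity=SeverityE(match.group("sev")),
        loc=loc,
    )
```

The column is left at its default of -1 because this assumed format carries no position within the line.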