Python Task API

The core implementation for tasks is provided by a Python async method. This method is passed two parameters:

  • runner - Services that the task runner provides for the use of tasks

  • input - The input data for the task

The method must return a TaskDataResult object with the execution status of the task, result data, markers, and memento data.
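The shape of such a method can be sketched as follows. This is a minimal illustration, not the library's own code: the two dataclasses are local stand-ins that copy the field names from the class signatures in this document so the snippet runs without dv_flow installed, and the task name hello_task is hypothetical. In a real flow the classes would presumably be imported from dv_flow.mgr instead.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class TaskDataResult:
    """Stand-in mirroring the fields of dv_flow.mgr.TaskDataResult."""
    changed: bool = True
    output: List[Any] = field(default_factory=list)
    memento: Any = None
    markers: List[Any] = field(default_factory=list)
    status: int = 0


@dataclass
class TaskDataInput:
    """Stand-in mirroring the fields of dv_flow.mgr.TaskDataInput."""
    name: str
    changed: bool
    srcdir: str
    rundir: str
    params: Any
    inputs: List[Any]
    memento: Any


async def hello_task(runner, input):
    """Hypothetical task: performs no real work and reports success."""
    # 'runner' is unused here; a real task would call its services.
    return TaskDataResult(status=0, changed=input.changed)


# Drive the task by hand; normally the task runner does this.
task_input = TaskDataInput(name="hello", changed=True, srcdir=".", rundir=".",
                           params=None, inputs=[], memento=None)
result = asyncio.run(hello_task(None, task_input))
print(result.status, result.changed)
```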

TaskDataInput

An object of type TaskDataInput is passed as the input parameter of the task implementation method.

class dv_flow.mgr.TaskDataInput(*, name: str, changed: bool, srcdir: str, rundir: str, params: Any, inputs: List[Any], memento: Any)

Input data to a task:

  • name - name of the task

  • changed - indicates whether any of this task’s dependencies have changed

  • rundir - directory in which the task is to be run

  • params - parameters to the task

  • inputs - list of TaskDataItem objects that are consumed by this task

  • memento - memento data previously returned by this task. None if no memento is available
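One plausible use of the changed and memento fields is deciding whether a task can skip its work. The sketch below assumes the memento is a plain value that can be compared for equality. The helper needs_rerun and the shape of the state dictionary are hypothetical, and TaskDataInput is a local stand-in for the real class.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class TaskDataInput:
    """Stand-in mirroring the fields of dv_flow.mgr.TaskDataInput."""
    name: str
    changed: bool
    srcdir: str
    rundir: str
    params: Any = None
    inputs: List[Any] = field(default_factory=list)
    memento: Any = None


def needs_rerun(input: TaskDataInput, current_state: Any) -> bool:
    """Decide whether the task must redo its work (hypothetical helper)."""
    if input.changed:
        # A dependency changed, so earlier results cannot be trusted
        return True
    if input.memento is None:
        # No memento is available, e.g. on the first invocation
        return True
    # Otherwise re-run only if the saved state differs from the current one
    return input.memento != current_state


state = {"files": ["a.sv", "b.sv"]}
first = TaskDataInput(name="t", changed=False, srcdir=".", rundir=".")
again = TaskDataInput(name="t", changed=False, srcdir=".", rundir=".",
                      memento=dict(state))
print(needs_rerun(first, state), needs_rerun(again, state))
```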

TaskDataItem

Data is passed between tasks via TaskDataItem-derived objects. Each task may produce 0 or more TaskDataItem objects as output. A task receives all the TaskDataItem objects produced by its dependencies.

class dv_flow.mgr.TaskDataItem(*, type: str, src: str = None, seq: int = -1)

Base class for task data items

  • type - Name of the data item type

  • src - Name of the task that produced this item

  • seq - Sequence number of the item within the task
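Because a task receives every item produced by its dependencies, an implementation will often select only the item types it understands. The following sketch shows one way to do that. TaskDataItem is a local stand-in with the fields listed above, and the type names demo.FileSet and demo.Options as well as the helper items_of_type are made up for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TaskDataItem:
    """Stand-in mirroring the fields of dv_flow.mgr.TaskDataItem."""
    type: str
    src: Optional[str] = None
    seq: int = -1


def items_of_type(inputs: List[TaskDataItem], type_name: str) -> List[TaskDataItem]:
    """Return matching items ordered by producing task, then sequence number."""
    matches = [item for item in inputs if item.type == type_name]
    return sorted(matches, key=lambda item: (item.src or "", item.seq))


# Hypothetical inputs, as they might arrive from two dependencies
inputs = [
    TaskDataItem(type="demo.FileSet", src="rtl", seq=1),
    TaskDataItem(type="demo.Options", src="cfg", seq=0),
    TaskDataItem(type="demo.FileSet", src="rtl", seq=0),
]
filesets = items_of_type(inputs, "demo.FileSet")
print([(item.src, item.seq) for item in filesets])
```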

TaskRunCtxt

The task implementation is passed a task-run context object that provides utilities for the task.

class dv_flow.mgr.TaskRunCtxt(runner: 'TaskRunner', ctxt: dv_flow.mgr.task_node_ctxt.TaskNodeCtxt, rundir: str, _markers: List[dv_flow.mgr.task_data.TaskMarker] = <factory>, _exec_info: List[dv_flow.mgr.task_run_ctxt.ExecInfo] = <factory>)

create(path, content)

Create a file in the task’s rundir

error(msg: str, loc: TaskMarkerLoc = None)

Add an error marker related to the task’s execution

async exec(cmd: List[str], logfile=None, logfilter=None, cwd=None, env=None)

Executes a command as part of the task’s implementation. Output from the command will be saved to the specified logfile, or to a default logfile if not specified. If the command fails, an error marker will be added.

Example:

status |= await runner.exec(['ls', '-l'], logfile='ls.log')

info(msg: str, loc: TaskMarkerLoc = None)

Add an info marker related to the task’s execution

marker(msg: str, severity: SeverityE, loc: TaskMarkerLoc = None)

Add a marker related to the task’s execution

mkDataItem(type, **kwargs)

Create a data item of the given type in the task’s rundir. The data item is then available to the task’s implementation.
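The status |= idiom from the exec example can be extended to several commands, so that any failing step leaves the overall status non-zero. The sketch below uses StubRunCtxt, a hypothetical stand-in that imitates only exec and error. It assumes that exec returns the command's integer exit status, which the example above suggests but this document does not state, and it discards command output instead of writing a logfile.

```python
import asyncio
import sys
from typing import List


class StubRunCtxt:
    """Hypothetical stand-in for TaskRunCtxt; mimics only exec() and error()."""

    def __init__(self):
        self.errors: List[str] = []

    def error(self, msg: str, loc=None):
        # Record the message; the real context adds an error marker
        self.errors.append(msg)

    async def exec(self, cmd: List[str], logfile=None, logfilter=None,
                   cwd=None, env=None) -> int:
        # Launch the command and wait for it; output is discarded in this stub
        proc = await asyncio.create_subprocess_exec(
            *cmd, cwd=cwd, env=env,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL)
        status = await proc.wait()
        if status != 0:
            self.error("Command failed: %s" % " ".join(cmd))
        return status


async def run_steps(runner) -> int:
    status = 0
    # Each exit status is OR-ed in, so one failure makes the total non-zero
    status |= await runner.exec([sys.executable, "-c", "pass"])
    status |= await runner.exec([sys.executable, "-c", "import sys; sys.exit(3)"])
    return status


runner = StubRunCtxt()
status = asyncio.run(run_steps(runner))
print(status, len(runner.errors))
```

Note that OR-ing exit codes only preserves whether something failed, not which code each command returned.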

TaskDataResult

Task implementation methods must return an object of type TaskDataResult. This object contains key data about task execution.

class dv_flow.mgr.TaskDataResult(*, changed: bool = True, output: ~typing.List[~typing.Any] = <factory>, memento: ~typing.Any = None, markers: ~typing.List[~dv_flow.mgr.task_data.TaskMarker] = <factory>, status: int = 0)

Result data from a task:

  • changed - indicates whether the task modified its result data

  • output - list of output parameter sets

  • memento - memento data to be passed to the next invocation of the task

  • markers - list of markers produced by the task

  • status - status code (0=success, non-zero=failure)
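As an illustration of how these fields might fit together, the sketch below derives status from the collected markers and changed from a comparison with the previous memento. All classes are local stand-ins built from the signatures in this document. The SeverityE member names and the helper make_result are assumptions, and treating any error marker as a failure is a design choice of this example rather than documented behavior.

```python
import enum
from dataclasses import dataclass, field
from typing import Any, List


class SeverityE(enum.Enum):
    # Member names are assumed; the document lists only the three levels
    Info = "info"
    Warning = "warning"
    Error = "error"


@dataclass
class TaskMarker:
    """Stand-in mirroring dv_flow.mgr.TaskMarker (loc omitted for brevity)."""
    msg: str
    severity: SeverityE


@dataclass
class TaskDataResult:
    """Stand-in mirroring the fields of dv_flow.mgr.TaskDataResult."""
    changed: bool = True
    output: List[Any] = field(default_factory=list)
    memento: Any = None
    markers: List[TaskMarker] = field(default_factory=list)
    status: int = 0


def make_result(markers: List[TaskMarker], memento: Any,
                prev_memento: Any) -> TaskDataResult:
    """Hypothetical helper that assembles a result from a task's findings."""
    has_error = any(m.severity is SeverityE.Error for m in markers)
    return TaskDataResult(
        changed=(memento != prev_memento),  # unchanged state means unchanged result
        memento=memento,                    # handed back on the next invocation
        markers=list(markers),
        status=1 if has_error else 0,
    )


ok = make_result([TaskMarker("built 2 files", SeverityE.Info)],
                 memento={"n": 2}, prev_memento={"n": 2})
bad = make_result([TaskMarker("missing file", SeverityE.Error)],
                  memento={"n": 1}, prev_memento={"n": 2})
print(ok.status, ok.changed, bad.status, bad.changed)
```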

TaskMarker

Tasks may produce markers to highlight key information to the user. A marker is typically a pointer to a file location with an associated severity level (error, warning, info).

class dv_flow.mgr.TaskMarker(*, msg: str, severity: SeverityE, loc: TaskMarkerLoc = None)

Captures marker data produced by a task.

class dv_flow.mgr.TaskMarkerLoc(*, path: str, line: int = -1, pos: int = -1)

Captures the source location of a marker
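A typical source of markers is tool output. The sketch below turns one log line into a marker with a location. The log format (path:line:pos: severity: message) is a hypothetical example, the classes are local stand-ins built from the signatures above, and the SeverityE member names and values are assumed.

```python
import enum
import re
from dataclasses import dataclass
from typing import Optional


class SeverityE(enum.Enum):
    # Member names and values are assumed for this sketch
    Info = "info"
    Warning = "warning"
    Error = "error"


@dataclass
class TaskMarkerLoc:
    """Stand-in mirroring dv_flow.mgr.TaskMarkerLoc."""
    path: str
    line: int = -1
    pos: int = -1


@dataclass
class TaskMarker:
    """Stand-in mirroring dv_flow.mgr.TaskMarker."""
    msg: str
    severity: SeverityE
    loc: Optional[TaskMarkerLoc] = None


# Hypothetical log format: <path>:<line>:<pos>: <severity>: <message>
LOG_RE = re.compile(
    r"^(?P<path>[^:]+):(?P<line>\d+):(?P<pos>\d+): "
    r"(?P<sev>error|warning|info): (?P<msg>.*)$")


def marker_from_log_line(text: str) -> Optional[TaskMarker]:
    """Return a marker for a matching log line, or None if it does not match."""
    m = LOG_RE.match(text)
    if m is None:
        return None
    loc = TaskMarkerLoc(path=m.group("path"),
                        line=int(m.group("line")),
                        pos=int(m.group("pos")))
    # SeverityE(value) looks the member up by its string value
    return TaskMarker(msg=m.group("msg"),
                      severity=SeverityE(m.group("sev")),
                      loc=loc)


marker = marker_from_log_line("top.sv:12:5: error: syntax error")
print(marker)
```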