Error Handling
DV Flow Manager supports workflows where task failure is an expected outcome — for example, running a regression suite and collecting results across all tests regardless of individual test pass/fail status. This section describes the mechanisms that let compound tasks tolerate subtask failures, continue executing independent siblings, and aggregate results with custom logic.
The std.TaskFailure Item
When a leaf task exits with a non-zero status, the framework automatically
appends a std.TaskFailure data item to that task’s output. The item
carries:
- task_name — the fully-qualified name of the task that failed
- status — the non-zero exit code
- markers — any diagnostic markers the task produced (errors, warnings)
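For illustration only, the item can be pictured as a small record carrying those three fields. The dataclass below is a stand-in for this sketch, not the framework's actual class (which is emitted internally by DV Flow Manager):

```python
from dataclasses import dataclass, field

@dataclass
class TaskFailure:
    """Stand-in for the framework-emitted std.TaskFailure item (illustration only)."""
    task_name: str                # fully-qualified name of the failed task
    status: int                   # the non-zero exit code
    markers: list = field(default_factory=list)  # diagnostic markers (errors, warnings)
    type: str = "std.TaskFailure"

# A failed leaf task's output might then carry an item like this:
item = TaskFailure(task_name="myproject.RunAllTests.test2", status=2)
print(item.type, item.task_name, item.status)
```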
std.TaskFailure items propagate through skipped tasks just like any other
data item, so a compound task can inspect failures from anywhere in its
subtask graph — even for subtasks that were skipped because an earlier sibling
already failed.
Downstream tasks that are not specifically written to handle failures can
safely ignore std.TaskFailure items; they are filtered out by the default
compound aggregator before the compound’s output is produced (see
Default Aggregation below).
Note
std.TaskFailure is a framework-emitted item. You do not declare or
produce it yourself; it is added automatically whenever a leaf task returns
a non-zero status code.
Controlling Failure Tolerance with max_failures
By default, a compound task stops launching new subtasks as soon as any
subtask fails (fail-fast behaviour). The max_failures field changes this:

| Value | Behaviour |
|---|---|
| -1 | No limit. All independent subtasks run regardless of failures. Failures propagate to the overall run status. |
| 0 | Fail-fast (equivalent to the default behaviour without max_failures). |
| N > 0 | Stop launching new independent subtasks once N failures have accumulated. Failures are scoped to the compound — they do not update the global run status unless the compound itself reports a failing status. |
Example — run all tests even when some fail:
```yaml
tasks:
- name: RunAllTests
  max_failures: -1
  body:
  - name: test1
    uses: mytools.RunTest
    with: {seed: 1}
  - name: test2
    uses: mytools.RunTest
    with: {seed: 2}
  - name: test3
    uses: mytools.RunTest
    with: {seed: 3}
```
Example — tolerate up to two failures before stopping:
```yaml
tasks:
- name: RunTests
  max_failures: 2
  body:
  - name: test1
    uses: mytools.RunTest
  - name: test2
    uses: mytools.RunTest
  - name: test3
    uses: mytools.RunTest
```
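The gating rule can be sketched in plain Python. This is a conceptual, single-threaded model of the semantics only — the real scheduler launches independent subtasks concurrently — with -1 meaning unlimited, 0 meaning fail-fast, and N > 0 stopping new launches once N failures have accumulated:

```python
def run_with_max_failures(tasks, max_failures):
    """Conceptual model of max_failures gating (not the framework's scheduler).

    tasks: callables returning an exit status (0 = pass).
    Returns one entry per task: its status, or None if it was never launched.
    """
    statuses = []
    failures = 0
    # -1 means unlimited; 0 means fail-fast, i.e. a budget of one failure.
    limit = None if max_failures < 0 else max(max_failures, 1)
    for task in tasks:
        # Stop launching new tasks once the failure budget is exhausted.
        if limit is not None and failures >= limit:
            statuses.append(None)  # skipped, never launched
            continue
        status = task()
        statuses.append(status)
        if status != 0:
            failures += 1
    return statuses

tasks = [lambda: 0, lambda: 1, lambda: 1, lambda: 0]
print(run_with_max_failures(tasks, -1))  # all run: [0, 1, 1, 0]
print(run_with_max_failures(tasks, 0))   # fail-fast: [0, 1, None, None]
print(run_with_max_failures(tasks, 2))   # stop after 2 failures: [0, 1, 1, None]
```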
Custom Result Aggregation with on_error
When a compound task finishes, DV Flow Manager calls an aggregation
callable to decide the compound’s final status and output items. By default
the built-in aggregator is used (see Default Aggregation). Supply
on_error to use your own:
```yaml
tasks:
- name: RunAllTests
  max_failures: -1
  on_error: myproject.test_utils.aggregate_results
  body:
  - name: test1
    uses: mytools.RunTest
  - name: test2
    uses: mytools.RunTest
```
The value of on_error must be a dotted module-and-function path (using . as
the separator, as in myproject.test_utils.aggregate_results) that resolves to
an async callable with this signature:
```python
async def aggregate_results(
    ctxt: TaskRunCtxt,
    input: CompoundRunInput,
) -> TaskDataResult:
    ...
```
on_error is called even when no subtask failed, so it also serves as a
general compound post-processor.
Note
on_error and max_failures work independently. You can use either
or both. max_failures controls when siblings stop running;
on_error controls what status and output items the compound produces.
Default Aggregation
When on_error is not specified the framework uses a built-in aggregator
that:
- OR-accumulates the status fields of all std.TaskFailure items.
- Passes all other output items through unchanged.
This means that for ordinary compound tasks that do not specify on_error,
std.TaskFailure items are consumed by the compound and are not
visible to downstream tasks. The compound’s own exit status reflects
whether any subtask failed.
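The default behaviour can be modelled in a few lines. This sketch uses plain dictionaries in place of the framework's item objects; the real aggregator is internal to DV Flow Manager:

```python
def default_aggregate(items):
    """Sketch of the built-in aggregator: OR-accumulate the status fields of
    std.TaskFailure items and pass every other item through unchanged."""
    status = 0
    output = []
    for item in items:
        if item.get("type") == "std.TaskFailure":
            status |= item["status"]   # OR-accumulate failure statuses
        else:
            output.append(item)        # non-failure items pass through
    return status, output

status, output = default_aggregate([
    {"type": "std.FileSet", "files": ["a.log"]},
    {"type": "std.TaskFailure", "task_name": "t.test2", "status": 2},
])
print(status, output)  # failure consumed: status is 2, only the FileSet remains
```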
Writing an on_error Handler
The handler receives a CompoundRunInput object
containing all the information it needs to produce a result.
```python
from dv_flow.mgr import TaskDataResult, TaskMarker
from dv_flow.mgr.task_data import CompoundRunInput, TaskFailure
from dv_flow.mgr.task_run_ctxt import TaskRunCtxt

async def aggregate_results(
    ctxt: TaskRunCtxt,
    input: CompoundRunInput,
) -> TaskDataResult:
    failures = [i for i in input.inputs
                if getattr(i, "type", None) == "std.TaskFailure"]
    other = [i for i in input.inputs
             if getattr(i, "type", None) != "std.TaskFailure"]

    passed = sum(1 for t in input.subtasks if t.status == 0 and not t.skipped)
    failed = len(failures)
    skipped = sum(1 for t in input.subtasks if t.skipped)

    ctxt.info(f"Results: {passed} passed, {failed} failed, {skipped} skipped")
    if failures:
        ctxt.error(f"{failed} subtask(s) failed")

    # Propagate non-failure items; report status.
    return TaskDataResult(
        status=1 if failures else 0,
        output=other,
    )
```
Key points:

- Filter std.TaskFailure items out of input.inputs before passing items downstream — or include them deliberately if consumers expect them.
- input.subtasks provides per-subtask status, skipped, and name fields (see SubtaskSummary).
- The returned TaskDataResult status becomes the compound task's exit status.
- Use ctxt.info(), ctxt.warning(), and ctxt.error() to attach diagnostic markers to the compound's output.
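Because a handler only reads attributes from its arguments, its counting logic is easy to exercise with stub objects. The stubs below are illustrative stand-ins for TaskRunCtxt and CompoundRunInput, and the handler returns a plain dict in place of TaskDataResult so the sketch stays self-contained:

```python
import asyncio
from types import SimpleNamespace

class StubCtxt:
    """Minimal stand-in for TaskRunCtxt: records diagnostic markers."""
    def __init__(self):
        self.messages = []
    def info(self, msg):
        self.messages.append(("info", msg))
    def error(self, msg):
        self.messages.append(("error", msg))

async def aggregate_results(ctxt, input):
    # Same counting logic as the handler above.
    failures = [i for i in input.inputs
                if getattr(i, "type", None) == "std.TaskFailure"]
    passed = sum(1 for t in input.subtasks if t.status == 0 and not t.skipped)
    skipped = sum(1 for t in input.subtasks if t.skipped)
    ctxt.info(f"Results: {passed} passed, {len(failures)} failed, {skipped} skipped")
    return {"status": 1 if failures else 0,
            "output": [i for i in input.inputs
                       if getattr(i, "type", None) != "std.TaskFailure"]}

# Build a stub input: one failure item, one ordinary item, three subtasks.
ctxt = StubCtxt()
inp = SimpleNamespace(
    inputs=[SimpleNamespace(type="std.TaskFailure", task_name="t2", status=1),
            SimpleNamespace(type="std.FileSet")],
    subtasks=[SimpleNamespace(name="t1", status=0, skipped=False),
              SimpleNamespace(name="t2", status=1, skipped=False),
              SimpleNamespace(name="t3", status=0, skipped=True)],
)
result = asyncio.run(aggregate_results(ctxt, inp))
print(result["status"], len(result["output"]), ctxt.messages)
```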
Dynamic Subgraphs and max_failures
Tasks that build their subgraph dynamically at runtime (via
run_subgraph()) also support max_failures:
```python
async def run_tests(ctxt, input):
    tasks = [build_test_node(ctxt, seed) for seed in seeds]

    # Run all tests; failures do not abort siblings.
    await ctxt.run_subgraph(tasks, max_failures=-1)

    return TaskDataResult(status=0, output=[])
```
The max_failures parameter on run_subgraph behaves identically to the
YAML field: -1 runs all independent tasks while still propagating
failures to the overall run status; N > 0 stops after N failures and
scopes the failures so they do not update the global run status.
See also: Developing Tasks for the full dynamic subgraph API.
Worked Example: Test Regression
This example shows a compound task that runs a full test suite, tolerates all failures, and produces a summary report.
```yaml
package:
  name: myproject

  tasks:
  - name: RunRegression
    max_failures: -1
    on_error: myproject.regression.summarize
    body:
    - name: test_smoke
      uses: mytools.RunTest
      with: {suite: smoke}
    - name: test_functional
      uses: mytools.RunTest
      with: {suite: functional}
    - name: test_corner
      uses: mytools.RunTest
      with: {suite: corner_cases}
```
```python
# myproject/regression.py
from dv_flow.mgr import TaskDataResult
from dv_flow.mgr.task_data import CompoundRunInput
from dv_flow.mgr.task_run_ctxt import TaskRunCtxt

async def summarize(ctxt: TaskRunCtxt, input: CompoundRunInput) -> TaskDataResult:
    failures = [i for i in input.inputs
                if getattr(i, "type", None) == "std.TaskFailure"]
    passed = sum(1 for t in input.subtasks
                 if t.status == 0 and not t.skipped)
    failed = len(failures)
    skipped = sum(1 for t in input.subtasks if t.skipped)

    ctxt.info(f"Regression complete: {passed} passed / "
              f"{failed} failed / {skipped} skipped")
    for f in failures:
        ctxt.error(f"FAILED: {f.task_name} (status={f.status})")

    return TaskDataResult(
        status=1 if failures else 0,
        output=[i for i in input.inputs
                if getattr(i, "type", None) != "std.TaskFailure"],
    )
```