Structured Concurrency¶
In this section, I’ll add a few notes about structured concurrency APIs. However, I won’t go into what structured concurrency is or why it matters, since there is already an amazing article explaining it.
List of structured concurrency APIs¶
move_on_when() (alias of wait_any_cm)
Ideal¶
Ideally, a program should have a single root task, with all other tasks as its children or as descendants of other tasks, forming a single task tree.
This is something that trio enforces, but asyncgui is unable to do due to its architectural limitations [1].
In asyncgui, every asyncgui.Task instance returned by asyncgui.start() is a root task.
(editing…)
Nest as you like¶
Once you start using structured concurrency APIs, you’ll notice how powerful they are for expressing high-level control flow.
For example, if you want to wait until async_fn1 completes and either async_fn2 or async_fn3 completes, you can implement it like this:
tasks = await wait_all(
    async_fn1(),
    wait_any(
        async_fn2(),
        async_fn3(),
    ),
)

The downside of this approach is that it becomes cumbersome to access tasks deeply nested in the hierarchy.
flattened_tasks = (tasks[0], *tasks[1].result, )
for idx, task in enumerate(flattened_tasks, start=1):
    if task.finished:
        print(f"async_fn{idx} completed with a return value of {task.result}.")
    else:
        print(f"async_fn{idx} was cancelled.")
The deeper a task is nested, the longer the expression needed to access it becomes, for example tasks[i].result[j].result[k].
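To make the shape of that nesting concrete, here is a minimal sketch that walks such a hierarchy recursively instead of spelling out each index. It does not use asyncgui: StubTask and iter_leaf_tasks are hypothetical names invented for this illustration, and StubTask only mimics the finished and result attributes used above. The sketch also assumes that a group task's result is a non-empty list of tasks, as in the example.

```python
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass
class StubTask:
    """Hypothetical stand-in that mimics only `finished` and `result`."""
    finished: bool
    result: Any = None


def iter_leaf_tasks(tasks) -> Iterator[StubTask]:
    """Yield the innermost tasks depth-first, left to right."""
    for task in tasks:
        # Only look at the result of a finished task.
        children = task.result if task.finished else None
        is_group = (
            isinstance(children, (list, tuple))
            and len(children) > 0
            and all(isinstance(c, StubTask) for c in children)
        )
        if is_group:
            yield from iter_leaf_tasks(children)
        else:
            yield task


# Same shape as wait_all(async_fn1(), wait_any(async_fn2(), async_fn3())),
# assuming async_fn2 won the race and async_fn3 was cancelled.
tasks = [
    StubTask(finished=True, result="r1"),
    StubTask(finished=True, result=[
        StubTask(finished=True, result="r2"),
        StubTask(finished=False),
    ]),
]
leaves = list(iter_leaf_tasks(tasks))
summary = [(t.finished, t.result if t.finished else None) for t in leaves]
```

A helper like this cannot tell a group's list of tasks apart from a leaf task that happens to return a list of tasks, so it only suits hierarchies where that ambiguity does not arise.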
If you don’t like writing such lengthy expressions, you can avoid them by creating an asyncgui.Task instance yourself and passing it to the API, like so:
await wait_all(
    async_fn1(),
    wait_any(
        task2 := Task(async_fn2()),
        async_fn3(),
    ),
)
if task2.finished:
    print(f"async_fn2 completed with a return value of {task2.result}.")
else:
    print("async_fn2 was cancelled.")
Exception Handling¶
All the APIs propagate exceptions in the same way as trio does when its strict_exception_groups parameter is True.
In other words, they always wrap the exception(s) that occurred in their child tasks in an ExceptionGroup, even when only one exception was raised.
# Python 3.11 or later
try:
    await wait_any(...)
except* Exception as excgroup:
    for exc in excgroup.exceptions:
        print('Exception caught:', type(exc))

# Older Python versions, using the exceptiongroup backport package
import exceptiongroup

def error_handler(excgroup):
    for exc in excgroup.exceptions:
        print('Exception caught:', type(exc))

with exceptiongroup.catch({Exception: error_handler, }):
    await wait_any(...)