Structured Concurrency 🇯🇵

ここでは structured concurrency (構造化された並行性) 関連のAPIについて解説していきます。 構造化された並行性 が何であるか また何故素晴らしいのかはここでは触れませんので 興味がある人は Notes on structured concurrency (不完全な和訳) を読んでください。

wait_any

複数のタスクを同時に走らせ そのいずれかが完了するか 或いは 全てが中断される まで待ちます。 またいずれかのタスクが完了しだい残りのタスクは中断されます。

tasks = await wait_any(async_fn0(), async_fn1(), async_fn2())

どのタスクが完了/中断したのか、完了した物の戻り値は何か等は全て戻り値から判別可能です。

for idx, task in enumerate(tasks):
    if task.finished:
        print(f"async_fn{idx}{task.result} を返して完了しました。")
    else:
        print(f"async_fn{idx} は中断されました。")

wait_all

全てのタスクが完了あるいは中断されるまで待ちます。

tasks = await wait_all(async_fn1(), async_fn2(), async_fn3())

tasks の扱い方は wait_any と同じです。

好きなだけ入れ子に

wait_allwait_any は共に Awaitable を返すのでそれを wait_allwait_any の引数に渡すことも可能です。

# async_fn1が完了し なおかつ async_fn2かasync_fn3のいずれかが完了するまで待つ
tasks = await wait_all(
    async_fn1(),
    wait_any(
        async_fn2(),
        async_fn3(),
    ),
)
_images/nested-tasks.png

ただ階層深くにあるタスクを触るまでが面倒くさくなるのが難点です。

flattened_tasks = (tasks[0], *tasks[1].result, )

for idx, task in enumerate(flattened_tasks, start=1):
    if task.finished:
        print(f"async_fn{idx}{task.result} を返して完了しました。")
    else:
        print(f"async_fn{idx} は中断されました。")

入れ子が深くなればなるほど階層深くにあるタスクを触るための式が tasks[i].result[j].result[k] といった具合に長くなる わけですが、このような書き方を好まないのであれば以下の様に asyncgui.Task のインスタンスを自ら作って渡せば後は階層には触らずに タスクの結果を知ることも出来ます。

await wait_all(
    async_fn1(),
    wait_any(
        task2 := Task(async_fn2()),  # 自ら作って渡す
        async_fn3(),
    ),
)
if tasks2.finished:
    print(f"async_fn2 は {tasks2.result} を返して完了しました")
else:
    print("async_fn2 は中断されました")

wait_any_cm, wait_all_cm

wait_any を用いて二つのタスクを並行させるコードは

async def async_fn1():
    # async_fn1 の中身

async def main():
    await wait_any(async_fn1(), async_fn2())

wait_any_cm を用いて以下の様に書くこともできます。

async def main():
    async with wait_any_cm(async_fn2()) as task2:
        # async_fn1 の中身

この様に async_fn1 の中身をwithブロック内に移す事で関数を一つ減らす事に成功しました。 この機能は async_fn1() 内で main() 内のローカル変数をたくさん読み書きしたい時に特に活きるでしょう。 例えば次のコードを見て下さい。

async def main():
    var1 = ...
    var2 = ...

    async def async_fn1():
        nonlocal var1, var2
        var1 = ...
        var2 = ...

    await wait_any(async_fn1(), async_fn2())

async_fn1() 内で main() 内のローカル変数を触りたいが為にこのようにインナー関数として実装したわけですが、 このようなコードは読みにくいだけでなく nonlocal の書き忘れによるバグを引き起こす可能性も孕んでいます。 これを wait_any_cm を用いて書き直すとどうなるかというと

async def main():
    var1 = ...
    var2 = ...
    async with wait_any_cm(async_fn2()) as task2:
        var1 = ...
        var2 = ...

この様にスッキリします。 後このコンテキストマネージャー型のAPIは Awaitable を一つしか受け取れないので並行させられるタスクの数に限界があるように 見えますが、先に述べたように入れ子にできるのでその限界は実質無いような物です。

async def main():
    async with wait_any_cm(wait_any(...)):
        ...

run_as_daemon

これまで解説してきたAPIはどれも並行させたタスク達の関係が対等でした。 wait_any_cm を例に挙げるならwithブロック内のコードと wait_any_cm に渡したタスクのどちらが完了した場合でももう片方を中断させるの でした。 しかし時には対等ではない関係も必要となります。

async with run_as_daemon(async_fn()) as daemon_task:
    ...

このコードではwithブロック内が先に完了した場合は async_fn() は中断させられますが、 async_fn() が先に完了しても何も起きず withブロックの完了を待つだけです。例えるならデーモンスレッドと非デーモンスレッドの関係です。withブロック内のコードが非デーモンで async_fn() がデーモンになっていると考えて下さい。

run_as_main

これは run_as_daemon の逆でwithブロック内がデーモンとなります。

async with run_as_main(async_fn()) as main_task:
    ...

すなわちwithブロックが先に完了した場合は async_fn() の完了を待つ事になり、 async_fn() が先に完了した場合はwithブロックが中断されます。

open_nursery

trio.open_nursery() を真似たものです。 このAPIの利は並行させたいタスクをあらかじめ用意しなくて良い事です。 nursery が開いている限りは後からいくらでも nursery.start() でタスクを加えられます。

async with open_nursery() as nursery:
    while True:
        touch = await 画面に指が触れられるのを待つ
        nursery.start(指に沿って線を引く(touch))

但しタスクの戻り値を得る方法は無いので別の形で値を受け渡してください。

例外処理

ここのAPI全てに共通しているのが例外の運ばれ方です。 並行させているタスクの内どれか一つで例外が起きると他のタスクは中断され例外は呼び出し元に運ばれます。 この時 中断された他のタスクがその中断過程で更に例外を起こすかもしれないので例外は複数同時に起こりえます。 このため asyncgui は例外を運ぶために Python3.11 より登場した ExceptionGroup を用います (3.11未満のPythonが使われていた場合は exceptiongroup を用います)。 これはたとえ例外が一つしか起こらなかった場合でもです。

try:
    await wait_any(...)
except* Exception as excgroup:
    for exc in excgroup.exceptions:
        print('例外が起きました:', type(exc))