Structured Concurrency 🇯🇵¶
ここでは structured concurrency (構造化された並行性) 関連のAPIについて解説していきます。
構造化された並行性 が何であるか また何故素晴らしいのかはここでは触れませんので
興味がある人は Notes on structured concurrency (不完全な和訳) を読んでください。
wait_any¶
複数のタスクを同時に走らせ そのいずれかが完了するか 或いは 全てが中断される まで待ちます。
そしていずれかのタスクが完了しだい残りのタスクは中断されます。
tasks = await wait_any(async_fn0(), async_fn1(), async_fn2())
どのタスクが完了/中断したのか、完了した物の戻り値は何か等は全て戻り値から判別可能です。
for idx, task in enumerate(tasks):
if task.finished:
print(f"async_fn{idx} は {task.result} を返して完了しました。")
else:
print(f"async_fn{idx} は中断されました。")
wait_all¶
全てのタスクが完了あるいは中断されるまで待ちます。
tasks = await wait_all(async_fn1(), async_fn2(), async_fn3())
tasks の扱い方は wait_any と同じです。
好きなだけ入れ子に¶
wait_all と wait_any は共に Awaitable を返すのでそれを wait_all や wait_any
の引数に渡すことも可能です。
# async_fn1が完了し なおかつ async_fn2かasync_fn3のいずれかが完了するまで待つ
tasks = await wait_all(
async_fn1(),
wait_any(
async_fn2(),
async_fn3(),
),
)
ただ階層深くにあるタスクを触るまでが面倒くさくなるのが難点です。
flattened_tasks = (tasks[0], *tasks[1].result, )
for idx, task in enumerate(flattened_tasks, start=1):
if task.finished:
print(f"async_fn{idx} は {task.result} を返して完了しました。")
else:
print(f"async_fn{idx} は中断されました。")
入れ子が深くなればなるほど階層深くにあるタスクを触るための式が tasks[i].result[j].result[k] といった具合に長くなる
わけですが、このような書き方を好まないのであれば以下の様に asyncgui.Task のインスタンスを自ら作って渡せば後は階層には触らずに
タスクの結果を知ることも出来ます。
await wait_all(
async_fn1(),
wait_any(
task2 := Task(async_fn2()), # 自ら作って渡す
async_fn3(),
),
)
if tasks2.finished:
print(f"async_fn2 は {tasks2.result} を返して完了しました")
else:
print("async_fn2 は中断されました")
move_on_when¶
trio_util.move_on_when() を模した物。
これらは渡されたタスクとwithブロック内のコードを並行して走らせ、片方が完了しだい他方を中断させます。
async with move_on_when(async_fn()) as bg_task:
# hogehoge
run_as_daemon¶
これまで解説してきたAPIはどれも並行させる処理達の関係が対等でした。 しかし時には対等ではない関係も必要となります。
async with run_as_daemon(async_fn()) as daemon_task:
...
このコードではwithブロック内が先に完了した場合は async_fn() は中断させられますが、 async_fn() が先に完了しても何も起きず
withブロックの完了を待つだけです。例えるならデーモンスレッドと非デーモンスレッドの関係です。withブロック内のコードが非デーモンで
async_fn() がデーモンになっていると考えて下さい。
また逆にwithブロック内の方をデーモンにしたい場合は move_on_when を次の様に使えば良いです。
async with move_on_when(async_fn()) as main_task:
...
await sleep_forever()
このようにwithブロック内の最後に await sleep_forever() を入れる事でwithブロック内のコードが自身では終わらなくなるため、
async_fn() の完了をもってコード全体が完了する事になります。
open_nursery¶
trio.open_nursery() を真似たものです。
このAPIの利は並行させたいタスクをあらかじめ用意しなくて良い事です。
nursery が開いている限りは後からいくらでも nursery.start() でタスクを加えられます。
async with open_nursery() as nursery:
while True:
touch = await 画面に指が触れられるのを待つ
task = nursery.start(指に沿って線を引く(touch))
See also
また open_nursery() はこれまで出てきたAPI全てを実装できるほどに強力だったりもします。
from contextlib import asynccontextmanager
from asyncgui import open_nursery
async def wait_any(*args):
async with open_nursery() as nursery:
tasks = [nursery.start(arg, close_on_finish=True) for arg in args]
return tasks
async def wait_all(*args):
async with open_nursery() as nursery:
tasks = [nursery.start(arg) for arg in args]
return tasks
@asynccontextmanager
async def move_on_when(arg):
async with open_nursery() as nursery:
task = nursery.start(arg, daemon=True, close_on_finish=True)
yield task
@asynccontextmanager
async def run_as_daemon(arg):
async with open_nursery() as nursery:
task = nursery.start(arg, daemon=True)
yield task
過去に存在した多くのAPIも以下のように実装できます。
from contextlib import asynccontextmanager
from asyncgui import open_nursery
# Removed in version 0.11.0
@asynccontextmanager
async def move_on_when_any(*args):
async with open_nursery() as nursery:
tasks = [nursery.start(arg, daemon=True, close_on_finish=True) for arg in args]
yield tasks
# Removed in version 0.11.0
@asynccontextmanager
async def run_as_daemons(*args):
async with open_nursery() as nursery:
tasks = [nursery.start(arg, daemon=True) for arg in args]
yield tasks
# Removed in version 0.10.0
@asynccontextmanager
async def run_as_main(arg):
async with open_nursery() as nursery:
task = nursery.start(arg, close_on_finish=True)
yield task
# Removed in version 0.11.0
@asynccontextmanager
async def wait_all_cm(*args):
async with open_nursery() as nursery:
tasks = [nursery.start(arg) for arg in args]
yield tasks
例外処理¶
ここのAPI全てに共通しているのが例外の運ばれ方です。
並行させているタスクの内どれか一つで例外が起きると他のタスクは中断され例外は呼び出し元に運ばれます。
この時 中断された他のタスクがその中断過程で更に例外を起こすかもしれないので例外は複数同時に起こりえます。
このため asyncgui は例外を運ぶために Python3.11 より登場した ExceptionGroup を用います
(3.11未満のPythonが使われていた場合は exceptiongroup を用います)。
これはたとえ例外が一つしか起こらなかった場合でもです。
try:
await wait_any(...)
except* Exception as excgroup:
for exc in excgroup.exceptions:
print('例外が起きました:', type(exc))
import exceptiongroup
def error_handler(excgroup):
for exc in excgroup.exceptions:
print('例外が起きました:', type(exc))
with exceptiongroup.catch({Exception: error_handler, }):
await wait_any(...)