Usage 🇯🇵

ここではどのように asyncgui とコールバック型のAPIを繋ぐのかを sched を例に見ていきます。 まずはこれをそのまま用いて

  1. 1秒待機

  2. A を出力

  3. 2秒待機

  4. B を出力

  5. 3秒待機

  6. C を出力

といった処理を書いてみます。

import sched

PRIORITY = 0
s = sched.scheduler()

def やりたい事():
    def step1():
        print('A')
        s.enter(2, PRIORITY, step2)

    def step2():
        print('B')
        s.enter(3, PRIORITY, step3)

    def step3():
        print('C')

    s.enter(1, PRIORITY, step1)

やりたい事()
s.run()

コールバック型のコードはやはり読みづらいですね。 ここでもしかすると「いや time.sleep() 使えばいいのでは?」といった声が上がるかもしれないので反論しておきます。

「time.sleep 使えばいいのでは?」に対する反論

確かに time.sleep を用いれば次のように書けます。

from time import sleep

def やりたい事():
    sleep(1)
    print('A')
    sleep(2)
    print('B')
    sleep(3)
    print('C')

やりたい事()

しかしこれは やりたい事 が一つしか無いとき限定であり複数あると話が変わってきます。 time.sleep のやり方では複数のスレッドを建てないと複数の事ができないのに対し sched を用いたやり方は一つで足ります。 具体的にはコードの最後の部分で

# あくまで一例
やりたい事()
やりたい事()
s.run()

という風に やりたい事 を複数回呼ぶだけです。なので time.sleepsched の代わりは務まらないと言えます。

APIを決める

それではこれから繋げていくのですが先ずはいま繋げようとしているコールバック型APIである sched.scheduler.enter()asyncgui からどのような形で利用したいかを考えないといけません。 これは即ち enter() のasync/await版がどうあるべきかを考える事です。

ここで上で出てきた time.sleep の例を思い出して欲しいのですが 以下のようなコードでした。

from time import sleep

def やりたい事():
    sleep(1)
    print('A')
    sleep(2)
    print('B')
    sleep(3)
    print('C')

このやり方はスレッドを占有するという短所がある反面、コードがすこぶる読みやすいという利を持っています。 enter() のasync/await版もこのように読みやすいと使う側はきっと嬉しいんじゃないでしょうか? 具体的には以下の感じです。

async def やりたい事():
    await sleep(1)
    print('A')
    await sleep(2)
    print('B')
    await sleep(3)
    print('C')

これが実現すれば time.sleep の物とほぼ同等の読みやすさな上にスレッドを占有しないという良いとこ取りができた事になります [1] 。 なのでこの様な姿を目指す事にしましょう。

await sleep(1) という使い方をするという事は sleepcollections.abc.Awaitable を返す collections.abc.Callable でないといけません。 その条件を満たす実装方法は幾つか有るのですが、とりあえずその一つであるasync関数 [2] から考えてみます。

async def sleep(duration):
    ...

こう書きたいところなのですが enter() はインスタンスメソッドなのでインスタンスを渡さないと呼びようがありませんし、 このメソッドは priority という引数も取るのでそれも渡してあげましょう。

async def sleep(scheduler, priority, duration):
    ...

というわけでこの姿を目指して実装にとりかかります。

実装

コールバック型のAPIをasync/awaitの世界と繋ぐにはコールバック関数が呼ばれた時に処理が再開するように仕組んだ上で処理を停める必要があります。 難しそうに聞こえますが asyncio.Eventtrio.Event を使った事があればピンと来るんじゃないでしょうか?

import asyncio

async def 仲介者():
    e = asyncio.Event()

    # コールバック関数が呼ばれた時に処理が再開するように仕組む
    コールバック関数を登録(lambda *args, **kwargs: e.set())

    # 処理を停める
    await e.wait()

async def 利用者():
    print('A')
    await 仲介者()
    print('B')

このように 仲介者 を挟む事で 利用者 側のコードは読みやすさを損なわずにコールバック型のAPIを使えるようになります。 そして asyncgui にはこの目的に特化したAPIがあります。

import asyncgui as ag

async def 仲介者():
    e = ag.ExclusiveEvent()
    コールバック関数を登録(e.fire)  # A
    args, kwargs = await e.wait()  # B

asyncgui の場合は asyncgui.ExclusiveEvent.fire() がどんな引数でも受け取れるようになっているのでlambdaを挟まなくて済むうえ(A行)、 fire に渡った引数を受け取れる(B行)というのが asyncio.Event には無い強みです。 これ用いて sleep を実装すると以下のようになります。

import asyncgui as ag

async def sleep(scheduler, priority, duration):
    e = ag.ExclusiveEvent()
    scheduler.enter(duration, priority, e.fire)
    await e.wait()

これで以下のように分かりやすく やりたい事 が書けるようになりました…

import functools
import sched
import asyncgui as ag

async def sleep(...):
    省略

def main():
    s = sched.scheduler()
    slp = functools.partial(sleep, s, 0)
    ag.start(やりたい事(slp))
    s.run()

async def やりたい事(slp):
    await slp(1)
    print('A')
    await slp(2)
    print('B')
    await slp(3)
    print('C')

main()

と言いたい所なのですがもう一つやっておきたい事があり、それは中断への対応です。 最低限の対応は ExclusiveEvent が行っているので sleep 内で行うことは必須ではないのですがやっておく方がより良いです。 (参考: Dealing with Cancellation)

import asyncgui as ag

async def sleep(scheduler, priority, duration):
    e = ag.ExclusiveEvent()
    event = scheduler.enter(duration, priority, e.fire)
    try:
        await e.wait()
    except ag.Cancelled:
        scheduler.cancel(event)
        raise

これで完璧に schedasyncgui と繋ぐ事に成功しました。 一度繋ぐ事ができれば asyncgui の持つ強力な Structured Concurrency 🇯🇵 の恩恵を受けられます。

import functools
import sched
import asyncgui as ag
import string

async def sleep(scheduler, priority, duration):
    省略

def main():
    s = sched.scheduler()
    slp = functools.partial(sleep, s, 0)
    ag.start(async_main(slp))
    s.run()

async def async_main(slp):
    # 0から9までの数字を0.3秒間隔で出力するが、その作業に2秒の制限時間を設ける
    async with ag.move_on_when(slp(2)) as timeout_tracker:
        for c in string.digits:
            print(c, end=' ')
            await slp(0.3)
    print('')

    if timeout_tracker.finished:
        print("時間切れ")
    else:
        print("時間内に全ての数字を出力し終わりました")

main()
0 1 2 3 4 5 6
時間切れ