Usage
In this section, I’ll explain how to wrap a callback-style API in an async/await-style API, using sched as an example.
Let’s start by writing code that does the following, as a reminder of how the module is used:
Waits for 1 second
Prints “A”
Waits for 2 seconds
Prints “B”
Waits for 3 seconds
Prints “C”
Our code would look like this:
import sched

PRIORITY = 0
s = sched.scheduler()

def task():
    def step1():
        print('A')
        s.enter(2, PRIORITY, step2)

    def step2():
        print('B')
        s.enter(3, PRIORITY, step3)

    def step3():
        print('C')

    s.enter(1, PRIORITY, step1)

task()
s.run()
sched provides callback-style APIs, and code that uses them is not easy to follow because the logic is split across nested callbacks.
You might wonder, “Why not just use time.sleep()?” So let me address that first.
Counter to ‘Why not just use time.sleep?’
Indeed, the same task written with time.sleep is easier to understand.
from time import sleep

def task():
    sleep(1)
    print('A')
    sleep(2)
    print('B')
    sleep(3)
    print('C')

task()
However, this approach works well only when there’s a single task.
If there are multiple tasks, the story changes.
With the time.sleep approach, you need multiple threads to run multiple tasks simultaneously, whereas with the sched approach, one thread is sufficient.
For example, in the sched version you can just call task() multiple times at the end of the code:
task()
task()
s.run()
Therefore, sched cannot be replaced with time.sleep.
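To see the single-thread claim in action without waiting six seconds, here is a minimal sketch that runs two copies of the task on one scheduler. The FakeClock class and the name argument of task() are my own additions for illustration; sched.scheduler accepts custom time and delay functions, which is what makes the virtual clock possible.

```python
import sched

class FakeClock:
    """Virtual clock (hypothetical helper) so the example finishes instantly."""
    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, duration):
        # Instead of blocking, just advance the virtual time.
        self.now += duration

PRIORITY = 0
clock = FakeClock()
s = sched.scheduler(clock.time, clock.sleep)
log = []  # (virtual time, task name, letter)

def task(name):
    def step1():
        log.append((clock.now, name, 'A'))
        s.enter(2, PRIORITY, step2)

    def step2():
        log.append((clock.now, name, 'B'))
        s.enter(3, PRIORITY, step3)

    def step3():
        log.append((clock.now, name, 'C'))

    s.enter(1, PRIORITY, step1)

task('first')
task('second')
s.run()  # a single thread drives both tasks

for entry in log:
    print(entry)
```

Both tasks reach 'A' at virtual time 1, 'B' at 3, and 'C' at 6, so they progress side by side rather than one after the other.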
API Design
We want to wrap the API, but first, we need to imagine how we want to use it from asyncgui.
Remember the time.sleep example we saw earlier:
from time import sleep

def task():
    sleep(1)
    print('A')
    sleep(2)
    print('B')
    sleep(3)
    print('C')
This approach has the disadvantage of occupying a thread, but in return, it’s quite easy to read. If we successfully design our API to be as readable as this, we’d have the best of both worlds [1]. So, let’s aim for it.
# our ideal
async def task():
    await sleep(1)
    print('A')
    await sleep(2)
    print('B')
    await sleep(3)
    print('C')
Using it like await sleep(1) means that sleep must be a collections.abc.Callable that returns a collections.abc.Awaitable.
There are several options that meet this condition, and we choose an async function [2].
async def sleep(duration):
    ...
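To make the “Callable that returns an Awaitable” condition concrete, here is a small sketch comparing two of the possible shapes: an async function and a class that defines `__await__`. The names sleep_as_function and sleep_as_class are hypothetical, and neither one really sleeps; they return immediately so that the type checks are the only thing on display.

```python
import asyncio
from collections.abc import Awaitable, Callable

async def sleep_as_function(duration):
    # Stand-in body: returns immediately instead of really sleeping.
    return duration

class sleep_as_class:
    def __init__(self, duration):
        self.duration = duration

    def __await__(self):
        # __await__ must return an iterator; a generator function is one way.
        return self.duration
        yield  # unreachable, but makes this method a generator function

async def user():
    a = await sleep_as_function(1)
    b = await sleep_as_class(2)
    return a, b

checks = []
for candidate in (sleep_as_function, sleep_as_class):
    awaitable = candidate(1)
    checks.append((isinstance(candidate, Callable), isinstance(awaitable, Awaitable)))
    if hasattr(awaitable, 'close'):
        awaitable.close()  # avoid a "coroutine was never awaited" warning

result = asyncio.run(user())
print(checks, result)
```

Both candidates pass the two isinstance checks, which is why either could back `await sleep(1)`; the async function is simply the shortest to write.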
But hold on, since sched.scheduler.enter() is an instance method, our API needs to take a sched.scheduler instance.
And since enter() has a priority parameter, our API should have one as well so that we don’t lose any functionality of the original API.
async def sleep(scheduler, priority, duration):
    ...
Let’s start implementing it with this goal in mind.
Implementation
To wrap a callback-style API in an async/await-style API, we need to arrange for execution to resume when a callback function is called, and then pause it.
This might sound unclear, but if you’ve ever used asyncio.Event or trio.Event, you already know the pattern.
import asyncio

async def wrapper():
    e = asyncio.Event()

    # Set up the execution to resume when this callback function is called
    register_callback(lambda *args, **kwargs: e.set())

    # Pause the execution
    await e.wait()

async def user():
    print('A')
    await wrapper()
    print('B')
By introducing a wrapper like this, the user side code can use a callback-style API without losing readability.
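For a version of this pattern that actually runs, here is a minimal sketch in which asyncio’s own loop.call_later() plays the role of the callback-style API. That substitution is my assumption for illustration; any API that accepts a callback would fit in the same place.

```python
import asyncio

async def wrapper(duration):
    e = asyncio.Event()

    # Set up the execution to resume when the callback is called.
    # loop.call_later() stands in for the callback-style API here.
    loop = asyncio.get_running_loop()
    loop.call_later(duration, e.set)

    # Pause the execution until the callback fires.
    await e.wait()

async def user(log):
    log.append('A')
    await wrapper(0.05)
    log.append('B')

log = []
asyncio.run(user(log))
print(log)
```

The user coroutine reads top to bottom even though a callback is doing the waking up behind the scenes.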
And asyncgui has an API specifically designed for this purpose.
import asyncgui as ag

async def wrapper():
    e = ag.ExclusiveEvent()
    register_callback(e.fire)  # A
    args, kwargs = await e.wait()  # B
asyncgui.ExclusiveEvent has two advantages over asyncio.Event.
One, you don’t need to use a lambda because asyncgui.ExclusiveEvent.fire() can take any arguments (line A).
Two, you can receive the arguments passed to it (line B).
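If you are curious how an event can both accept arbitrary callback arguments and hand them back to the waiting code, the following toy may help. It is emphatically not asyncgui’s implementation: ToyEvent, start(), and the callbacks list are hypothetical names I made up, and the sketch only mimics the two behaviours described above using the standard library.

```python
import types

class ToyEvent:
    """Toy one-shot event. NOT asyncgui's implementation."""
    def __init__(self):
        self._resume = None

    def fire(self, *args, **kwargs):
        # Accepts any arguments, so it can be registered as a callback directly.
        resume, self._resume = self._resume, None
        if resume is not None:
            resume((args, kwargs))

    @types.coroutine
    def wait(self):
        # Pause here; hand the driver a function that stores the resume hook.
        return (yield self._remember)

    def _remember(self, resume):
        self._resume = resume

def start(coro):
    """Run coro until it pauses; each pause registers how to resume it."""
    def resume(value=None):
        try:
            remember = coro.send(value)
        except StopIteration:
            return
        remember(resume)
    resume()

callbacks = []  # hypothetical callback-style API: a plain list of callables
received = []

async def wrapper():
    e = ToyEvent()
    callbacks.append(e.fire)       # no lambda needed
    args, kwargs = await e.wait()  # the callback's arguments arrive here
    received.append((args, kwargs))

start(wrapper())
for cb in callbacks:
    cb('pressed', x=10)
print(received)
```

Note that no event loop is involved: the callback itself resumes the paused coroutine, which is the general idea this tutorial relies on.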
Let’s implement our API with asyncgui.ExclusiveEvent.
import asyncgui as ag

async def sleep(scheduler, priority, duration):
    e = ag.ExclusiveEvent()
    scheduler.enter(duration, priority, e.fire)
    await e.wait()
Now we can use it like this:
import functools
import sched
import asyncgui as ag

async def sleep(...):
    ...

def main():
    s = sched.scheduler()
    slp = functools.partial(sleep, s, 0)
    ag.start(task(slp))
    s.run()

async def task(slp):
    await slp(1)
    print('A')
    await slp(2)
    print('B')
    await slp(3)
    print('C')

main()
We successfully achieved the best of both worlds; our API doesn’t occupy a thread, and the user side code is as readable as the time.sleep() example.
However, there’s one more thing to address: dealing with cancellation.
It is not strictly necessary in this case because ExclusiveEvent handles it to a certain extent, but it’s better to handle it within sleep itself to cover some edge cases.
import asyncgui as ag

async def sleep(scheduler, priority, duration):
    e = ag.ExclusiveEvent()
    event = scheduler.enter(duration, priority, e.fire)
    try:
        await e.wait()
    except ag.Cancelled:
        scheduler.cancel(event)
        raise
This is the complete version of our API.
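The cancellation branch relies on two behaviours of sched: enter() returns a handle to the scheduled event, and cancel() removes that event from the queue. Here is a minimal standard-library sketch of just those two calls. The FakeClock helper is a hypothetical addition so that the example finishes instantly.

```python
import sched

class FakeClock:
    """Virtual clock (hypothetical helper) so the example finishes instantly."""
    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, duration):
        self.now += duration

clock = FakeClock()
s = sched.scheduler(clock.time, clock.sleep)
fired = []

# enter() returns an event handle that can be passed to cancel() later.
kept = s.enter(1, 0, fired.append, argument=('kept',))
dropped = s.enter(2, 0, fired.append, argument=('dropped',))

s.cancel(dropped)  # the callback for 'dropped' will never be called
s.run()

# Cancelling an event that is no longer queued raises ValueError.
try:
    s.cancel(dropped)
except ValueError:
    already_gone = True
else:
    already_gone = False

print(fired, already_gone)
```

Only the event that was left in the queue fires, which is the effect sleep wants when the awaiting task is cancelled.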
We successfully connected the sched module to the asyncgui module.
Once connected, we can benefit from asyncgui’s powerful structured concurrency APIs.
import functools
import sched
import asyncgui as ag
import string

async def sleep(scheduler, priority, duration):
    ...

def main():
    s = sched.scheduler()
    slp = functools.partial(sleep, s, 0)
    ag.start(async_main(slp))
    s.run()

async def async_main(slp):
    # Print digits from 0 to 9 at 0.3-second intervals, with a 2-second time limit
    async with ag.move_on_when(slp(2)) as timeout_tracker:
        for c in string.digits:
            print(c, end=' ')
            await slp(0.3)
    print('')

    if timeout_tracker.finished:
        print("Timeout")
    else:
        print("Printed all digits in time")

main()
0 1 2 3 4 5 6
Timeout