Notes
I/O in AsyncKivy
asynckivy does not have any I/O primitives, unlike trio and asyncio, so threads may be the best way to perform I/O without blocking the main thread:
from concurrent.futures import ThreadPoolExecutor
import asynckivy as ak

executor = ThreadPoolExecutor()

def thread_blocking_operation():
    '''
    This function is called from outside the main-thread so you should not
    perform any graphics-related operations here.
    '''

async def async_fn():
    r = await ak.run_in_thread(thread_blocking_operation)
    print("return value:", r)

    r = await ak.run_in_executor(executor, thread_blocking_operation)
    print("return value:", r)
Unhandled exceptions (not BaseException) are propagated to the caller, so you can catch them as you would in synchronous code:
import requests
import asynckivy as ak

async def async_fn(label):
    try:
        response = await ak.run_in_thread(lambda: requests.get('htt...', timeout=10))
    except requests.Timeout:
        label.text = "TIMEOUT!"
    else:
        label.text = "RECEIVED"
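The same catch-at-the-call-site behaviour can be seen with the standard library alone. The following is only an analogy built on concurrent.futures, not a description of how asynckivy is implemented; the names blocking_operation and call_in_thread are hypothetical and exist only for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor


def blocking_operation():
    # Hypothetical stand-in for a blocking call that fails in the worker thread.
    raise TimeoutError("simulated timeout")


def call_in_thread(func):
    # Run func in a worker thread and wait for it. Future.result() re-raises
    # whatever exception func raised, so the caller can use a normal try/except.
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func)
        try:
            future.result()
        except TimeoutError as e:
            return f"caught: {e}"
        else:
            return "no error"


outcome = call_in_thread(blocking_operation)
print(outcome)
```

The exception is raised in the worker thread but handled where the result is requested, which is the same shape as the try/except around the await above.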
Kivy’s Event System
(under construction)
The Problem with Async Generators
asyncio and trio do some hacky stuff, sys.set_asyncgen_hooks() and sys.get_asyncgen_hooks(), which likely hinders asynckivy-flavored async generators.
You can see its details here.
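A small standard-library sketch can make the hooks visible. It checks sys.get_asyncgen_hooks() before, during, and after an asyncio event loop runs. The helper names hooks_installed and check_inside_loop are made up for this illustration, and the sketch assumes nothing else in the process has installed hooks of its own.

```python
import asyncio
import sys


def hooks_installed():
    # True if any async-generator hook is currently set for this thread.
    hooks = sys.get_asyncgen_hooks()
    return hooks.firstiter is not None or hooks.finalizer is not None


async def check_inside_loop():
    # Runs while the asyncio event loop is active.
    return hooks_installed()


before = hooks_installed()
inside = asyncio.run(check_inside_loop())
after = hooks_installed()

print("before:", before, "inside:", inside, "after:", after)
```

While the loop is running, asyncio has its own hooks installed, so any async generator first iterated in that thread passes through them, including ones that asyncio did not create.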
Because of that, the APIs that create async generators might not work perfectly if asyncio or trio is running.
Here is a list of them:
Places where async operations are disallowed
Most of the asynckivy APIs that return an async iterator don’t allow you to perform async operations during the iteration. Here is a list of them:
asynckivy.anim_with_xxx
async for __ in rest_of_touch_events(...):
    await awaitable  # NOT ALLOWED
    async with async_context_manager:  # NOT ALLOWED
        ...
    async for __ in async_iterator:  # NOT ALLOWED
        ...
How to structure your program
Ultimately, your program should have only one “root” task, with all other tasks as its children or descendants. This is something that Trio forces you to do but asynckivy does not.
You can achieve this by calling asynckivy.start() only once in your program, and spawning all other tasks through the Structured Concurrency APIs.
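from kivy.app import App
import asynckivy as ak

class YourApp(App):
    def on_start(self):
        # Start the single root task when the app starts.
        self._root_task = ak.start(self.main())

    def on_stop(self):
        # Cancel the root task when the app stops.
        self._root_task.cancel()

    async def main(self):
        ...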