Notes

I/O in AsyncKivy

asynckivy does not have any I/O primitives unlike trio and asyncio do, thus threads may be the best way to perform them without blocking the main-thread:

from concurrent.futures import ThreadPoolExecutor
import asynckivy as ak

executor = ThreadPoolExecutor()


def thread_blocking_operation():
    '''
    This function is called from outside the main-thread so you should not
    perform any graphics-related operations here.
    '''


async def async_fn():
    r = await ak.run_in_thread(thread_blocking_operation)
    print("return value:", r)

    r = await ak.run_in_executor(executor, thread_blocking_operation)
    print("return value:", r)

Unhandled exceptions (not BaseException) are propagated to the caller so you can catch them like you do in synchronous code:

import requests
import asynckivy as ak

async def async_fn(label):
    try:
        response = await ak.run_in_thread(lambda: requests.get('htt...', timeout=10))
    except requests.Timeout:
        label.text = "TIMEOUT!"
    else:
        label.text = "RECEIVED"

Kivy’s Event System

(under construction)

The Problem with Async Generators

asyncio and trio do some hacky stuff, sys.set_asyncgen_hooks() and sys.get_asyncgen_hooks(), which likely hinders asynckivy-flavored async generators. You can see its details here.

Because of that, the APIs that create async generators might not work perfectly if asyncio or trio is running. Here is a list of them:

Places where async operations are disallowed

Most of the asynckivy APIs that return an async iterator don’t allow to perform async operations during the iteration. Here is a list of them:

async for __ in rest_of_touch_events(...):
    await awaitable  # NOT ALLOWED
    async with async_context_manager:  # NOT ALLOWED
        ...
    async for __ in async_iterator:  # NOT ALLOWED
        ...

How to structure your program

Ultimately, your program should have only one “root” task, with all other tasks as its children or descendants. This is something that Trio forces you to do but asynckivy does not.

You can achieve this by calling asynckivy.start() only once in your program, and spawning all other tasks through the Structured Concurrency APIs.

import asynckivy as ak

class YourApp(App):
    def on_start(self):
        self._root_task = ak.start(self.main())

    def on_stop(self):
        self._root_task.cancel()

    async def main(self):
        ...