API Reference

class Box

async def async_fn(box):
    args, kwargs = await box.get()
    assert args == (1, )
    assert kwargs == {'crow': 'raven', }

box = Box()
box.put(1, crow='raven')

# This task will immediately end because the 'box' already has an item.
task = start(async_fn(box))
assert task.finished

box.clear()
# Now the box is empty, so this task will wait until an item is added.
task = start(async_fn(box))
assert not task.finished

# Put an item into the box, which will cause the task to end.
box.put(1, crow='raven')
assert task.finished

clear()

Remove the item from the box if there is one.

get() -> Awaitable[tuple]

Get the item from the box if there is one. Otherwise, wait until one is put.

property is_empty: bool

put(*args, **kwargs)

Put an item into the box if it's empty.

put_or_update(*args, **kwargs)

update(*args, **kwargs)

Replace the item in the box if there is one already.

Cancelled

Exception class that represents cancellation. See Dealing with Cancellation.

Warning

Currently, this is not an exception class but a tuple of exception classes. That is an implementation detail, and it might become an actual class in the future; therefore, your code must work in both cases.
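One way to stay compatible with both shapes is to use Cancelled only where Python accepts either a class or a tuple of classes, such as an except clause or isinstance(), and to avoid raising or subclassing it. The sketch below does not import asyncgui; `FakeCancelled`, `cancelled_as_class`, `cancelled_as_tuple`, `run_with_cleanup`, and `raise_cancelled` are hypothetical stand-ins used only to show that the same code handles both shapes.

```python
class FakeCancelled(BaseException):
    '''Hypothetical stand-in for a cancellation exception.'''


# The two shapes that 'Cancelled' may take according to the warning above.
cancelled_as_class = FakeCancelled
cancelled_as_tuple = (FakeCancelled, GeneratorExit)


def raise_cancelled():
    raise FakeCancelled


def run_with_cleanup(func, cancelled):
    log = []
    try:
        func()
    except cancelled:
        # An 'except' clause accepts a single class or a tuple of classes.
        log.append('cancelled')
    else:
        log.append('completed')
    return log


# The same function works regardless of which shape is passed in.
assert run_with_cleanup(raise_cancelled, cancelled_as_class) == ['cancelled']
assert run_with_cleanup(raise_cancelled, cancelled_as_tuple) == ['cancelled']

# isinstance() also accepts both shapes.
assert isinstance(FakeCancelled(), cancelled_as_class)
assert isinstance(FakeCancelled(), cancelled_as_tuple)
```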

class Event

async def async_fn(e):
    args, kwargs = await e.wait()
    assert args == (2, )
    assert kwargs == {'crow': 'raven', }

    args, kwargs = await e.wait()
    assert args == (3, )
    assert kwargs == {'toad': 'frog', }

e = Event()
e.fire(1, crocodile='alligator')
task = start(async_fn(e))
e.fire(2, crow='raven')
e.fire(3, toad='frog')
assert task.finished

Warning

This differs significantly from asyncio.Event, as this one does not have a "set" state. When a Task calls its wait() method, it always blocks until fire() is called afterwards. Use Box if you want something closer to asyncio.Event.

Changed in version 0.7.0: This is now completely different from the one in previous versions.

fire(*args, **kwargs)

wait() -> Awaitable[tuple]

Waits for the event to be fired.

class ExclusiveBox

Similar to Box, but this version does not allow multiple tasks to get() simultaneously. As a result, it operates faster.

clear()

Remove the item from the box if there is one.

get() -> Awaitable[tuple]

Get the item from the box if there is one. Otherwise, wait until one is put.

property is_empty: bool

Whether the box is empty.

put(*args, **kwargs)

Put an item into the box if it's empty.

put_or_update(*args, **kwargs)

update(*args, **kwargs)

Replace the item in the box if there is one already.

class ExclusiveEvent

Similar to Event, but this version does not allow multiple tasks to wait() simultaneously. As a result, it operates faster.

fire(*args, **kwargs)

wait() -> Awaitable[tuple]

exception InvalidStateError

Operation is not allowed in the current state.

class Nursery

Similar to trio.Nursery. Do not instantiate this directly; use open_nursery() instead.

close()

Cancel all the child tasks in the nursery as soon as possible.

property closed: bool

start(aw: Awaitable | Task, /, *, daemon=False) -> Task

Immediately start a Task/Awaitable under the supervision of the nursery.

If the argument is a Task, that Task itself is returned. If it is a typing.Awaitable, it is wrapped in a Task, and that Task is returned.

The daemon parameter acts like the one in the threading module. When only daemon tasks are left, they get cancelled, and the nursery closes.

class Task(aw: Awaitable, /)

cancel()

Cancel the task as soon as possible.

property cancelled: bool

Whether the task has been cancelled.

close()

An alias for cancel().

property finished: bool

Whether the task has been completed.

property result: Any

Result of the task. If the task is not finished, InvalidStateError will be raised.

property root_coro: Coroutine

The starting point of the coroutine chain for the task.

property state: TaskState

The current state of the task.

property uid: int

A unique integer assigned to the task.

class TaskState

Enum class that represents the Task state.

CANCELLED

The execution has been cancelled. The cause of the cancellation is either an explicit or implicit call to Task.cancel() or an unhandled exception.

CREATED

Waiting to start execution.

FINISHED

The execution has been completed.

STARTED

Currently running or suspended.

Currently running or suspended.

current_task() -> Awaitable[Task]

Returns the Task instance corresponding to the caller.

task = await current_task()

class disable_cancellation

Return an async context manager that protects its code-block from cancellation.

async with disable_cancellation():
    await something  # <- never gets cancelled

dummy_task = <asyncgui.Task object>

An already closed task. Using it as an initial value removes the need for the common None check shown below.

Before:

class MyClass:
    def __init__(self):
        self._task = None

    def restart(self):
        if self._task is not None:
            self._task.cancel()
        self._task = asyncgui.start(self.main())

    async def main(self):
        ...

After:

class MyClass:
    def __init__(self):
        self._task = asyncgui.dummy_task

    def restart(self):
        self._task.cancel()
        self._task = asyncgui.start(self.main())

    async def main(self):
        ...

move_on_when(aw: Awaitable | Task)

An alias for wait_any_cm().

class open_cancel_scope

Same as trio.CancelScope except this one returns an async context manager.

async with open_cancel_scope() as scope:
    ...

open_nursery() -> AsyncContextManager[Nursery]

Similar to trio.open_nursery().

async with open_nursery() as nursery:
    nursery.start(async_fn1())
    nursery.start(async_fn2(), daemon=True)

run_as_daemon(aw: Awaitable | Task) -> AsyncContextManager[Task]

async with run_as_daemon(async_fn()) as bg_task:
    ...

run_as_main(aw: Awaitable | Task) -> AsyncContextManager[Task]

async with run_as_main(async_fn()) as task:
    ...

Note

You need to use its older name, run_as_primary, if you are using asyncgui 0.6.2 or older.

sleep_forever() -> Awaitable

await sleep_forever()

start(aw: Awaitable | Task, /) -> Task

Immediately start a Task/Awaitable.

If the argument is a Task, that Task itself is returned. If it is a typing.Awaitable, it is wrapped in a Task, and that Task is returned.

async def async_func():
    ...

task = start(async_func())

async wait_all(*aws: Iterable[Awaitable | Task]) -> Awaitable[Sequence[Task]]

Run multiple tasks concurrently, and wait for all of them to end. When any of them raises an exception, the others will be cancelled, and the exception will be propagated to the caller, like trio.Nursery.

tasks = await wait_all(async_fn1(), async_fn2(), async_fn3())
if tasks[0].finished:
    print("The return value of async_fn1() :", tasks[0].result)

wait_all_cm(aw: Awaitable | Task) -> AsyncContextManager[Task]

The context manager form of wait_all().

async with wait_all_cm(async_fn()) as bg_task:
    ...

async wait_any(*aws: Iterable[Awaitable | Task]) -> Awaitable[Sequence[Task]]

Run multiple tasks concurrently, and wait for any of them to finish. As soon as that happens, the others will be cancelled. When any of them raises an exception, the others will be cancelled, and the exception will be propagated to the caller, like trio.Nursery.

tasks = await wait_any(async_fn1(), async_fn2(), async_fn3())
if tasks[0].finished:
    print("The return value of async_fn1() :", tasks[0].result)

wait_any_cm(aw: Awaitable | Task) -> AsyncContextManager[Task]

The context manager form of wait_any(), equivalent to trio_util.move_on_when().

async with wait_any_cm(async_fn()) as bg_task:
    ...