Source code for core_mixins.decorators.async_

# -*- coding: utf-8 -*-

"""
Async wrapper for executing async code from
synchronous context.
"""

import asyncio
import inspect
import threading
from functools import wraps
from typing import Generic
from typing import TypeVar


T = TypeVar("T")


[docs] class SyncWrapper(Generic[T]): """ It provides an interface (mechanism) to execute async code from sync apps via a background thread that keeps a single event loop alive, avoiding `asyncio.run()` which creates and destroys an event loop each time and becomes expensive for many method calls. .. code-block:: python class Test: def sync_method(self) -> str: return self.__class__.__name__ async def testing(self) -> bool: await sleep(0) return True sync_instance = SyncWrapper(instance) assert Test.__name__, sync_instance.sync_method() assert sync_instance.testing(), True sync_instance.close() .. """
[docs] def __init__(self, async_instance: T) -> None: self._async_instance = async_instance self._loop = asyncio.new_event_loop() self._thread = threading.Thread(target=self._run_loop, daemon=True) self._thread.start()
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def __getattr__(self, name): attr = getattr(self._async_instance, name) if inspect.iscoroutinefunction(attr): @wraps(attr) def sync_method(*args, **kwargs): coro = attr(*args, **kwargs) future = asyncio.run_coroutine_threadsafe(coro, self._loop) return future.result() return sync_method return attr def _run_loop(self): asyncio.set_event_loop(self._loop) self._loop.run_forever()
[docs] def close(self): """Close the async instance and stop the event loop""" try: close_attr = getattr(self._async_instance, "close", None) if close_attr is not None and inspect.iscoroutinefunction(close_attr): coro = close_attr() future = asyncio.run_coroutine_threadsafe(coro, self._loop) future.result() finally: self._loop.call_soon_threadsafe(self._loop.stop) self._thread.join() self._loop.close()