# Source code for core_mixins.decorators.timeout
# -*- coding: utf-8 -*-
"""
This decorator ensures a function does not run for more
than a certain time, otherwise an error is raised.
"""
import multiprocessing
from functools import update_wrapper
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Tuple
class _TimeoutWrapper:
"""Wrapper class that executes a function with a timeout."""
_mp_context = multiprocessing.get_context("fork")
def __init__(self, fcn: Callable, timeout: float) -> None:
update_wrapper(self, fcn)
self.timeout = timeout
self.fcn = fcn
@staticmethod
def _worker_function(
func: Callable,
args: Tuple,
kwargs: Dict,
result_queue,
) -> None:
"""Worker function that runs in a separate process."""
try:
result = func(*args, **kwargs)
result_queue.put(result)
except Exception as e: # pylint: disable=broad-exception-caught
result_queue.put(e)
def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""Execute the function with timeout."""
result_queue = self._mp_context.SimpleQueue()
p = self._mp_context.Process(
target=self._worker_function,
args=(
self.fcn,
args,
kwargs,
result_queue
),
)
p.start()
p.join(self.timeout)
fcn_name = getattr(self.fcn, "__name__", repr(self.fcn))
if p.is_alive():
p.terminate()
p.join()
raise TimeoutError(
f"Function '{fcn_name}' timed out "
f"after {self.timeout} seconds."
)
if p.exitcode != 0:
raise RuntimeError(
f"Function '{fcn_name}' worker process "
f"crashed with exit code {p.exitcode}."
)
result = result_queue.get()
if isinstance(result, Exception):
raise result
return result
# [docs]
def with_timeout(
    fcn: Optional[Callable] = None,
    timeout: float = 10,
) -> Callable:
    """
    Execute a function so that it times out after the specified value.

    Usable both bare (``@with_timeout``) and with arguments
    (``@with_timeout(timeout=5)``).

    **Important:** Use with caution when combining with `asyncio`,
    because the decorator spawns a subprocess and pickles the result to
    pass it back to the caller. When combined with functions like
    `run_in_executor`, you could face issues if you are working with
    large files, due to:

    - Pickling large data.
    - Fork in async context.
    - Double overhead.

    Alternative

    .. code-block:: python

        loop = asyncio.get_event_loop()
        try:
            return await asyncio.wait_for(
                loop.run_in_executor(None, the_function),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            raise TimeoutError(...)

    :param fcn: The function being decorated.
    :param timeout: The seconds before time out.
    :return: The wrapped function, or a decorator producing it when
        ``fcn`` is not supplied.
    """
    def decorator_timeout(func: Callable) -> "_TimeoutWrapper":
        return _TimeoutWrapper(func, timeout)

    # Identity check, not truthiness: a callable object may legitimately be
    # falsy (e.g. it defines ``__bool__`` or ``__len__``) and must still be
    # wrapped rather than mistaken for "no function given".
    if fcn is None:
        return decorator_timeout
    return decorator_timeout(fcn)