Source code for core_mixins.decorators.timeout

# -*- coding: utf-8 -*-

"""
This decorator ensures a function does not run for more
than a certain time, otherwise an error is raised.
"""

import multiprocessing
import time
from functools import update_wrapper
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Tuple


class _TimeoutWrapper:
    """
    Callable wrapper that runs a function in a forked worker process
    and gives up when it takes longer than ``timeout`` seconds.

    The worker reports its outcome through a one-way pipe as a tagged
    ``(succeeded, payload)`` pair, so a function that *returns* an
    exception instance is not confused with one that *raised* it.
    """

    _mp_context = multiprocessing.get_context("fork")

    def __init__(self, fcn: Callable, timeout: float) -> None:
        """
        :param fcn: The function to execute in the worker process.
        :param timeout: Seconds to wait for the outcome; ``None`` waits
            indefinitely.
        """

        # Copy __name__, __doc__, etc. so the wrapper looks like `fcn`.
        update_wrapper(self, fcn)
        self.timeout = timeout
        self.fcn = fcn

    @staticmethod
    def _worker_function(
        func: Callable,
        args: Tuple,
        kwargs: Dict,
        result_conn,
    ) -> None:
        """
        Run ``func`` inside the worker process and report the outcome.

        :param func: The function to execute.
        :param args: Positional arguments for ``func``.
        :param kwargs: Keyword arguments for ``func``.
        :param result_conn: Write end of the pipe; receives
            ``(True, result)`` on success or ``(False, exception)``
            when ``func`` (or pickling its result) raises.
        """

        try:
            result = func(*args, **kwargs)
            # send() pickles before writing, so a pickling failure
            # leaves the pipe untouched and is reported below instead.
            result_conn.send((True, result))

        except Exception as e:  # pylint: disable=broad-exception-caught
            result_conn.send((False, e))

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """
        Execute the function with timeout.

        :param args: Positional arguments forwarded to the function.
        :param kwargs: Keyword arguments forwarded to the function.
        :return: Whatever the wrapped function returns.
        :raises TimeoutError: If no outcome arrives within ``timeout``.
        :raises RuntimeError: If the worker exits without reporting.
        :raises Exception: Any exception raised by the function itself.
        """

        fcn_name = getattr(self.fcn, "__name__", repr(self.fcn))

        deadline = (
            None if self.timeout is None
            else time.monotonic() + self.timeout
        )

        receiver, sender = self._mp_context.Pipe(duplex=False)

        worker = self._mp_context.Process(
            target=self._worker_function,
            args=(
                self.fcn,
                args,
                kwargs,
                sender,
            ),
        )

        try:
            try:
                worker.start()

            finally:
                # The forked child holds its own copy of the write end.
                # Dropping ours lets recv() see EOF if the worker dies
                # without reporting anything.
                sender.close()

            # Wait for the *outcome*, not for process exit: a worker
            # sending a payload larger than the pipe buffer cannot
            # exit until we read, so joining first would stall until
            # the timeout and report a bogus TimeoutError.
            if not receiver.poll(self.timeout):
                worker.terminate()
                worker.join()

                raise TimeoutError(
                    f"Function '{fcn_name}' timed out "
                    f"after {self.timeout} seconds."
                )

            try:
                succeeded, payload = receiver.recv()

            except EOFError:
                # Pipe closed with nothing sent: the worker exited
                # (hard crash, os._exit, SystemExit, ...) on its own.
                worker.join()

                raise RuntimeError(
                    f"Function '{fcn_name}' worker process "
                    f"crashed with exit code {worker.exitcode}."
                ) from None

            # The outcome is in hand; give the worker what is left of
            # the budget to shut down, then stop it so a lingering
            # child cannot stretch the call beyond the timeout.
            remaining = (
                None if deadline is None
                else max(0.0, deadline - time.monotonic())
            )
            worker.join(remaining)

            if worker.is_alive():
                worker.terminate()
                worker.join()

        finally:
            receiver.close()

        if succeeded:
            return payload

        raise payload


def with_timeout(
    fcn: Optional[Callable] = None,
    timeout: float = 10,
) -> Callable:
    """
    It executes a function that will time out after the specified value.

    Usable both bare (``@with_timeout``) and with arguments
    (``@with_timeout(timeout=5)``).

    **Important:** Use with caution when combining with `asyncio`
    because the decorator spawns a subprocess and uses `multiprocessing`
    inter-process communication to pass data. When combined with
    functions like `run_in_executor`, you could face issues if you are
    working with large files due to:

    - Pickling large data.
    - Fork in async context.
    - Double overhead.

    Alternative

    .. code-block:: python

        loop = asyncio.get_event_loop()

        try:
            return await asyncio.wait_for(
                loop.run_in_executor(None, the_function),
                timeout=timeout
            )

        except asyncio.TimeoutError:
            raise TimeoutError(...)

    :param fcn: The function being decorated.
    :param timeout: The seconds before time out.
    :return: The wrapped function.
    """

    def decorator_timeout(func: Callable) -> _TimeoutWrapper:
        return _TimeoutWrapper(func, timeout)

    # Compare against None rather than truthiness: a callable object
    # may legitimately be falsy (e.g. it defines __len__ or __bool__).
    if fcn is None:
        return decorator_timeout

    return decorator_timeout(fcn)