Decorators#

This module provides a collection of useful decorators for enhancing function and class behavior.

Quick Examples#

Cache hierarchy#

All cache decorators share a common extensible base defined in cache_memory_based.py:

cache_memory_based.py  L2Backend (interface), _CacheWrapper (L1 + optional fallback), cache_memory_based
cache_file_based.py  _DiskBackend(L2Backend), cache_file_based
cache_redis.py       _RedisBackend(L2Backend), ...   ← future
cache_s3.py          _S3Backend(L2Backend), ...      ← future

_CacheWrapper is the L1 in-memory LRU layer. It accepts an optional fallback, any L2Backend, that is consulted on an L1 miss and written to on every new computation (write-through). cache_memory_based() uses no fallback and is the pure in-memory base of the hierarchy; it also acts as the degraded fallback when a backend is unavailable (e.g. Redis unreachable). cache is a backward-compatible alias for cache_memory_based().

TTL (time-to-live) is supported at every layer:

  • L1: _CacheWrapper expires entries lazily on the next access. No background thread is used.

  • L2: _DiskBackend stores a timestamp alongside each value and returns _MISS when the entry is older than ttl. Future Redis/S3 backends can delegate to native TTL primitives (e.g. SETEX) by overriding save/load.
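The L1 lookup, lazy expiry, and write-through flow described above can be sketched in a few lines. This is a simplified illustration, not the library's _CacheWrapper: TwoLevelCache, DictBackend, and the injectable clock parameter are hypothetical names, the sketch takes no lock, and a value reloaded from the fallback simply gets a fresh L1 timestamp.

```python
import time
from collections import OrderedDict

_MISS = object()  # sentinel: distinguishes "no entry" from a cached None


class DictBackend:
    """Hypothetical in-process fallback with the load/save shape of L2Backend."""

    def __init__(self):
        self.store = {}

    def load(self, key):
        return self.store.get(key, _MISS)

    def save(self, key, value):
        self.store[key] = value


class TwoLevelCache:
    """Hypothetical stand-in for the L1 + optional fallback flow."""

    def __init__(self, maxsize=None, ttl=None, fallback=None, clock=time.monotonic):
        self._data = OrderedDict()  # key -> (stored_at, value)
        self._maxsize, self._ttl = maxsize, ttl
        self._fallback, self._clock = fallback, clock

    def _store_l1(self, key, value):
        self._data[key] = (self._clock(), value)
        self._data.move_to_end(key)
        if self._maxsize is not None and len(self._data) > self._maxsize:
            self._data.popitem(last=False)  # evict the least-recently-used entry

    def get(self, key, compute):
        entry = self._data.get(key)
        if entry is not None:
            stored_at, value = entry
            if self._ttl is None or self._clock() - stored_at < self._ttl:
                self._data.move_to_end(key)  # mark as recently used
                return value
            del self._data[key]  # lazy expiry: dropped only when accessed
        if self._fallback is not None:
            value = self._fallback.load(key)  # consult L2 on an L1 miss
            if value is not _MISS:
                self._store_l1(key, value)
                return value
        value = compute()
        self._store_l1(key, value)
        if self._fallback is not None:
            self._fallback.save(key, value)  # write-through on new computations
        return value


calls = []
backend = DictBackend()
cache = TwoLevelCache(maxsize=1, fallback=backend)
cache.get("a", lambda: calls.append("a") or 1)  # computed, stored in L1 and L2
cache.get("b", lambda: calls.append("b") or 2)  # computed, evicts "a" from L1
cache.get("a", lambda: calls.append("a") or 1)  # L1 miss, served from the fallback
```

After the three calls, calls contains only "a" and "b": the third lookup is answered by the fallback without recomputing.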

Building a new backend requires three steps:

  1. Subclass L2Backend, call super().__init__(ttl=ttl) in __init__ to inherit TTL storage, then implement load(cache_key) (return the value or _MISS) and save(cache_key, value).

  2. Pass your backend as fallback to cache_memory_based().

  3. Wrap with a decorator function following the bare / parameterised pattern used by cache_file_based().

# example: skeleton for a future cache_redis
import pickle
from core_mixins.decorators.cache_memory_based import L2Backend, _MISS, cache_memory_based

class _RedisBackend(L2Backend):
    def __init__(self, host="localhost", port=6379, ttl=None):
        super().__init__(ttl=ttl)
        self._redis = ...  # connect

    def load(self, cache_key):
        raw = self._redis.get(self._key(cache_key))
        return pickle.loads(raw) if raw is not None else _MISS  # noqa: S301

    def save(self, cache_key, value):
        data = pickle.dumps(value)
        if self.ttl is not None:
            self._redis.setex(self._key(cache_key), int(self.ttl), data)
        else:
            self._redis.set(self._key(cache_key), data)

def cache_redis_based(fcn=None, *, maxsize=128, host="localhost", ttl=None):
    def decorator(_fcn):
        backend = _RedisBackend(host=host, ttl=ttl)
        return cache_memory_based(_fcn, maxsize=maxsize, fallback=backend, ttl=ttl)
    if fcn is not None:
        return decorator(fcn)
    return decorator

Cache Decorator#

Note

functools.lru_cache is order-sensitive for keyword arguments: f(a=1, b=2) and f(b=2, a=1) are treated as distinct cache entries, so the underlying function is called twice for the same logical input. @cache / @cache_memory_based sorts kwargs before building the key, making it order-insensitive.

If kwargs order-insensitivity is not needed, prefer @functools.lru_cache(maxsize=N): it is implemented in C and exposes .cache_info() / .cache_clear().

from core_mixins.decorators import cache, cache_memory_based

# Bare usage: unbounded, never expires
@cache
def expensive_computation(n):
    return sum(range(n))

result = expensive_computation(1000)  # Computed
result = expensive_computation(1000)  # Cached

# Bounded cache: keeps the 128 most-recently used results
@cache(maxsize=128)
def fetch_record(record_id):
    return db.get(record_id)

# With TTL: entry expires 5 minutes after it was stored
@cache_memory_based(maxsize=256, ttl=300)
def get_config(key):
    return remote_config.fetch(key)

# With keyword arguments: order-insensitive, unlike functools.lru_cache
@cache
def query(table, *, filters=None, limit=100):
    ...
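The difference in keyword-argument handling can be checked with a small self-contained experiment. The memoiser below is a hypothetical stand-in that sorts kwargs before building the key, as the note describes; it is not the library's implementation and omits maxsize, TTL, and locking.

```python
from functools import lru_cache, wraps


def kwargs_insensitive_cache(fcn):
    """Hypothetical memoiser that sorts kwargs before building the key."""
    store = {}

    @wraps(fcn)
    def wrapper(*args, **kwargs):
        # Sorting by keyword name makes f(a=1, b=2) and f(b=2, a=1) share a key.
        key = (args, tuple(sorted(kwargs.items())))
        if key not in store:
            store[key] = fcn(*args, **kwargs)
        return store[key]

    return wrapper


lru_calls, sorted_calls = [], []


@lru_cache(maxsize=None)
def lru_add(a=0, b=0):
    lru_calls.append((a, b))
    return a + b


@kwargs_insensitive_cache
def sorted_add(a=0, b=0):
    sorted_calls.append((a, b))
    return a + b


lru_add(a=1, b=2)
lru_add(b=2, a=1)  # different keyword order: a separate lru_cache entry on CPython
sorted_add(a=1, b=2)
sorted_add(b=2, a=1)  # same key after sorting: served from the cache

print(len(lru_calls), len(sorted_calls))
```

The sorted variant records a single underlying call, while lru_cache is expected to record one call per keyword order.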

Performance comparison#

lru_cache is a C extension; @cache is pure Python with a threading lock on every call. The benchmark below measures 1 million calls on a warm cache and on an all-miss workload.

import timeit
from functools import lru_cache
from core_mixins.decorators import cache

N = 1_000_000

# positional args, warm cache
@lru_cache(maxsize=128)
def lru_pos(x, y): return x + y

@cache(maxsize=128)
def our_pos(x, y): return x + y

for i in range(100): lru_pos(i%10, i%7); our_pos(i%10, i%7)
t_lru_pos = timeit.timeit(lambda: lru_pos(3, 4), number=N)
t_our_pos = timeit.timeit(lambda: our_pos(3, 4), number=N)

# keyword args, warm cache
@lru_cache(maxsize=128)
def lru_kw(x=0, y=0): return x + y

@cache(maxsize=128)
def our_kw(x=0, y=0): return x + y

for i in range(100): lru_kw(x=i%10, y=i%7); our_kw(x=i%10, y=i%7)
t_lru_kw = timeit.timeit(lambda: lru_kw(x=3, y=4), number=N)
t_our_kw = timeit.timeit(lambda: our_kw(x=3, y=4), number=N)

# cache miss: all unique args
@lru_cache(maxsize=None)
def lru_miss(x): return x * 2

@cache(maxsize=None)
def our_miss(x): return x * 2

gen = iter(range(N))
t_lru_miss = timeit.timeit(lambda: lru_miss(next(gen)), number=N)
gen = iter(range(N))
t_our_miss = timeit.timeit(lambda: our_miss(next(gen)), number=N)

hdr = '{:<32} {:>10} {:>10} {:>8}'
row = '{:<32} {:>9.3f}s {:>9.3f}s {:>7.1f}x'
print(hdr.format('scenario (1M calls)', 'lru_cache', '@cache', 'ratio'))
print('-' * 64)
print(row.format('positional args, cache hit', t_lru_pos, t_our_pos, t_our_pos / t_lru_pos))
print(row.format('keyword args, cache hit',    t_lru_kw,  t_our_kw,  t_our_kw  / t_lru_kw))
print(row.format('cache miss (all unique)',     t_lru_miss,t_our_miss,t_our_miss / t_lru_miss))

Results on CPython 3.12 (1 M calls each):

Scenario                     lru_cache  @cache   Ratio
Positional args, cache hit   0.053 s    0.541 s  10.2× slower
Keyword args, cache hit      0.104 s    0.750 s  7.2× slower
Cache miss (all unique)      0.127 s    0.753 s  5.9× slower

lru_cache is ~6–10× faster because its hot path avoids the Python interpreter almost entirely. @cache pays a fixed cost on every call: acquiring a threading.Lock, building a nested tuple key, and an OrderedDict lookup. The lock is the dominant overhead.

The trade-off: use @cache when kwargs order-insensitivity is required; use @functools.lru_cache everywhere else.

Cache File Based Decorator#

Write-through two-level cache: L1 is in-memory LRU; the fallback is a per-function directory of pickle files on disk. Results survive process restarts and are shared across processes that point at the same path.

from core_mixins.decorators import cache_file_based

# Bare usage: default path (tempfile.gettempdir()), entries never expire
@cache_file_based
def expensive(n):
    return n ** 2

# Custom path and memory bound
@cache_file_based(path="/var/cache/myapp", maxsize=64)
def fetch_record(record_id):
    return db.get(record_id)

# With TTL: entries expire after 10 minutes at both L1 and L2
@cache_file_based(path="/var/cache/myapp", maxsize=128, ttl=600)
def get_report(report_id):
    return reporting_api.fetch(report_id)

# Every new result is written to both L1 and disk immediately.
# When L1 is full the least-recently-used entry is dropped from memory; its disk file is kept.
# A second process at the same path gets a cache hit without recomputing.
#
# With ttl set, expiry is symmetric across both layers:
#   L1: expired entry is evicted from memory on the next access.
#   L2: expired .pkl file is deleted from disk on the next access.
# No background thread and no cron job: the cache cleans itself.

Count Calls Decorator#

from core_mixins.decorators import count_calls

@count_calls
def my_function():
    return "Hello"

my_function()
my_function()
print(my_function.calls_number)  # Output: 2
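For readers curious how such a counter might work, here is a minimal sketch of a callable wrapper that exposes calls_number. CountWrapperSketch is a hypothetical name; the library's _CountWrapper may be implemented differently.

```python
from functools import update_wrapper


class CountWrapperSketch:
    """Hypothetical callable wrapper that counts invocations."""

    def __init__(self, fcn):
        update_wrapper(self, fcn)  # copy __name__, __doc__, etc. from fcn
        self._fcn = fcn
        self.calls_number = 0

    def __call__(self, *args, **kwargs):
        self.calls_number += 1
        return self._fcn(*args, **kwargs)


@CountWrapperSketch
def greet():
    return "Hello"


greet()
greet()
print(greet.calls_number)
```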

Retry Decorator#

from core_mixins.decorators import retry
import requests

@retry(tries=3, delay=1, backoff=2)
def fetch_data(url):
    response = requests.get(url)
    response.raise_for_status()
    return response.json()
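To make the tries / delay / backoff interaction concrete, the sketch below shows one common way such a decorator is written. It assumes that tries counts total attempts and that the delay is multiplied by backoff after each failure; the library's exact semantics may differ. retry_sketch and its sleep parameter are hypothetical, the latter added only so the example can record waits instead of sleeping.

```python
import time
from functools import wraps


def retry_sketch(tries=3, delay=1, backoff=2, exceptions=(Exception,), sleep=time.sleep):
    """Hypothetical retry decorator with exponential backoff."""

    def decorator(fcn):
        @wraps(fcn)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, tries + 1):
                try:
                    return fcn(*args, **kwargs)
                except exceptions:
                    if attempt == tries:
                        raise  # out of attempts: propagate the last error
                    sleep(wait)
                    wait *= backoff  # grow the delay for the next attempt

        return wrapper

    return decorator


waits = []     # recorded instead of actually sleeping
attempts = []


@retry_sketch(tries=3, delay=1, backoff=2, sleep=waits.append)
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = flaky()
```

Under these assumptions, two failures followed by a success produce waits of 1 and then 2 seconds.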

Singleton Decorator#

from core_mixins.decorators import singleton

@singleton
class DatabaseConnection:
    def __init__(self):
        self.connection = "Connected"

db1 = DatabaseConnection()
db2 = DatabaseConnection()
assert db1 is db2  # Same instance
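A singleton decorator of this kind is often a closure around a one-slot store. The sketch below is an assumption about the general technique, not the library's code; singleton_sketch and Connection are hypothetical names, and the sketch is not thread-safe.

```python
def singleton_sketch(cls):
    """Hypothetical class decorator that always returns the first instance."""
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)  # constructed once, on first call
        return instances[cls]

    return get_instance


@singleton_sketch
class Connection:
    def __init__(self):
        self.connection = "Connected"


first = Connection()
second = Connection()
```

Because the decorated name becomes a function rather than a class, isinstance checks against it no longer work in this sketch, which is consistent with the Callable return type documented in the API reference.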

With Time Out Decorator#

import time

from core_mixins.decorators import with_timeout

@with_timeout(timeout=2)
def long_task_that_times_out():
    """Example function that will timeout."""
    time.sleep(5)
    return "Task completed"

@with_timeout(timeout=5)
def long_task_that_succeeds():
    """Example function that completes before timeout."""
    time.sleep(2)
    return "Task completed successfully"

if __name__ == "__main__":
    # Test 1: Function that completes successfully
    try:
        result = long_task_that_succeeds()
        print(f"Success: {result}")
    except TimeoutError as e:
        print(f"Timeout: {e}")

    # Test 2: Function that times out
    try:
        result = long_task_that_times_out()
        print(f"Success: {result}")
    except TimeoutError as e:
        print(f"Timeout: {e}")

Important:

Use with caution when combining with asyncio, because the decorator spawns a subprocess and uses inter-process communication to pass data. When combined with functions like run_in_executor, you could face issues when working with large files because of:

  • Pickling large data.

  • Fork in async context.

  • Double overhead.

Alternative

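import asyncio

# run_with_timeout is an illustrative name for the enclosing coroutine.
async def run_with_timeout(the_function, timeout):
    loop = asyncio.get_running_loop()
    try:
        return await asyncio.wait_for(
            loop.run_in_executor(None, the_function),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # Only the wait is abandoned; the worker thread is not interrupted.
        raise TimeoutError(...)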

Time Out Decorator (Signal/Linux)#

import time

from core_mixins.decorators import with_timeout_signal

@with_timeout_signal(timeout=2)
def long_task_that_times_out():
    time.sleep(5)  # exceeds the 2-second limit, so TimeoutError is raised
    return "Task completed"

Timer Decorator#

from core_mixins.decorators import timer

@timer
def slow_function():
    import time
    time.sleep(1)
    return "Done"

result, elapsed = slow_function()
print(f"Took {elapsed:.2f} seconds")
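A decorator returning a (result, elapsed) tuple can be sketched with time.perf_counter. timer_sketch is a hypothetical name, and the clock the library actually uses is not stated in this document.

```python
import time
from functools import wraps


def timer_sketch(fcn):
    """Hypothetical timing decorator: returns (result, elapsed_seconds)."""

    @wraps(fcn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fcn(*args, **kwargs)
        return result, time.perf_counter() - start

    return wrapper


@timer_sketch
def slow_function():
    time.sleep(0.05)
    return "Done"


result, elapsed = slow_function()
print(f"Took {elapsed:.2f} seconds")
```

Note that the wrapped function's return type changes: callers must unpack the tuple, as in the example above.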

API Reference#

SyncWrapper#

Wrapper for executing async code from a synchronous context.

class core_mixins.decorators.async_.SyncWrapper(async_instance: T)[source]#

Bases: Generic[T]

Provides a way to run async code from synchronous applications. A background thread keeps a single event loop alive, which avoids asyncio.run(): that call creates and destroys an event loop on every invocation and becomes expensive across many method calls.

from asyncio import sleep

from core_mixins.decorators.async_ import SyncWrapper

class Test:
    def sync_method(self) -> str:
        return self.__class__.__name__

    async def testing(self) -> bool:
        await sleep(0)
        return True

sync_instance = SyncWrapper(Test())
assert sync_instance.sync_method() == Test.__name__
assert sync_instance.testing() is True
sync_instance.close()
__init__(async_instance: T) None[source]#
close()[source]#

Close the async instance and stop the event loop
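The background-thread technique described above can be sketched with asyncio.run_coroutine_threadsafe. This is an illustration of the general approach under stated assumptions, not the library's SyncWrapper: SyncWrapperSketch and Service are hypothetical names, and attribute forwarding through __getattr__ is an assumed design.

```python
import asyncio
import inspect
import threading


class SyncWrapperSketch:
    """Hypothetical wrapper: one long-lived event loop on a background thread."""

    def __init__(self, async_instance):
        self._instance = async_instance
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def __getattr__(self, name):
        attr = getattr(self._instance, name)
        if not inspect.iscoroutinefunction(attr):
            return attr  # plain attributes and sync methods pass through

        def call_sync(*args, **kwargs):
            # Schedule the coroutine on the background loop and block for the result.
            future = asyncio.run_coroutine_threadsafe(attr(*args, **kwargs), self._loop)
            return future.result()

        return call_sync

    def close(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


class Service:
    def name(self):
        return type(self).__name__

    async def ping(self):
        await asyncio.sleep(0)
        return True


wrapped = SyncWrapperSketch(Service())
name, pong = wrapped.name(), wrapped.ping()
wrapped.close()
```

Reusing one loop means every call pays only the cost of a thread-safe handoff, instead of the loop setup and teardown that asyncio.run() performs each time.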

cache_memory_based#

Caching decorators: pure in-memory (cache) and extensible fallback base.

class core_mixins.decorators.cache_memory_based.L2Backend(ttl: float | None = None)[source]#

Bases: ABC

Interface for pluggable cache fallback backends. Implement this to add a persistence layer (disk, S3, Redis, DynamoDB or any other engine) to _CacheWrapper via its fallback parameter.

__init__(ttl: float | None = None) None[source]#
abstractmethod load(cache_key: Any) Any[source]#

Return the cached value, or _MISS if not present.

abstractmethod save(cache_key: Any, value: Any) None[source]#

Persist value under cache_key.

core_mixins.decorators.cache_memory_based.cache_memory_based(fcn: Callable | None = None, *, maxsize: int | None = None, fallback: L2Backend | None = None, ttl: float | None = None) Callable[source]#

Pure in-memory memoising decorator; the no-fallback base of the cache hierarchy.

Other decorators add persistence by passing an L2Backend as fallback: cache_file_based() uses disk, future variants will use S3 or Redis. All of them fall back to this in-memory behavior when their backend is unavailable.

functools.lru_cache() is order-sensitive for keyword arguments: f(a=1, b=2) and f(b=2, a=1) are treated as distinct cache entries and the underlying function is called twice. This decorator normalises kwargs by sorting them before building the key, so both calls map to the same entry.

If kwargs order-insensitivity is not needed, prefer @functools.lru_cache(maxsize=N): it is implemented in C and exposes .cache_info() / .cache_clear(). Use this decorator when you need to memoize functions whose callers may pass the same keyword arguments in different orders.

Parameters:
  • fcn – The function being decorated.

  • maxsize – Maximum number of entries to keep. When the limit is reached the least-recently-used entry is evicted. None (default) means the cache is unbounded.

  • fallback – Optional L2Backend consulted on an L1 miss and written to on every new computation. None (default) gives pure in-memory behavior.

  • ttl – Time-to-live in seconds for L1 entries. Expired entries are evicted lazily on the next access. None (default) means entries never expire.

Returns:

The wrapped function.

Return type:

Callable

core_mixins.decorators.cache_memory_based.cache(fcn: Callable | None = None, *, maxsize: int | None = None, fallback: L2Backend | None = None, ttl: float | None = None) Callable#

Backward-compatible alias for cache_memory_based().

cache_file_based#

Note

Serialization format (pickle)

Cached values are serialized with Python’s pickle module. Bandit flags pickle by default (rules B403 and B301) because deserializing data from an untrusted source can execute arbitrary code.

That risk does not apply here: the only data ever passed to pickle.loads() is data that this library previously wrote with pickle.dumps(). There is no user-controlled input anywhere in the deserialization path, so the Bandit warnings are suppressed with # nosec B403 / # nosec B301.

Do not point this decorator at a directory that is writable by untrusted parties.

Persistent file-based caching decorator.

core_mixins.decorators.cache_file_based.cache_file_based(fcn: Callable | None = None, *, path: str | None = None, maxsize: int = 128, ttl: float | None = None) Callable[source]#

Write-through caching decorator: L1 is a bounded in-memory LRU (_CacheWrapper); the fallback is a directory of per-entry pickle files on disk (_DiskBackend).

Every new result is written to both L1 and disk immediately. When L1 is full the least-recently-used entry is evicted from memory only — its disk file is kept. A subsequent call with the same arguments reloads the value from disk without invoking the wrapped function.

Because results are always on disk, multiple processes sharing the same path benefit from each other’s computed results.

Supports both bare and parameterized call styles:

@cache_file_based
def f(x): ...

@cache_file_based(path="/tmp/mycache", maxsize=64)
def g(x): ...
Parameters:
  • fcn – The function being decorated (set when used bare).

  • path – Directory under which per-function subdirectories are created. Defaults to tempfile.gettempdir().

  • maxsize – Maximum number of entries to keep in memory. Least-recently-used entries are evicted from memory (disk file kept) when the limit is exceeded. Defaults to 128.

  • ttl – Time-to-live in seconds. Expiry is symmetric across both layers: expired L1 entries are evicted from memory, and expired L2 files are deleted from disk, both lazily on the next access. No background thread or external cleanup is required. None (default) means entries never expire.

Returns:

The wrapped function.

Return type:

Callable

count_calls#

Decorators for counting function call invocations.

core_mixins.decorators.count_calls.count_calls(fcn: Callable) _CountWrapper[source]#

It provides the mechanism to count invocations…

Parameters:

fcn (Callable) – The function being decorated.

Returns:

The wrapped function with calls_number attribute.

Return type:

_CountWrapper

repeat#

Repeat decorator for executing functions multiple times.

core_mixins.decorators.repeat.repeat(fcn: Callable | None = None, *, times: int = 2) Callable[source]#

Repeat n times the function and return the list of returned values…

Parameters:
  • fcn (Callable) – The function being decorated.

  • times (int) – Number of times the function will be invoked.

Returns:

The wrapped function.

Return type:

Callable
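Since repeat has no quick example, the sketch below shows how a decorator with this signature could behave, using the same bare / parameterised pattern as cache_file_based(). repeat_sketch is a hypothetical name; the behavior is inferred from the description above.

```python
from functools import wraps


def repeat_sketch(fcn=None, *, times=2):
    """Hypothetical repeat decorator: call the function `times` times, collect results."""

    def decorator(_fcn):
        @wraps(_fcn)
        def wrapper(*args, **kwargs):
            return [_fcn(*args, **kwargs) for _ in range(times)]

        return wrapper

    if fcn is not None:  # bare usage: @repeat_sketch
        return decorator(fcn)
    return decorator      # parameterised usage: @repeat_sketch(times=3)


@repeat_sketch
def twice():
    return "hi"


@repeat_sketch(times=3)
def thrice(x):
    return x * 2


bare_result = twice()
param_result = thrice(2)
```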

retry#

Retry decorator with exponential backoff for handling exceptions.

core_mixins.decorators.retry.retry(fcn: ~typing.Callable | None = None, tries: int = 3, delay: int = 1, backoff: int = 2, exceptions: ~typing.Tuple[~typing.Type[BaseException], ...] = (<class 'Exception'>,), logger: ~logging.Logger | None = None) Callable[source]#

It retries the decorated function using an exponential backoff in case of errors (exceptions) in the execution…

Parameters:
  • fcn (Callable) – The function being decorated.

  • tries (int) – Number of retries.

  • delay (int) – Delay in seconds between invocations.

  • backoff (int) – Exponential backoff factor to use between retries.

  • exceptions (Tuple[Type[BaseException], ...]) – Exceptions to capture for the retries. If the raised exception is not listed here, no retries are performed.

  • logger (logging.Logger | None) – Optional logger; defaults to None.

Returns:

The wrapped function.

Return type:

Callable

singleton#

Singleton pattern decorator for ensuring single instance classes.

core_mixins.decorators.singleton.singleton(cls: Type) Callable[source]#

Make a class a Singleton class (only one instance)

Parameters:

cls (Type) – The class being decorated.

Returns:

The wrapped function.

Return type:

Callable

with_timeout#

This decorator ensures a function does not run for more than a certain time, otherwise an error is raised.

core_mixins.decorators.timeout.with_timeout(fcn: Callable | None = None, timeout: float = 10) Callable[source]#

It executes a function that will time out after the specified value.

Important: Use with caution when combining with asyncio, because the decorator spawns a subprocess and uses multiprocessing.Queue to pass data. When combined with functions like run_in_executor, you could face issues when working with large files because of:

  • Pickling large data.

  • Fork in async context.

  • Double overhead.

Alternative

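import asyncio

# run_with_timeout is an illustrative name for the enclosing coroutine.
async def run_with_timeout(the_function, timeout):
    loop = asyncio.get_running_loop()
    try:
        return await asyncio.wait_for(
            loop.run_in_executor(None, the_function),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # Only the wait is abandoned; the worker thread is not interrupted.
        raise TimeoutError(...)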
Parameters:
  • fcn – The function being decorated.

  • timeout – The seconds before time out.

Returns:

The wrapped function.

with_timeout_signal#

This decorator (alternative to with_timeout) ensures a function does not run for more than a certain amount of seconds before an error (TimeoutError) is raised.

core_mixins.decorators.timeout_signal.with_timeout_signal(fcn: Callable | None = None, timeout: float = 10) Callable[source]#

It executes a function that will time out after the specified number of seconds, using signal. It can be an alternative to with_timeout because it does not require spawning another process or using a queue.

Caveats:
  • Only works on Unix/Linux systems (SIGALRM not available on Windows).

  • Must be called from the main thread (signal.alarm limitation).

  • Not re-entrant: nested or concurrent decorated functions will interfere.

  • Not compatible with async functions or async contexts.

  • May conflict with other code using SIGALRM.

Parameters:
  • fcn – The function being decorated.

  • timeout – The seconds before time out.

Returns:

The wrapped function.

Raises:
  • TimeoutError – If the decorated function exceeds the timeout duration.

  • RuntimeError – If used on Windows or from a non-main thread.
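The caveats and error conditions above follow from how SIGALRM-based timeouts work. The sketch below illustrates the general technique with signal.setitimer; it is an assumption about the approach, not the library's implementation, and timeout_signal_sketch is a hypothetical name.

```python
import signal
import threading
import time
from functools import wraps


def timeout_signal_sketch(timeout=10):
    """Hypothetical SIGALRM-based timeout decorator (Unix, main thread only)."""

    def decorator(fcn):
        @wraps(fcn)
        def wrapper(*args, **kwargs):
            if not hasattr(signal, "SIGALRM"):
                raise RuntimeError("SIGALRM is not available on this platform")
            if threading.current_thread() is not threading.main_thread():
                raise RuntimeError("signal-based timeouts need the main thread")

            def on_alarm(signum, frame):
                raise TimeoutError(f"timed out after {timeout} seconds")

            previous = signal.signal(signal.SIGALRM, on_alarm)
            signal.setitimer(signal.ITIMER_REAL, timeout)  # arm the timer
            try:
                return fcn(*args, **kwargs)
            finally:
                signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the timer
                # Restore whatever handler was installed before (not re-entrant).
                signal.signal(
                    signal.SIGALRM,
                    previous if previous is not None else signal.SIG_DFL,
                )

        return wrapper

    return decorator


@timeout_signal_sketch(timeout=0.1)
def too_slow():
    time.sleep(1)
    return "Task completed"


try:
    outcome = too_slow()
except (TimeoutError, RuntimeError) as exc:
    outcome = type(exc).__name__
```

Because the process has a single SIGALRM handler and a single real-time timer, nested or concurrent uses overwrite each other, which is the reason for the re-entrancy caveat.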

timer#

Timer decorator for measuring function execution time.

core_mixins.decorators.timer.timer(fcn: Callable | None = None) Callable[source]#

Using this decorator you will get a tuple in the form of: result, execution_time…

Parameters:

fcn (Callable) – The function being decorated.

Returns:

The wrapped function.

Return type:

Callable