Decorators#
This module provides a collection of useful decorators for enhancing function and class behavior.
Quick Examples#
Cache hierarchy#
All cache decorators share a common extensible base defined in
cache_memory_based.py:
cache_memory_based.py L2Backend (interface), _CacheWrapper (L1 + optional fallback), cache_memory_based
cache_file_based.py _DiskBackend(L2Backend), cache_file_based
cache_redis.py _RedisBackend(L2Backend), ... ← future
cache_s3.py _S3Backend(L2Backend), ... ← future
_CacheWrapper is the L1 in-memory LRU layer. It accepts an
optional fallback, any L2Backend, that is consulted on
an L1 miss and written to on every new computation (write-through).
cache_memory_based() uses no fallback and is the pure in-memory
base of the hierarchy; it also acts as the degraded fallback when a
backend is unavailable (e.g. Redis unreachable). cache is a
backward-compatible alias for cache_memory_based().
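The lookup order described above (L1 first, then the fallback, then compute and write through) can be sketched roughly as follows. This is an illustration only, not the library's `_CacheWrapper`: `TwoLevelCache`, `DictBackend` and `_MISSING` are hypothetical names, and the sketch leaves out LRU eviction, TTL and locking.

```python
_MISSING = object()  # hypothetical sentinel, standing in for the library's _MISS


class DictBackend:
    """Hypothetical L2 backend that keeps values in a plain dict."""

    def __init__(self):
        self.store = {}

    def load(self, cache_key):
        return self.store.get(cache_key, _MISSING)

    def save(self, cache_key, value):
        self.store[cache_key] = value


class TwoLevelCache:
    """Toy L1 dict with an optional write-through fallback."""

    def __init__(self, fallback=None):
        self.l1 = {}
        self.fallback = fallback

    def get_or_compute(self, key, compute):
        if key in self.l1:                    # 1. L1 hit
            return self.l1[key]
        if self.fallback is not None:         # 2. L1 miss: consult the fallback
            value = self.fallback.load(key)
            if value is not _MISSING:
                self.l1[key] = value          # promote into L1
                return value
        value = compute()                     # 3. miss everywhere: compute
        self.l1[key] = value
        if self.fallback is not None:
            self.fallback.save(key, value)    # write-through
        return value


computed = []


def slow_square():
    computed.append(1)
    return 49


shared = DictBackend()
first = TwoLevelCache(fallback=shared)
second = TwoLevelCache(fallback=shared)  # e.g. another wrapper on the same backend
a = first.get_or_compute(7, slow_square)
b = second.get_or_compute(7, slow_square)  # served from the fallback
```

In this sketch the second wrapper never calls `slow_square`, because the first one already wrote the value through to the shared backend.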
TTL (time-to-live) is supported at every layer:
L1: _CacheWrapper expires entries lazily on the next access. No background thread is used.
L2: _DiskBackend stores a timestamp alongside each value and returns _MISS when the entry is older than ttl. Future Redis/S3 backends can delegate to native TTL primitives (e.g. SETEX) by overriding save/load.
Building a new backend requires three steps:
1. Implement L2Backend: call super().__init__(ttl=ttl) to inherit TTL storage, then implement load(cache_key) (return the value or _MISS) and save(cache_key, value).
2. Pass your backend as fallback to cache_memory_based().
3. Wrap with a decorator function following the bare / parameterised pattern used by cache_file_based().
# example: skeleton for a future cache_redis
import pickle
import time

from core_mixins.decorators.cache_memory_based import L2Backend, _MISS, cache_memory_based


class _RedisBackend(L2Backend):
    def __init__(self, host="localhost", port=6379, ttl=None):
        super().__init__(ttl=ttl)
        self._redis = ...  # connect

    def load(self, cache_key):
        raw = self._redis.get(self._key(cache_key))
        return pickle.loads(raw) if raw is not None else _MISS  # noqa: S301

    def save(self, cache_key, value):
        data = pickle.dumps(value)
        if self.ttl is not None:
            self._redis.setex(self._key(cache_key), int(self.ttl), data)
        else:
            self._redis.set(self._key(cache_key), data)


def cache_redis_based(fcn=None, *, maxsize=128, host="localhost", ttl=None):
    def decorator(_fcn):
        backend = _RedisBackend(host=host, ttl=ttl)
        return cache_memory_based(_fcn, maxsize=maxsize, fallback=backend, ttl=ttl)

    if fcn is not None:
        return decorator(fcn)
    return decorator
Cache Decorator#
Note
functools.lru_cache is order-sensitive for keyword arguments:
f(a=1, b=2) and f(b=2, a=1) are treated as distinct cache
entries, so the underlying function is called twice for the same
logical input. @cache / @cache_memory_based sorts kwargs
before building the key, making it order-insensitive.
If kwargs order-insensitivity is not needed, prefer
@functools.lru_cache(maxsize=N); it is implemented in C and
exposes .cache_info() / .cache_clear().
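The kwargs normalisation mentioned in the note can be illustrated with a minimal sketch. It assumes the key is built from the positional arguments plus the keyword items sorted by name; `make_key` and `order_insensitive_cache` are hypothetical names, and the real key layout used by @cache may differ.

```python
import functools


def make_key(args, kwargs):
    """Build a hashable key that ignores keyword-argument order."""
    return (args, tuple(sorted(kwargs.items())))


def order_insensitive_cache(fcn):
    """Toy unbounded memoiser keyed on the normalised arguments."""
    store = {}

    @functools.wraps(fcn)
    def wrapper(*args, **kwargs):
        key = make_key(args, kwargs)
        if key not in store:
            store[key] = fcn(*args, **kwargs)
        return store[key]

    return wrapper


calls = []


@order_insensitive_cache
def area(width=0, height=0):
    calls.append((width, height))
    return width * height


first = area(width=2, height=3)
second = area(height=3, width=2)  # same logical input, different kwarg order
```

Keyword names are unique within a call, so sorting the (name, value) pairs never has to compare the values themselves.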
from core_mixins.decorators import cache, cache_memory_based

# Bare usage: unbounded, never expires
@cache
def expensive_computation(n):
    return sum(range(n))

result = expensive_computation(1000)  # Computed
result = expensive_computation(1000)  # Cached

# Bounded cache: keeps the 128 most-recently used results
@cache(maxsize=128)
def fetch_record(record_id):
    return db.get(record_id)

# With TTL: entry expires 5 minutes after it was stored
@cache_memory_based(maxsize=256, ttl=300)
def get_config(key):
    return remote_config.fetch(key)

# With keyword arguments: order-insensitive, unlike functools.lru_cache
@cache
def query(table, *, filters=None, limit=100):
    ...
Performance comparison#
lru_cache is a C extension; @cache is pure Python with a
threading lock on every call. The benchmark below measures 1 million
calls on a warm cache and on an all-miss workload.
import timeit
from functools import lru_cache
from core_mixins.decorators import cache
N = 1_000_000
# positional args, warm cache
@lru_cache(maxsize=128)
def lru_pos(x, y): return x + y
@cache(maxsize=128)
def our_pos(x, y): return x + y
for i in range(100): lru_pos(i%10, i%7); our_pos(i%10, i%7)
t_lru_pos = timeit.timeit(lambda: lru_pos(3, 4), number=N)
t_our_pos = timeit.timeit(lambda: our_pos(3, 4), number=N)
# keyword args, warm cache
@lru_cache(maxsize=128)
def lru_kw(x=0, y=0): return x + y
@cache(maxsize=128)
def our_kw(x=0, y=0): return x + y
for i in range(100): lru_kw(x=i%10, y=i%7); our_kw(x=i%10, y=i%7)
t_lru_kw = timeit.timeit(lambda: lru_kw(x=3, y=4), number=N)
t_our_kw = timeit.timeit(lambda: our_kw(x=3, y=4), number=N)
# cache miss: all unique args
@lru_cache(maxsize=None)
def lru_miss(x): return x * 2
@cache(maxsize=None)
def our_miss(x): return x * 2
gen = iter(range(N))
t_lru_miss = timeit.timeit(lambda: lru_miss(next(gen)), number=N)
gen = iter(range(N))
t_our_miss = timeit.timeit(lambda: our_miss(next(gen)), number=N)
hdr = '{:<32} {:>10} {:>10} {:>8}'
row = '{:<32} {:>9.3f}s {:>9.3f}s {:>7.1f}x'
print(hdr.format('scenario (1M calls)', 'lru_cache', '@cache', 'ratio'))
print('-' * 64)
print(row.format('positional args, cache hit', t_lru_pos, t_our_pos, t_our_pos / t_lru_pos))
print(row.format('keyword args, cache hit', t_lru_kw, t_our_kw, t_our_kw / t_lru_kw))
print(row.format('cache miss (all unique)', t_lru_miss, t_our_miss, t_our_miss / t_lru_miss))
Results on CPython 3.12 (1 M calls each):
| Scenario | lru_cache | @cache | Ratio |
|---|---|---|---|
| Positional args, cache hit | 0.053 s | 0.541 s | 10.2× slower |
| Keyword args, cache hit | 0.104 s | 0.750 s | 7.2× slower |
| Cache miss (all unique) | 0.127 s | 0.753 s | 5.9× slower |
lru_cache is ~6–10× faster because its hot path avoids the Python
interpreter almost entirely. @cache pays a fixed cost on every call:
acquiring a threading.Lock, building a nested tuple key, and an
OrderedDict lookup. The lock is the dominant overhead.
The trade-off: use @cache when kwargs order-insensitivity is
required; use @functools.lru_cache everywhere else.
Cache File Based Decorator#
Write-through two-level cache: L1 is in-memory LRU; the fallback is a per-function directory of pickle files on disk. Results survive process restarts and are shared across processes that point at the same path.
from core_mixins.decorators import cache_file_based

# Bare usage: default path (tempfile.gettempdir()), unbounded TTL
@cache_file_based
def expensive(n):
    return n ** 2

# Custom path and memory bound
@cache_file_based(path="/var/cache/myapp", maxsize=64)
def fetch_record(record_id):
    return db.get(record_id)

# With TTL: entries expire after 10 minutes at both L1 and L2
@cache_file_based(path="/var/cache/myapp", maxsize=128, ttl=600)
def get_report(report_id):
    return reporting_api.fetch(report_id)

# Every new result is written to both L1 and disk immediately.
# When L1 is full the oldest entry is dropped from memory, disk file kept.
# A second process at the same path gets a cache hit without recomputing.
#
# With ttl set, expiry is symmetric across both layers:
# L1: expired entry is evicted from memory on the next access.
# L2: expired .pkl file is deleted from disk on the next access.
# No background thread, no cron job, the cache self-cleans.
Count Calls Decorator#
from core_mixins.decorators import count_calls

@count_calls
def my_function():
    return "Hello"

my_function()
my_function()
print(my_function.calls_number)  # Output: 2
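A call counter of this kind is commonly built by keeping the count as an attribute on the wrapper. The sketch below shows one way to do that; `count_calls_sketch` is a hypothetical name, and only the `calls_number` attribute is taken from the example above.

```python
import functools


def count_calls_sketch(fcn):
    """Toy decorator that counts invocations on the wrapper itself."""

    @functools.wraps(fcn)
    def wrapper(*args, **kwargs):
        wrapper.calls_number += 1  # incremented before the call runs
        return fcn(*args, **kwargs)

    wrapper.calls_number = 0
    return wrapper


@count_calls_sketch
def greet():
    return "Hello"


greet()
greet()
greet()
total = greet.calls_number
```

This sketch is not thread-safe: concurrent callers could lose increments unless a lock is added.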
Retry Decorator#
from core_mixins.decorators import retry
import requests

@retry(tries=3, delay=1, backoff=2)
def fetch_data(url):
    response = requests.get(url)
    response.raise_for_status()
    return response.json()
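To make the roles of tries, delay and backoff concrete, here is a minimal sketch of a retry loop with exponential backoff. It assumes that tries counts total attempts and that the wait is multiplied by backoff after each failure; the library's exact semantics may differ. `retry_sketch` and its `sleep` parameter are hypothetical.

```python
import functools
import time


def retry_sketch(tries=3, delay=1, backoff=2, exceptions=(Exception,), sleep=time.sleep):
    """Toy retry decorator; `sleep` is injectable so tests need not wait."""

    def decorator(fcn):
        @functools.wraps(fcn)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, tries + 1):
                try:
                    return fcn(*args, **kwargs)
                except exceptions:
                    if attempt == tries:
                        raise          # out of attempts: propagate the error
                    sleep(wait)
                    wait *= backoff    # exponential growth of the wait

        return wrapper

    return decorator


waits = []
attempts = []


@retry_sketch(tries=3, delay=1, backoff=2, sleep=waits.append)
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = flaky()
```

With these settings the sketch waits 1 second after the first failure and 2 seconds after the second, then succeeds on the third attempt.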
Singleton Decorator#
from core_mixins.decorators import singleton

@singleton
class DatabaseConnection:
    def __init__(self):
        self.connection = "Connected"

db1 = DatabaseConnection()
db2 = DatabaseConnection()
assert db1 is db2  # Same instance
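One common way to implement such a decorator is to replace the class with a factory that creates the instance once and returns it on every later call. The sketch below assumes that approach; `singleton_sketch` is a hypothetical name, and the library may implement it differently.

```python
import threading


def singleton_sketch(cls):
    """Toy singleton decorator: the first call constructs, later calls reuse."""
    instances = {}
    lock = threading.Lock()  # guards against two threads constructing at once

    def get_instance(*args, **kwargs):
        with lock:
            if cls not in instances:
                instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton_sketch
class Config:
    def __init__(self):
        self.values = {}


a = Config()
b = Config()
a.values["debug"] = True
```

A consequence of this design is that the decorated name is a function rather than a class, so `isinstance(obj, Config)` no longer works, and arguments passed after the first call are ignored.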
With Time Out Decorator#
import time

from core_mixins.decorators import with_timeout

@with_timeout(timeout=2)
def long_task_that_times_out():
    """Example function that will timeout."""
    time.sleep(5)
    return "Task completed"

@with_timeout(timeout=5)
def long_task_that_succeeds():
    """Example function that completes before timeout."""
    time.sleep(2)
    return "Task completed successfully"

if __name__ == "__main__":
    # Test 1: Function that completes successfully
    try:
        result = long_task_that_succeeds()
        print(f"Success: {result}")
    except TimeoutError as e:
        print(f"Timeout: {e}")

    # Test 2: Function that times out
    try:
        result = long_task_that_times_out()
        print(f"Success: {result}")
    except TimeoutError as e:
        print(f"Timeout: {e}")
Important:
Use with caution when combining with asyncio, because the decorator spawns a subprocess and uses inter-process communication to pass data. When combined with functions like run_in_executor, you could face issues when working with large files, due to:
Pickling large data.
Fork in async context.
Double overhead.
Alternative
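import asyncio

# run_with_timeout is a hypothetical wrapper name; the_function is any blocking callable.
async def run_with_timeout(the_function, timeout):
    loop = asyncio.get_running_loop()
    try:
        return await asyncio.wait_for(
            loop.run_in_executor(None, the_function),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        raise TimeoutError(...)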
Time Out Decorator (Signal/Linux)#
from core_mixins.decorators import with_timeout_signal

@with_timeout_signal(timeout=2)
def long_task_that_times_out():
    return "Task completed"
Timer Decorator#
from core_mixins.decorators import timer

@timer
def slow_function():
    import time
    time.sleep(1)
    return "Done"

result, elapsed = slow_function()
print(f"Took {elapsed:.2f} seconds")
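The example above unpacks a (result, elapsed) pair, so the decorator changes the function's return value. A minimal sketch of that behaviour follows. It assumes `time.perf_counter` as the clock, and `timer_sketch` is a hypothetical name.

```python
import functools
import time


def timer_sketch(fcn):
    """Toy timer decorator returning (result, elapsed_seconds)."""

    @functools.wraps(fcn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fcn(*args, **kwargs)
        elapsed = time.perf_counter() - start
        return result, elapsed

    return wrapper


@timer_sketch
def nap():
    time.sleep(0.05)
    return "Done"


result, elapsed = nap()
```

Because the return value becomes a tuple, existing callers of a function need updating when the decorator is applied.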
API Reference#
SyncWrapper#
Async wrapper for executing async code from synchronous context.
- class core_mixins.decorators.async_.SyncWrapper(async_instance: T)[source]#
Bases: Generic[T]
It provides an interface (mechanism) to execute async code from sync apps via a background thread that keeps a single event loop alive, avoiding asyncio.run(), which creates and destroys an event loop each time and becomes expensive for many method calls.
from asyncio import sleep

from core_mixins.decorators.async_ import SyncWrapper

class Test:
    def sync_method(self) -> str:
        return self.__class__.__name__

    async def testing(self) -> bool:
        await sleep(0)
        return True

sync_instance = SyncWrapper(Test())
assert sync_instance.sync_method() == Test.__name__
assert sync_instance.testing() is True
sync_instance.close()
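The mechanism described above (one long-lived event loop on a background thread) can be sketched with the standard library alone. This is an illustration only, not the source of SyncWrapper: `LoopThreadProxy` and `Service` are hypothetical names.

```python
import asyncio
import inspect
import threading


class LoopThreadProxy:
    """Toy proxy that runs an object's coroutine methods on a background loop."""

    def __init__(self, async_instance):
        self._instance = async_instance
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def __getattr__(self, name):
        attr = getattr(self._instance, name)
        if not inspect.iscoroutinefunction(attr):
            return attr  # plain attributes and sync methods pass through

        def call(*args, **kwargs):
            # Submit the coroutine to the background loop and block for its result.
            future = asyncio.run_coroutine_threadsafe(attr(*args, **kwargs), self._loop)
            return future.result()

        return call

    def close(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


class Service:
    def name(self):
        return "service"

    async def fetch(self, x):
        await asyncio.sleep(0)
        return x * 2


proxy = LoopThreadProxy(Service())
sync_name = proxy.name()
doubled = proxy.fetch(21)  # coroutine executed on the background loop
proxy.close()
```

The loop is created once and reused for every call, which is the saving over calling `asyncio.run()` per method.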
cache_memory_based#
Caching decorators: pure in-memory (cache) and
extensible fallback base.
- class core_mixins.decorators.cache_memory_based.L2Backend(ttl: float | None = None)[source]#
Bases: ABC
Interface for pluggable cache fallback backends. Implement this to add a persistence layer (disk, S3, Redis, DynamoDB or any other engine) to _CacheWrapper via its fallback parameter.
- core_mixins.decorators.cache_memory_based.cache_memory_based(fcn: Callable | None = None, *, maxsize: int | None = None, fallback: L2Backend | None = None, ttl: float | None = None) Callable[source]#
Pure in-memory memoising decorator; the no-fallback base of the cache hierarchy.
Subclasses add persistence by passing an L2Backend to _CacheWrapper: cache_file_based() uses disk, future variants will use S3 or Redis. All of them fall back to this in-memory behavior when their backend is unavailable.
functools.lru_cache() is order-sensitive for keyword arguments: f(a=1, b=2) and f(b=2, a=1) are treated as distinct cache entries and the underlying function is called twice. This decorator normalises kwargs by sorting them before building the key, so both calls map to the same entry.
If kwargs order-insensitivity is not needed, prefer @functools.lru_cache(maxsize=N); it is implemented in C and exposes .cache_info() / .cache_clear(). Use this decorator when you need to memoize functions whose callers may pass the same keyword arguments in different orders.
- Parameters:
fcn – The function being decorated.
maxsize – Maximum number of entries to keep. When the limit is reached the least-recently-used entry is evicted. None (default) means the cache is unbounded.
fallback – Optional L2Backend consulted on an L1 miss and written to on every new computation. None (default) gives pure in-memory behavior.
ttl – Time-to-live in seconds for L1 entries. Expired entries are evicted lazily on the next access. None (default) means entries never expire.
- Returns:
The wrapped function.
- Return type:
Callable
cache_file_based#
Note
Serialization format (pickle)
Cached values are serialized with Python’s pickle module. Bandit
flags pickle by default (rules B403 and B301) because deserializing
data from an untrusted source can execute arbitrary code.
That risk does not apply here: the only data ever passed to
pickle.loads() is data that this library previously wrote with
pickle.dumps(). There is no user-controlled input anywhere in the
deserialization path, so the Bandit warnings are suppressed with
# nosec B403 / # nosec B301.
Do not point this decorator at a directory that is writable by untrusted parties.
Persistent file-based caching decorator.
- core_mixins.decorators.cache_file_based.cache_file_based(fcn: Callable | None = None, *, path: str | None = None, maxsize: int = 128, ttl: float | None = None) Callable[source]#
Write-through caching decorator: L1 is a bounded in-memory LRU (_CacheWrapper); the fallback is a directory of per-entry pickle files on disk (_DiskBackend).
Every new result is written to both L1 and disk immediately. When L1 is full the least-recently-used entry is evicted from memory only; its disk file is kept. A subsequent call with the same arguments reloads the value from disk without invoking the wrapped function.
Because results are always on disk, multiple processes sharing the same path benefit from each other’s computed results.
Supports both bare and parameterized call styles:
@cache_file_based
def f(x): ...

@cache_file_based(path="/tmp/mycache", maxsize=64)
def g(x): ...
- Parameters:
fcn – The function being decorated (set when used bare).
path – Directory under which per-function subdirectories are created. Defaults to tempfile.gettempdir().
maxsize – Maximum number of entries to keep in memory. Least-recently-used entries are evicted from memory (disk file kept) when the limit is exceeded. Defaults to 128.
ttl – Time-to-live in seconds. Expiry is symmetric across both layers: expired L1 entries are evicted from memory, and expired L2 files are deleted from disk, both lazily on the next access. No background thread or external cleanup is required. None (default) means entries never expire.
- Returns:
The wrapped function.
- Return type:
Callable
count_calls#
Decorators for counting function call invocations.
repeat#
Repeat decorator for executing functions multiple times.
- core_mixins.decorators.repeat.repeat(fcn: Callable | None = None, *, times: int = 2) Callable[source]#
Repeat the function n times and return the list of returned values…
- Parameters:
fcn (Callable) – The function being decorated.
times (int) – Number of times the function will be invoked.
- Returns:
The wrapped function.
- Return type:
Callable
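The signature above accepts the function either directly or via keyword arguments, which is the same bare / parameterised pattern used by the cache decorators. A minimal sketch of how that pattern can be written, with `repeat_sketch` as a hypothetical stand-in:

```python
import functools


def repeat_sketch(fcn=None, *, times=2):
    """Toy repeat decorator usable as @repeat_sketch or @repeat_sketch(times=N)."""

    def decorator(_fcn):
        @functools.wraps(_fcn)
        def wrapper(*args, **kwargs):
            # Call the function `times` times and collect every return value.
            return [_fcn(*args, **kwargs) for _ in range(times)]

        return wrapper

    if fcn is not None:      # bare usage: @repeat_sketch
        return decorator(fcn)
    return decorator         # parameterised usage: @repeat_sketch(times=3)


@repeat_sketch
def ping():
    return "pong"


@repeat_sketch(times=3)
def square(x):
    return x * x


pings = ping()
squares = square(4)
```

Making `times` keyword-only keeps the two call styles unambiguous: a positional argument can only ever be the decorated function.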
retry#
Retry decorator with exponential backoff for handling exceptions.
- core_mixins.decorators.retry.retry(fcn: Callable | None = None, tries: int = 3, delay: int = 1, backoff: int = 2, exceptions: Tuple[Type[BaseException], ...] = (Exception,), logger: Logger | None = None) Callable[source]#
It retries the decorated function using an exponential backoff in case of errors (exceptions) in the execution…
- Parameters:
fcn (Callable) – The function being decorated.
tries (int) – Number of retries.
delay (int) – Delay in seconds between invocations.
backoff (int) – Exponential backoff to use between retries.
exceptions (Tuple[Type[BaseException], ...]) – Exceptions to capture for the retries. If the raised exception is not here, no retries are performed.
logger (Logger | None) – Optional logger for the decorator to use.
- Returns:
The wrapped function.
- Return type:
Callable
singleton#
Singleton pattern decorator for ensuring single instance classes.
with_timeout#
This decorator ensures a function does not run for more than a certain time, otherwise an error is raised.
- core_mixins.decorators.timeout.with_timeout(fcn: Callable | None = None, timeout: float = 10) Callable[source]#
It executes a function that will time out after the specified value.
Important: Use with caution when combining with asyncio, because the decorator spawns a subprocess and uses multiprocessing.Queue to pass data. When combined with functions like run_in_executor, you could face issues when working with large files, due to:
Pickling large data.
Fork in async context.
Double overhead.
Alternative
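import asyncio

# run_with_timeout is a hypothetical wrapper name; the_function is any blocking callable.
async def run_with_timeout(the_function, timeout):
    loop = asyncio.get_running_loop()
    try:
        return await asyncio.wait_for(
            loop.run_in_executor(None, the_function),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        raise TimeoutError(...)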
- Parameters:
fcn – The function being decorated.
timeout – The seconds before time out.
- Returns:
The wrapped function.
with_timeout_signal#
This decorator (alternative to with_timeout) ensures a function does not run for more than a certain amount of seconds before an error (TimeoutError) is raised.
- core_mixins.decorators.timeout_signal.with_timeout_signal(fcn: Callable | None = None, timeout: float = 10) Callable[source]#
It executes a function that will time out after the specified number of seconds, using signal. It can be an alternative to with_timeout because it does not require spawning another process or using a queue.
- Caveats:
Only works on Unix/Linux systems (SIGALRM not available on Windows).
Must be called from the main thread (signal.alarm limitation).
Not re-entrant: nested or concurrent decorated functions will interfere.
Not compatible with async functions or async contexts.
May conflict with other code using SIGALRM.
- Parameters:
fcn – The function being decorated.
timeout – The seconds before time out.
- Returns:
The wrapped function.
- Raises:
TimeoutError – If the decorated function exceeds the timeout duration.
RuntimeError – If used on Windows or from a non-main thread.
timer#
Timer decorator for measuring function execution time.