Source code for core_mixins.managers.tasks_manager
# -*- coding: utf-8 -*-
"""
Task manager for executing and coordinating multiple tasks
with optional parallelization.
"""
from __future__ import annotations
from multiprocessing.pool import Pool
from typing import List
from typing import Optional
from core_mixins.decorators import timer
from core_mixins.interfaces.task import ITask
from core_mixins.interfaces.task import TaskResult
from core_mixins.interfaces.task import TaskStatus
[docs]
class TasksManager:
"""It manages the execution for the registered tasks"""
[docs]
def __init__(self, tasks: List[ITask]):
self.tasks = tasks
[docs]
def execute(
self,
task_name: Optional[str] = None,
parallelize: Optional[bool] = False,
processes: Optional[int] = None,
) -> Optional[List[TaskResult] | TaskResult]:
"""
Execute all registered tasks. An exception in one task should not
stop the execution of the others...
Example of results:
.. code-block:: python
[
TaskResult(status=TaskStatus.SUCCESS, result=...),
TaskResult(status=TaskStatus.ERROR, error=...)
]
..
:param task_name: If defined, only that specific task will be executed.
:type task_name: str
:param parallelize: It defines if you want to execute the tasks in parallel.
:type parallelize: bool
:param processes: Number of parallel process.
:type processes: int
:return: The list of the execution results.
:rtype: List[TaskResult]
"""
if not self.tasks:
return None
if task_name:
for task in self.tasks:
if task_name == task.name:
return self._execute(task)
raise ValueError(f"Task [{task_name}] is not registered!")
res = []
if not parallelize:
for task in self.tasks:
res.append(self._execute(task))
else:
with Pool(processes=processes) as pool:
res = pool.map(self._execute, self.tasks)
return res
@staticmethod
def _execute(task: ITask) -> TaskResult:
_execution_time: float = 0.0
_result = _error = None
try:
task.status = TaskStatus.EXECUTING
@timer
def _run_task():
nonlocal _result
nonlocal _error
try:
_result = task.execute()
except Exception as e: # pylint: disable=broad-exception-caught
_error = e
_, _execution_time = _run_task() # pylint: disable=unpacking-non-sequence
if _error:
raise _error
task.status = TaskStatus.SUCCESS
return TaskResult(
status=TaskStatus.SUCCESS,
execution_time=_execution_time,
result=_result,
)
except Exception as error: # pylint: disable=broad-exception-caught
task.status = TaskStatus.ERROR
return TaskResult(
status=TaskStatus.ERROR,
execution_time=_execution_time,
error=error,
)