Source code for core_mixins.interfaces.task

# -*- coding: utf-8 -*-

"""
This module provides a base implementation for ETL tasks
under the project ecosystem.
"""

from abc import ABC, abstractmethod
from enum import Enum
from logging import Logger
from typing import Any, Optional

from core_mixins.interfaces.factory import IFactory


[docs] class TaskStatus(str, Enum): """Possible status a task can have during its life""" CREATED = "CREATED" EXECUTING = "EXECUTING" SUCCESS = "SUCCESS" ERROR = "ERROR"
[docs] class ITask(IFactory, ABC): """Base implementations for different tasks/processes"""
[docs] def __init__( self, name: Optional[str] = None, description: Optional[str] = None, logger: Optional[Logger] = None, ) -> None: self._name = name self.description = description self._status = TaskStatus.CREATED self.logger = logger
[docs] @classmethod def registration_key(cls) -> str: """It returns the key used to register the class""" return cls.__name__
@property def name(self): """It returns the task identification""" return self._name or self.registration_key() @property def status(self) -> TaskStatus: """It returns the current status of the task""" return self._status @status.setter def status(self, status: TaskStatus) -> None: self._status = status
[docs] @abstractmethod def execute(self, *args, **kwargs) -> Any: """You must implement the task's process"""
[docs] def info(self, message, *args, **kwargs) -> None: """Log entry with severity 'INFO'""" if self.logger: self.logger.info(f"{self.name} | {message}", *args, **kwargs)
[docs] def warning(self, message, *args, **kwargs) -> None: """Log entry with severity 'WARNING'""" if self.logger: self.logger.warning(f"{self.name} | {message}", *args, **kwargs)
[docs] def error(self, error, *args, **kwargs) -> None: """Log entry with severity 'ERROR'""" if self.logger: self.logger.error(f"{self.name} | {error}", *args, **kwargs)
[docs] def debug(self, message, *args, **kwargs) -> None: """Log entry with severity 'DEBUG'""" if self.logger: self.logger.debug(f"{self.name} | {message}", *args, **kwargs)
[docs] class TaskResult: """ Represents the result of a task execution that exposes `status`, `result`, or the error. """
[docs] def __init__( self, status: TaskStatus, result: Optional[Any] = None, error: Optional[Exception] = None, execution_time: float = 0.0, ) -> None: self.status = status self.execution_time = execution_time self.result = result self.error = error
def __repr__(self) -> str: if self.status == TaskStatus.SUCCESS: return f"TaskResult(status={self.status}, result={self.result})" if self.status == TaskStatus.ERROR: return f"TaskResult(status={self.status}, error={self.error})" return f"TaskResult(status={self.status})" @property def is_success(self) -> bool: """Check if the task execution was successful""" return self.status == TaskStatus.SUCCESS @property def is_error(self) -> bool: """Check if the task execution failed""" return self.status == TaskStatus.ERROR
[docs] class TaskException(Exception): """Custom exception for Tasks"""