Source code for core_mixins.utils

# -*- coding: utf-8 -*-

"""
Common utility functions for data manipulation, string operations,
and type conversions.
"""

from __future__ import annotations

import itertools
import re
from secrets import choice
from string import ascii_letters
from string import digits
from string import punctuation
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import TypeVar

_T = TypeVar("_T")


def random_string(
    vocabulary: Optional[str] = None,
    length: int = 20,
    exclude: Optional[str] = None,
) -> str:
    """
    Create a random string using the ``secrets`` module.

    :param vocabulary: Characters to draw from. When empty or None, ASCII
        letters, digits and punctuation are used.
    :param length: Number of characters to generate. Values lower than or
        equal to zero produce an empty string.
    :param exclude: Characters that must never appear in the result.
    :return: A string of ``length`` characters taken from ``vocabulary``
        minus ``exclude``.
    :raises ValueError: If ``exclude`` contains every character of
        ``vocabulary``.
    """
    if not vocabulary:
        vocabulary = ascii_letters + digits + punctuation

    if not exclude:
        allowed = vocabulary
    else:
        excluded = set(exclude)
        # Filter once instead of drawing and rejecting: the number of draws
        # is then exactly ``length`` no matter how much is excluded.
        # Duplicates in the vocabulary are kept on purpose so each character
        # keeps the same relative weight it had with rejection sampling.
        allowed = [char for char in vocabulary if char not in excluded]
        if not allowed:
            raise ValueError(
                "No characters available: 'exclude' contains "
                "every character in 'vocabulary'."
            )

    return "".join(choice(allowed) for _ in range(length))
def get_batches(it: Iterable[_T], n: int) -> Iterator[List[_T]]:
    """
    Lazily split an iterable into consecutive lists of at most ``n`` items.

    :param it: Iterable to divide.
    :param n: Maximum number of elements per chunk.
    :return: Iterator of lists; the last list may be shorter than ``n``.
        Nothing is yielded when the iterable is empty or ``n`` is 0.
    """
    source = iter(it)

    def take_next() -> List[_T]:
        # Pull up to n items from the shared iterator.
        return list(itertools.islice(source, n))

    # Two-argument iter(): keep calling take_next() until it returns the
    # sentinel (an empty list), i.e. until the source is exhausted.
    yield from iter(take_next, [])
def remove_attributes(record: Dict, attrs: List[str]) -> None:
    """
    Delete the given keys from a dictionary in place.

    :param record: Dictionary to update.
    :param attrs: Keys to remove; keys that are not present are ignored.
    """
    for unwanted in attrs:
        # pop() with a default never raises for a missing key.
        record.pop(unwanted, None)
def rename_attributes(record: Dict, mapper: Dict[str, str]) -> None:
    """
    Rename the keys of a dictionary in place.

    :param record: Dictionary to update.
    :param mapper: Mapping ``{old_key: new_key}``. Keys of ``record`` that
        are not in ``mapper`` are left untouched. If a new key is already
        present in ``record`` and is not itself renamed, its value is
        overwritten.
    """
    # Phase 1: detach every value that has to move. Doing all removals
    # before any insertion keeps identity mappings ({"a": "a"}) and
    # swaps/chains ({"a": "b", "b": "a"}) from losing data, which happens
    # when each key is written and deleted one at a time.
    renamed = []
    for old_key in list(record):
        if old_key in mapper:
            renamed.append((mapper[old_key], record.pop(old_key)))

    # Phase 2: store the detached values under their new names.
    record.update(renamed)
def add_attributes(record: Dict, expected_attrs: List[str]):
    """
    Ensure every expected key exists in the dictionary.

    Keys that are missing are added in place with the value None; keys that
    are already present keep their current value.

    :param record: Dictionary to update.
    :param expected_attrs: Attributes to add.
    :return: None, the dictionary is modified in place.
    """
    for attr in expected_attrs:
        # setdefault() only writes when the key is absent.
        record.setdefault(attr, None)
def convert_data_type(
    data: Dict[str, str],
    columns_type_mapper: Dict[str, Any],
) -> None:
    """
    Cast the values of a dictionary in place according to a type mapping.

    Only keys that are present in ``data`` with a value other than None are
    converted. String values are stripped of surrounding whitespace before
    the conversion.

    :param data: Dictionary to update.
    :param columns_type_mapper: Mapping ``{key: type_name}`` where the type
        name is one of "str", "int", "float", "bool", "list" or "dict".
    :raises ValueError: If a type name is not supported.
    :raises Exception: If a conversion fails, an exception of the same class
        as the underlying error is raised with a descriptive message.
    """
    if not columns_type_mapper:
        return

    def _to_bool(value):
        # Non-string values use plain truthiness; strings must spell out a
        # recognised boolean literal (case-insensitive).
        if not isinstance(value, str):
            return bool(value)
        lowered = value.lower()
        if lowered in ("true", "1"):
            return True
        if lowered in ("false", "0"):
            return False
        raise ValueError(f"Cannot convert {value!r} to bool")

    # Whitelist of supported conversions, looked up by name.
    type_converters: Dict[str, Callable[[Any], Any]] = {
        "str": str,
        "int": int,
        "float": float,
        "bool": _to_bool,
        "list": list,
        "dict": dict,
    }

    for key, type_name in columns_type_mapper.items():
        # Missing keys and None values are skipped before the type name is
        # validated, so an unsupported name only fails for a real value.
        if key not in data or data[key] is None:
            continue
        if type_name not in type_converters:
            raise ValueError(f"Unsupported type: {type_name}")

        raw = data[key]
        value = raw.strip() if isinstance(raw, str) else raw
        converter = type_converters[type_name]
        try:
            data[key] = converter(value)
        except Exception as e:
            # Re-raise with the same exception class but a message that
            # names the offending value and the target type.
            raise e.__class__(f"Cannot convert {value} to {type_name}: {e}") from e
def to_snake_case(string: str) -> str:
    """Convert a camel-case string to snake-case."""
    # Word-boundary rules, applied in order:
    #   1. an acronym followed by a titlecase word,
    #      e.g. HTTPSRequest -> HTTPS_Request
    #   2. a lowercase letter or digit followed by an uppercase letter,
    #      e.g. getHTTPS -> get_HTTPS
    boundary_rules = (
        r"([A-Z]+)([A-Z][a-z])",
        r"([a-z\d])([A-Z])",
    )
    snake = string
    for rule in boundary_rules:
        snake = re.sub(rule, r"\1_\2", snake)
    return snake.lower()
def flatten_json(data: Dict, flatten_sublist: bool = False) -> Dict[str, Any]:
    """
    Flatten a nested dictionary into a single-level dictionary.

    Nested keys are joined with an underscore, e.g.
    ``{"a": {"b": 1}}`` becomes ``{"a_b": 1}``. Keys that are not strings
    are converted with ``str()``. Empty nested dictionaries (and empty lists
    when ``flatten_sublist`` is True) produce no entry. If two paths join to
    the same flattened key, the one visited last wins.

    :param data: Object to flatten.
    :param flatten_sublist: Set as True if you want to flatten sublist
        objects; list items are then keyed by their index
        (``{"a": [7]}`` becomes ``{"a_0": 7}``). Otherwise lists are kept
        as values.
    :return: Flattened data (dictionary).
    """
    res: Dict[str, Any] = {}

    def flatten(node: Any, prefix: str = "") -> None:
        if isinstance(node, dict):
            for key, value in node.items():
                # str() lets non-string keys (e.g. ints) be joined into the
                # path instead of raising TypeError on concatenation.
                flatten(value, prefix + str(key) + "_")
        elif flatten_sublist and isinstance(node, list):
            for index, item in enumerate(node):
                flatten(item, prefix + str(index) + "_")
        else:
            # Drop the trailing underscore added by the parent level.
            res[prefix[:-1]] = node

    flatten(data)
    return res
def to_one_line(multiline_string: str) -> str:
    """
    Collapse a multiline string into a single line.

    Every run of whitespace (including newlines) becomes one space, and
    leading/trailing whitespace is removed.
    """
    without_newlines = multiline_string.replace("\n", " ")
    collapsed = re.sub(r"\s+", " ", without_newlines)
    return collapsed.strip()
def bytes_to_str(data: List | Dict | bytes, encoding: str = "utf-8"):
    """
    Convert bytes to str in keys and values.

    Dictionaries and lists are walked recursively and rebuilt; any other
    non-bytes object is returned unchanged.

    Example:
        {b"key": b"value"} -> {"key": "value"}
        {b"key": [1, 2, b"3"]} -> {"key": [1, 2, "3"]}
        [1, 2, b"3", {b"a": b"a"}] -> [1, 2, "3", {"a": "a"}]
        b"test" -> "test"

    :param data: Object to convert.
    :param encoding: Encoding used to decode every bytes key and value.
    :return: A copy of ``data`` with all bytes decoded to str.
    :raises UnicodeDecodeError: If some bytes are not valid in ``encoding``.
    """

    def convert(obj: Any) -> Any:
        if isinstance(obj, dict):
            return {
                (key.decode(encoding) if isinstance(key, bytes) else key): convert(value)
                for key, value in obj.items()
            }
        if isinstance(obj, list):
            return [convert(item) for item in obj]
        if isinstance(obj, bytes):
            # Honour the requested encoding rather than the UTF-8 default
            # of bytes.decode().
            return obj.decode(encoding)
        return obj

    return convert(data)