# -*- coding: utf-8 -*-
"""
Common utility functions for data manipulation, string operations,
and type conversions.
"""
from __future__ import annotations
import itertools
import re
from secrets import choice
from string import ascii_letters
from string import digits
from string import punctuation
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import TypeVar
# Generic element type: lets get_batches() express that the batches it yields
# contain the same element type as the iterable it was given.
_T = TypeVar("_T")
[docs]
def random_string(
    vocabulary: Optional[str] = None,
    length: int = 20,
    exclude: Optional[str] = None,
) -> str:
    """
    Create a random string using ``secrets.choice``.

    :param vocabulary: Characters to draw from. When ``None`` or empty,
        ASCII letters, digits and punctuation are used.
    :param length: Number of characters to generate; zero or a negative
        value yields an empty string.
    :param exclude: Characters that must never appear in the result.
    :return: String of ``length`` characters drawn from ``vocabulary``
        minus ``exclude``.
    :raises ValueError: If ``exclude`` removes every character of
        ``vocabulary``.
    """
    if not vocabulary:
        vocabulary = ascii_letters + digits + punctuation

    # Filter the pool once instead of drawing and rejecting excluded
    # characters in a loop. Duplicates in ``vocabulary`` are kept on purpose
    # so the relative weight of each allowed character is unchanged.
    excluded = set(exclude) if exclude else set()
    pool = [char for char in vocabulary if char not in excluded]
    if not pool:
        raise ValueError(
            "No characters available: 'exclude' contains "
            "every character in 'vocabulary'."
        )

    return "".join(choice(pool) for _ in range(length))
[docs]
def get_batches(it: Iterable[_T], n: int) -> Iterator[List[_T]]:
    """
    Split an iterable into consecutive lists of at most ``n`` elements.

    The input is consumed lazily, one batch at a time.

    :param it: Iterable to divide.
    :param n: Maximum number of elements per batch.
    :return: Iterator over the batches; the last one may be shorter than
        ``n``. Nothing is yielded when the first slice comes back empty.
    """
    source = iter(it)

    def next_batch() -> List[_T]:
        return list(itertools.islice(source, n))

    # Two-argument iter(): keep calling next_batch() until it returns the
    # sentinel, i.e. an empty list, which means the source is exhausted.
    yield from iter(next_batch, [])
[docs]
def remove_attributes(record: Dict, attrs: List[str]) -> None:
    """
    Delete the given keys from a dictionary in place.

    :param record: Dictionary to update.
    :param attrs: Keys to remove; keys missing from ``record`` are ignored.
    """
    for attr in attrs:
        # pop() with a default never raises for a missing key.
        record.pop(attr, None)
[docs]
def rename_attributes(record: Dict, mapper: Dict[str, str]) -> None:
    """
    Rename keys of a dictionary in place.

    :param record: Dictionary to update.
    :param mapper: Mapping of ``current key -> new key``. Keys of ``record``
        that are not in ``mapper`` are left untouched. If a new key already
        exists in ``record`` its value is overwritten.
    """
    # Two phases: first pull every renamed entry out of the record, then
    # write the entries back under their new names. Renaming entry by entry
    # loses data when a target name equals a source name (identity mapping
    # {"a": "a"}, swaps {"a": "b", "b": "a"} or chains {"a": "b", "b": "c"}).
    renamed = {}
    for key in list(record):
        if key in mapper:
            renamed[mapper[key]] = record.pop(key)
    record.update(renamed)
[docs]
def add_attributes(record: Dict, expected_attrs: List[str]):
    """
    Make sure every expected key exists in the dictionary.

    Keys that are missing are added with the value ``None``; keys that are
    already present keep their current value.

    :param record: Dictionary to update (modified in place).
    :param expected_attrs: Attributes to add.
    :return: None
    """
    for attr in expected_attrs:
        record.setdefault(attr, None)
[docs]
def convert_data_type(
    data: Dict[str, str],
    columns_type_mapper: Dict[str, Any],
) -> None:
    """
    Cast values of a dictionary in place according to a type mapping.

    Supported type names are ``str``, ``int``, ``float``, ``bool``, ``list``
    and ``dict``. Keys that are missing from ``data`` or whose value is
    ``None`` are skipped, and their type name is not validated. String values
    are stripped of surrounding whitespace before being converted.

    Entries are converted one at a time, so if a later entry fails the
    earlier ones have already been updated.

    :param data: Dictionary to update.
    :param columns_type_mapper: Mapping of ``key -> type name``.
    :raises ValueError: If a type name is not supported.
    :raises Exception: If a conversion fails, an exception of the same class
        as the original error is raised with a message naming the value and
        the target type.
    """
    if not columns_type_mapper:
        return

    def _to_bool(value):
        # Non-strings follow regular truthiness; strings must spell out
        # one of the accepted literals.
        if not isinstance(value, str):
            return bool(value)
        lowered = value.lower()
        if lowered in ("true", "1"):
            return True
        if lowered in ("false", "0"):
            return False
        raise ValueError(f"Cannot convert {value!r} to bool")

    # Whitelist of converters, looked up by type name.
    type_converters: Dict[str, Callable[[Any], Any]] = {
        "str": str,
        "int": int,
        "float": float,
        "bool": _to_bool,
        "list": list,
        "dict": dict,
    }

    for key, type_name in columns_type_mapper.items():
        if key not in data:
            continue
        raw = data[key]
        if raw is None:
            continue
        if type_name not in type_converters:
            raise ValueError(f"Unsupported type: {type_name}")

        value = raw.strip() if isinstance(raw, str) else raw
        try:
            data[key] = type_converters[type_name](value)
        except Exception as e:
            raise e.__class__(f"Cannot convert {value} to {type_name}: {e}") from e
[docs]
def to_snake_case(string: str) -> str:
    """
    Convert a camel-case string to snake-case.

    :param string: Text to convert.
    :return: Lower-cased text with underscores inserted at word boundaries.
    """
    # Applied in order:
    #   1. acronym followed by a capitalised word: HTTPSRequest -> HTTPS_Request
    #   2. lowercase letter or digit followed by an uppercase letter:
    #      getHTTPS -> get_HTTPS
    boundary_patterns = (
        r"([A-Z]+)([A-Z][a-z])",
        r"([a-z\d])([A-Z])",
    )
    converted = string
    for boundary in boundary_patterns:
        converted = re.sub(boundary, r"\1_\2", converted)
    return converted.lower()
[docs]
def flatten_json(data: Dict, flatten_sublist: bool = False):
    """
    Flatten a nested dictionary into a single-level dictionary.

    Nested keys are joined with ``_``: ``{"a": {"b": 1}}`` becomes
    ``{"a_b": 1}``. Keys that are not strings are converted with ``str()``,
    the same way list indices are.

    Empty dictionaries (and empty lists when ``flatten_sublist`` is set)
    contribute no entries to the result. If two paths produce the same
    flattened key, the one visited last wins.

    :param data: Object to flatten.
    :param flatten_sublist: Set as True if you want to flatten sublist
        objects, using the element index as key segment
        (``{"a": [1, 2]}`` becomes ``{"a_0": 1, "a_1": 2}``). When False,
        lists are stored as they are.
    :return: Flatten data (dictionary).
    """
    res = {}

    def flatten(node, prefix: str = ""):
        if isinstance(node, dict):
            for key, value in node.items():
                # str() so that non-string keys (e.g. integers) do not break
                # the string concatenation.
                flatten(value, prefix + str(key) + "_")
        elif flatten_sublist and isinstance(node, list):
            for index, item in enumerate(node):
                flatten(item, prefix + str(index) + "_")
        else:
            # Drop the trailing "_" added by the parent level.
            res[prefix[:-1]] = node

    flatten(data)
    return res
[docs]
def to_one_line(multiline_string: str) -> str:
    """
    Collapse a multiline string into a single line.

    Every run of whitespace (newlines included) is replaced by one space and
    the result is stripped of leading and trailing whitespace.

    :param multiline_string: Text to collapse.
    :return: The single-line text.
    """
    # "\s+" already matches newlines, so a single substitution is enough.
    collapsed = re.sub(r"\s+", " ", multiline_string)
    return collapsed.strip()
[docs]
def bytes_to_str(data: List | Dict | bytes, encoding: str = "utf-8"):
    """
    Convert bytes to str in keys and values.

    Dictionaries and lists are walked recursively and rebuilt; any other
    object that is not ``bytes`` is returned unchanged. Dictionary keys are
    decoded when they are ``bytes`` and otherwise kept as they are.

    Example:
    {b"key": b"value"} -> {"key": "value"}
    {b"key": [1, 2, b"3"]} -> {"key": [1, 2, "3"]}
    [1, 2, b"3", {b"a": b"a"}] -> [1, 2, "3", {"a": "a"}]
    b"test" -> "test"

    :param data: Object to convert.
    :param encoding: Encoding used to decode every ``bytes`` object.
    :return: A copy of ``data`` in which bytes are replaced by str.
    :raises UnicodeDecodeError: If some bytes are not valid for ``encoding``.
    """

    def decode(value):
        # Honour the requested encoding instead of the bytes.decode() default.
        return value.decode(encoding) if isinstance(value, bytes) else value

    def convert(obj_):
        if isinstance(obj_, dict):
            return {decode(key): convert(value) for key, value in obj_.items()}
        if isinstance(obj_, list):
            return [convert(item) for item in obj_]
        return decode(obj_)

    return convert(data)