Spaces:
Running
Running
import importlib | |
import logging | |
import unicodedata | |
from codecs import IncrementalDecoder | |
from encodings.aliases import aliases | |
from functools import lru_cache | |
from re import findall | |
from typing import Generator, List, Optional, Set, Tuple, Union | |
from _multibytecodec import MultibyteIncrementalDecoder | |
from .constant import ( | |
ENCODING_MARKS, | |
IANA_SUPPORTED_SIMILAR, | |
RE_POSSIBLE_ENCODING_INDICATION, | |
UNICODE_RANGES_COMBINED, | |
UNICODE_SECONDARY_RANGE_KEYWORD, | |
UTF8_MAXIMAL_ALLOCATION, | |
) | |
def is_accentuated(character: str) -> bool:
    """
    Tell whether the Unicode name of a character mentions a known accent mark.

    A character that has no Unicode name is never reported as accentuated.
    """
    # An empty default replaces the ValueError raised for unnamed characters.
    description: str = unicodedata.name(character, "")
    accent_markers = (
        "WITH GRAVE",
        "WITH ACUTE",
        "WITH CEDILLA",
        "WITH DIAERESIS",
        "WITH CIRCUMFLEX",
        "WITH TILDE",
        "WITH MACRON",
        "WITH RING ABOVE",
    )
    return any(marker in description for marker in accent_markers)
def remove_accent(character: str) -> str:
    """
    Return the base character of an accentuated character.

    The first code point of the canonical Unicode decomposition is returned.
    A character without decomposition is returned unchanged. A character whose
    decomposition is a tagged compatibility mapping (for instance
    "<compat> 0066 0069") is also returned unchanged: the leading tag is not a
    code point, and such a mapping does not describe a base letter plus accent.
    """
    decomposed: str = unicodedata.decomposition(character)
    if not decomposed:
        return character

    leading_field: str = decomposed.split(" ")[0]

    # Compatibility decompositions start with a "<tag>" field, which cannot be
    # parsed as an hexadecimal code point.
    if leading_field.startswith("<"):
        return character

    return chr(int(leading_field, 16))
def unicode_range(character: str) -> Optional[str]:
    """
    Look up the name of the Unicode range that contains a single character.

    Returns the first matching entry of UNICODE_RANGES_COMBINED, or None when
    no registered range contains the character's code point.
    """
    code_point: int = ord(character)
    return next(
        (
            label
            for label, span in UNICODE_RANGES_COMBINED.items()
            if code_point in span
        ),
        None,
    )
def is_latin(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "LATIN".

    A character that has no Unicode name yields False.
    """
    # An empty default replaces the ValueError raised for unnamed characters.
    return "LATIN" in unicodedata.name(character, "")
def is_punctuation(character: str) -> bool:
    """
    Tell whether a character is punctuation.

    A character qualifies when its Unicode general category contains "P", or
    otherwise when the name of its Unicode range contains "Punctuation".
    """
    if "P" in unicodedata.category(character):
        return True

    range_name: Optional[str] = unicode_range(character)
    return range_name is not None and "Punctuation" in range_name
def is_symbol(character: str) -> bool:
    """
    Tell whether a character is a symbol.

    A character qualifies when its Unicode general category contains "S" or
    "N". Otherwise it qualifies when its Unicode range name contains "Forms"
    and its category is not "Lo".
    """
    category: str = unicodedata.category(character)
    if any(flag in category for flag in ("S", "N")):
        return True

    range_name: Optional[str] = unicode_range(character)
    if range_name is None:
        return False

    return category != "Lo" and "Forms" in range_name
def is_emoticon(character: str) -> bool:
    """
    Tell whether a character belongs to an emoticon or pictograph range.

    The decision is made on the Unicode range name, which must contain either
    "Emoticons" or "Pictographs". A character outside any known range yields
    False.
    """
    range_name: Optional[str] = unicode_range(character)
    return range_name is not None and (
        "Emoticons" in range_name or "Pictographs" in range_name
    )
def is_separator(character: str) -> bool:
    """
    Tell whether a character acts as a separator.

    Whitespace, the characters "|", "+", "<" and ">", any character whose
    Unicode general category contains "Z", and the categories "Po", "Pd" and
    "Pc" are all treated as separators.
    """
    if character in {"|", "+", "<", ">"} or character.isspace():
        return True

    category: str = unicodedata.category(character)
    if "Z" in category:
        return True

    return category in {"Po", "Pd", "Pc"}
def is_case_variable(character: str) -> bool:
    """
    Tell whether a character is exactly one of lowercase or uppercase.
    """
    lowered: bool = character.islower()
    uppered: bool = character.isupper()
    # Exclusive or: true only when the two flags disagree.
    return lowered ^ uppered
def is_cjk(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "CJK".

    A character that has no Unicode name yields False.
    """
    # An empty default replaces the ValueError raised for unnamed characters.
    return "CJK" in unicodedata.name(character, "")
def is_hiragana(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "HIRAGANA".

    A character that has no Unicode name yields False.
    """
    # An empty default replaces the ValueError raised for unnamed characters.
    return "HIRAGANA" in unicodedata.name(character, "")
def is_katakana(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "KATAKANA".

    A character that has no Unicode name yields False.
    """
    # An empty default replaces the ValueError raised for unnamed characters.
    return "KATAKANA" in unicodedata.name(character, "")
def is_hangul(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "HANGUL".

    A character that has no Unicode name yields False.
    """
    # An empty default replaces the ValueError raised for unnamed characters.
    return "HANGUL" in unicodedata.name(character, "")
def is_thai(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "THAI".

    A character that has no Unicode name yields False.
    """
    # An empty default replaces the ValueError raised for unnamed characters.
    return "THAI" in unicodedata.name(character, "")
def is_arabic(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "ARABIC".

    A character that has no Unicode name yields False.
    """
    # An empty default replaces the ValueError raised for unnamed characters.
    return "ARABIC" in unicodedata.name(character, "")
def is_arabic_isolated_form(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains both "ARABIC" and
    "ISOLATED FORM".

    A character that has no Unicode name yields False.
    """
    # An empty default replaces the ValueError raised for unnamed characters.
    description: str = unicodedata.name(character, "")
    return all(marker in description for marker in ("ARABIC", "ISOLATED FORM"))
def is_unicode_range_secondary(range_name: str) -> bool:
    """
    Tell whether a Unicode range name contains at least one of the keywords
    listed in UNICODE_SECONDARY_RANGE_KEYWORD.
    """
    for keyword in UNICODE_SECONDARY_RANGE_KEYWORD:
        if keyword in range_name:
            return True
    return False
def is_unprintable(character: str) -> bool:
    """
    Tell whether a character is neither whitespace nor printable.

    Two characters are always reported as printable:
    - "\\x1A", the ASCII substitute character;
    - "\\ufeff", the Zero Width No-Break Space located in Arabic Presentation
      Forms-B (Unicode 1.1), which Python does not acknowledge as a space.
    """
    if character in ("\x1A", "\ufeff"):
        return False

    # isspace() covers \n \t \r \v
    return not (character.isspace() or character.isprintable())
def any_specified_encoding(sequence: bytes, search_zone: int = 8192) -> Optional[str]:
    """
    Search the first bytes of a payload for an explicit encoding declaration.

    The first ``search_zone`` bytes are decoded as ASCII (undecodable bytes are
    dropped) and scanned with RE_POSSIBLE_ENCODING_INDICATION. The first
    declaration matching a key or a value of the standard library alias table
    is returned as the alias table value. None is returned when nothing
    matches. A TypeError is raised when ``sequence`` is not a bytes instance.
    """
    if not isinstance(sequence, bytes):
        raise TypeError

    zone_end: int = min(len(sequence), search_zone)
    ascii_head: str = sequence[:zone_end].decode("ascii", errors="ignore")

    candidates: List[str] = findall(RE_POSSIBLE_ENCODING_INDICATION, ascii_head)

    for candidate in candidates:
        normalized: str = candidate.lower().replace("-", "_")

        # The table is scanned in order so the first matching entry wins.
        for known_alias, known_iana in aliases.items():
            if normalized in (known_alias, known_iana):
                return known_iana

    return None
def is_multi_byte_encoding(name: str) -> bool:
    """
    Verify whether a specific encoding is a multi-byte one, based on its IANA name.

    The UTF family is listed explicitly. Any other encoding is multi-byte when
    the IncrementalDecoder exposed by its ``encodings`` module derives from
    MultibyteIncrementalDecoder.
    """
    if name in {
        "utf_8",
        "utf_8_sig",
        "utf_16",
        "utf_16_be",
        "utf_16_le",
        "utf_32",
        "utf_32_le",
        "utf_32_be",
        "utf_7",
    }:
        return True

    codec_module = importlib.import_module("encodings.{}".format(name))
    return issubclass(codec_module.IncrementalDecoder, MultibyteIncrementalDecoder)
def identify_sig_or_bom(sequence: bytes) -> Tuple[Optional[str], bytes]:
    """
    Identify the SIG/BOM a payload starts with, if any.

    Returns the encoding name and the matched mark taken from ENCODING_MARKS,
    or ``(None, b"")`` when the payload starts with none of the known marks.
    """
    for iana_encoding in ENCODING_MARKS:
        declared: Union[bytes, List[bytes]] = ENCODING_MARKS[iana_encoding]

        # An encoding may declare a single mark or a list of marks.
        candidates: List[bytes] = (
            [declared] if isinstance(declared, bytes) else declared
        )

        for mark in candidates:
            if sequence.startswith(mark):
                return iana_encoding, mark

    return None, b""
def should_strip_sig_or_bom(iana_encoding: str) -> bool:
    """
    Tell whether the SIG/BOM should be stripped for the given encoding.

    Returns False for "utf_16" and "utf_32", True for every other name.
    """
    keeps_mark: bool = iana_encoding in {"utf_16", "utf_32"}
    return not keeps_mark
def iana_name(cp_name: str, strict: bool = True) -> str:
    """
    Resolve a code page name against the standard library alias table.

    The name is lower-cased and its dashes are replaced by underscores before
    the lookup. The table value of the first entry whose key or value equals
    the normalized name is returned. When nothing matches, a ValueError is
    raised if ``strict`` is true, otherwise the normalized name is returned.
    """
    normalized: str = cp_name.lower().replace("-", "_")

    # The table is scanned in order so the first matching entry wins.
    for known_alias, known_iana in aliases.items():
        if normalized == known_alias or normalized == known_iana:
            return known_iana

    if not strict:
        return normalized

    raise ValueError("Unable to retrieve IANA for '{}'".format(normalized))
def range_scan(decoded_sequence: str) -> List[str]: | |
ranges: Set[str] = set() | |
for character in decoded_sequence: | |
character_range: Optional[str] = unicode_range(character) | |
if character_range is None: | |
continue | |
ranges.add(character_range) | |
return list(ranges) | |
def cp_similarity(iana_name_a: str, iana_name_b: str) -> float:
    """
    Compute how similar two single-byte code pages are.

    Every possible byte value (0x00 to 0xFF) is decoded with both code pages
    and the share of bytes decoding to the same text is returned, between 0.0
    and 1.0. Multi-byte encodings are not compared: 0.0 is returned as soon as
    one of the two names is a multi-byte encoding.

    All 256 byte values are compared and the count is divided by 256, so two
    identical code pages score exactly 1.0. Comparing only 255 values while
    dividing by 254 would skip byte 0xFF and allow a ratio above 1.0.
    """
    if is_multi_byte_encoding(iana_name_a) or is_multi_byte_encoding(iana_name_b):
        return 0.0

    decoder_a = importlib.import_module(
        "encodings.{}".format(iana_name_a)
    ).IncrementalDecoder
    decoder_b = importlib.import_module(
        "encodings.{}".format(iana_name_b)
    ).IncrementalDecoder

    # Undecodable bytes are ignored, so they decode to an empty string.
    id_a: IncrementalDecoder = decoder_a(errors="ignore")
    id_b: IncrementalDecoder = decoder_b(errors="ignore")

    byte_value_count: int = 256
    character_match_count: int = 0

    for byte_value in range(byte_value_count):
        to_be_decoded: bytes = bytes([byte_value])
        if id_a.decode(to_be_decoded) == id_b.decode(to_be_decoded):
            character_match_count += 1

    return character_match_count / byte_value_count
def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool:
    """
    Determine if two code pages are at least 80% similar. The IANA_SUPPORTED_SIMILAR
    dict was generated using the function cp_similarity.
    """
    if iana_name_a not in IANA_SUPPORTED_SIMILAR:
        return False

    return iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
def set_logging_handler(
    name: str = "charset_normalizer",
    level: int = logging.INFO,
    format_string: str = "%(asctime)s | %(levelname)s | %(message)s",
) -> None:
    """
    Set the level of a named logger and attach a new stream handler to it.

    The handler is a ``logging.StreamHandler`` formatted with ``format_string``.
    Each call adds one more handler to the logger.
    """
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter(format_string))

    target_logger = logging.getLogger(name)
    target_logger.setLevel(level)
    target_logger.addHandler(stream_handler)
def cut_sequence_chunks(
    sequences: bytes,
    encoding_iana: str,
    offsets: range,
    chunk_size: int,
    bom_or_sig_available: bool,
    strip_sig_or_bom: bool,
    sig_payload: bytes,
    is_multi_byte_decoder: bool,
    decoded_payload: Optional[str] = None,
) -> Generator[str, None, None]:
    """
    Yield decoded text chunks taken at the given offsets.

    When a decoded payload is available and the decoder is not multi-byte, the
    chunks are sliced directly from ``decoded_payload`` and iteration stops at
    the first empty slice.

    Otherwise each chunk is cut from the raw ``sequences`` and decoded with
    ``encoding_iana`` ("ignore" error mode for multi-byte decoders, "strict"
    otherwise). Offsets whose chunk would end more than 8 bytes past the end
    of ``sequences`` are skipped. ``sig_payload`` is prepended to each cut when
    a SIG/BOM is available and must not be stripped.
    """
    if decoded_payload and is_multi_byte_decoder is False:
        for start in offsets:
            piece = decoded_payload[start : start + chunk_size]
            if not piece:
                break
            yield piece
        return

    # Both values are constant for the whole iteration.
    prepend_mark = bom_or_sig_available and strip_sig_or_bom is False
    error_mode = "ignore" if is_multi_byte_decoder else "strict"

    for start in offsets:
        stop = start + chunk_size
        if stop > len(sequences) + 8:
            continue

        raw = sequences[start:stop]
        if prepend_mark:
            raw = sig_payload + raw

        piece = raw.decode(encoding_iana, errors=error_mode)

        # multi-byte bad cutting detector and adjustment
        # not the cleanest way to perform that fix but clever enough for now.
        if is_multi_byte_decoder and start > 0:
            probe_size: int = min(chunk_size, 16)

            if decoded_payload and piece[:probe_size] not in decoded_payload:
                # Move the cut start backward, one byte at a time, until the
                # beginning of the chunk is found in the decoded payload.
                for shifted_start in range(start, start - 4, -1):
                    raw = sequences[shifted_start:stop]
                    if prepend_mark:
                        raw = sig_payload + raw

                    piece = raw.decode(encoding_iana, errors="ignore")

                    if piece[:probe_size] in decoded_payload:
                        break

        yield piece