Spaces:
Running
Running
import logging | |
from os import PathLike | |
from typing import BinaryIO, List, Optional, Set, Union | |
from .cd import ( | |
coherence_ratio, | |
encoding_languages, | |
mb_encoding_languages, | |
merge_coherence_ratios, | |
) | |
from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE | |
from .md import mess_ratio | |
from .models import CharsetMatch, CharsetMatches | |
from .utils import ( | |
any_specified_encoding, | |
cut_sequence_chunks, | |
iana_name, | |
identify_sig_or_bom, | |
is_cp_similar, | |
is_multi_byte_encoding, | |
should_strip_sig_or_bom, | |
) | |
# Will most likely be controversial | |
# logging.addLevelName(TRACE, "TRACE") | |
logger = logging.getLogger("charset_normalizer") | |
explain_handler = logging.StreamHandler() | |
explain_handler.setFormatter( | |
logging.Formatter("%(asctime)s | %(levelname)s | %(message)s") | |
) | |
def from_bytes( | |
sequences: Union[bytes, bytearray], | |
steps: int = 5, | |
chunk_size: int = 512, | |
threshold: float = 0.2, | |
cp_isolation: Optional[List[str]] = None, | |
cp_exclusion: Optional[List[str]] = None, | |
preemptive_behaviour: bool = True, | |
explain: bool = False, | |
language_threshold: float = 0.1, | |
enable_fallback: bool = True, | |
) -> CharsetMatches: | |
""" | |
Given a raw bytes sequence, return the best possibles charset usable to render str objects. | |
If there is no results, it is a strong indicator that the source is binary/not text. | |
By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence. | |
And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will. | |
The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page | |
but never take it for granted. Can improve the performance. | |
You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that | |
purpose. | |
This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32. | |
By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain' | |
toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging. | |
Custom logging format and handler can be set manually. | |
""" | |
if not isinstance(sequences, (bytearray, bytes)): | |
raise TypeError( | |
"Expected object of type bytes or bytearray, got: {0}".format( | |
type(sequences) | |
) | |
) | |
if explain: | |
previous_logger_level: int = logger.level | |
logger.addHandler(explain_handler) | |
logger.setLevel(TRACE) | |
length: int = len(sequences) | |
if length == 0: | |
logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.") | |
if explain: | |
logger.removeHandler(explain_handler) | |
logger.setLevel(previous_logger_level or logging.WARNING) | |
return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")]) | |
if cp_isolation is not None: | |
logger.log( | |
TRACE, | |
"cp_isolation is set. use this flag for debugging purpose. " | |
"limited list of encoding allowed : %s.", | |
", ".join(cp_isolation), | |
) | |
cp_isolation = [iana_name(cp, False) for cp in cp_isolation] | |
else: | |
cp_isolation = [] | |
if cp_exclusion is not None: | |
logger.log( | |
TRACE, | |
"cp_exclusion is set. use this flag for debugging purpose. " | |
"limited list of encoding excluded : %s.", | |
", ".join(cp_exclusion), | |
) | |
cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion] | |
else: | |
cp_exclusion = [] | |
if length <= (chunk_size * steps): | |
logger.log( | |
TRACE, | |
"override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.", | |
steps, | |
chunk_size, | |
length, | |
) | |
steps = 1 | |
chunk_size = length | |
if steps > 1 and length / steps < chunk_size: | |
chunk_size = int(length / steps) | |
is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE | |
is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE | |
if is_too_small_sequence: | |
logger.log( | |
TRACE, | |
"Trying to detect encoding from a tiny portion of ({}) byte(s).".format( | |
length | |
), | |
) | |
elif is_too_large_sequence: | |
logger.log( | |
TRACE, | |
"Using lazy str decoding because the payload is quite large, ({}) byte(s).".format( | |
length | |
), | |
) | |
prioritized_encodings: List[str] = [] | |
specified_encoding: Optional[str] = ( | |
any_specified_encoding(sequences) if preemptive_behaviour else None | |
) | |
if specified_encoding is not None: | |
prioritized_encodings.append(specified_encoding) | |
logger.log( | |
TRACE, | |
"Detected declarative mark in sequence. Priority +1 given for %s.", | |
specified_encoding, | |
) | |
tested: Set[str] = set() | |
tested_but_hard_failure: List[str] = [] | |
tested_but_soft_failure: List[str] = [] | |
fallback_ascii: Optional[CharsetMatch] = None | |
fallback_u8: Optional[CharsetMatch] = None | |
fallback_specified: Optional[CharsetMatch] = None | |
results: CharsetMatches = CharsetMatches() | |
sig_encoding, sig_payload = identify_sig_or_bom(sequences) | |
if sig_encoding is not None: | |
prioritized_encodings.append(sig_encoding) | |
logger.log( | |
TRACE, | |
"Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.", | |
len(sig_payload), | |
sig_encoding, | |
) | |
prioritized_encodings.append("ascii") | |
if "utf_8" not in prioritized_encodings: | |
prioritized_encodings.append("utf_8") | |
for encoding_iana in prioritized_encodings + IANA_SUPPORTED: | |
if cp_isolation and encoding_iana not in cp_isolation: | |
continue | |
if cp_exclusion and encoding_iana in cp_exclusion: | |
continue | |
if encoding_iana in tested: | |
continue | |
tested.add(encoding_iana) | |
decoded_payload: Optional[str] = None | |
bom_or_sig_available: bool = sig_encoding == encoding_iana | |
strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom( | |
encoding_iana | |
) | |
if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available: | |
logger.log( | |
TRACE, | |
"Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.", | |
encoding_iana, | |
) | |
continue | |
if encoding_iana in {"utf_7"} and not bom_or_sig_available: | |
logger.log( | |
TRACE, | |
"Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.", | |
encoding_iana, | |
) | |
continue | |
try: | |
is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana) | |
except (ModuleNotFoundError, ImportError): | |
logger.log( | |
TRACE, | |
"Encoding %s does not provide an IncrementalDecoder", | |
encoding_iana, | |
) | |
continue | |
try: | |
if is_too_large_sequence and is_multi_byte_decoder is False: | |
str( | |
sequences[: int(50e4)] | |
if strip_sig_or_bom is False | |
else sequences[len(sig_payload) : int(50e4)], | |
encoding=encoding_iana, | |
) | |
else: | |
decoded_payload = str( | |
sequences | |
if strip_sig_or_bom is False | |
else sequences[len(sig_payload) :], | |
encoding=encoding_iana, | |
) | |
except (UnicodeDecodeError, LookupError) as e: | |
if not isinstance(e, LookupError): | |
logger.log( | |
TRACE, | |
"Code page %s does not fit given bytes sequence at ALL. %s", | |
encoding_iana, | |
str(e), | |
) | |
tested_but_hard_failure.append(encoding_iana) | |
continue | |
similar_soft_failure_test: bool = False | |
for encoding_soft_failed in tested_but_soft_failure: | |
if is_cp_similar(encoding_iana, encoding_soft_failed): | |
similar_soft_failure_test = True | |
break | |
if similar_soft_failure_test: | |
logger.log( | |
TRACE, | |
"%s is deemed too similar to code page %s and was consider unsuited already. Continuing!", | |
encoding_iana, | |
encoding_soft_failed, | |
) | |
continue | |
r_ = range( | |
0 if not bom_or_sig_available else len(sig_payload), | |
length, | |
int(length / steps), | |
) | |
multi_byte_bonus: bool = ( | |
is_multi_byte_decoder | |
and decoded_payload is not None | |
and len(decoded_payload) < length | |
) | |
if multi_byte_bonus: | |
logger.log( | |
TRACE, | |
"Code page %s is a multi byte encoding table and it appear that at least one character " | |
"was encoded using n-bytes.", | |
encoding_iana, | |
) | |
max_chunk_gave_up: int = int(len(r_) / 4) | |
max_chunk_gave_up = max(max_chunk_gave_up, 2) | |
early_stop_count: int = 0 | |
lazy_str_hard_failure = False | |
md_chunks: List[str] = [] | |
md_ratios = [] | |
try: | |
for chunk in cut_sequence_chunks( | |
sequences, | |
encoding_iana, | |
r_, | |
chunk_size, | |
bom_or_sig_available, | |
strip_sig_or_bom, | |
sig_payload, | |
is_multi_byte_decoder, | |
decoded_payload, | |
): | |
md_chunks.append(chunk) | |
md_ratios.append( | |
mess_ratio( | |
chunk, | |
threshold, | |
explain is True and 1 <= len(cp_isolation) <= 2, | |
) | |
) | |
if md_ratios[-1] >= threshold: | |
early_stop_count += 1 | |
if (early_stop_count >= max_chunk_gave_up) or ( | |
bom_or_sig_available and strip_sig_or_bom is False | |
): | |
break | |
except ( | |
UnicodeDecodeError | |
) as e: # Lazy str loading may have missed something there | |
logger.log( | |
TRACE, | |
"LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s", | |
encoding_iana, | |
str(e), | |
) | |
early_stop_count = max_chunk_gave_up | |
lazy_str_hard_failure = True | |
# We might want to check the sequence again with the whole content | |
# Only if initial MD tests passes | |
if ( | |
not lazy_str_hard_failure | |
and is_too_large_sequence | |
and not is_multi_byte_decoder | |
): | |
try: | |
sequences[int(50e3) :].decode(encoding_iana, errors="strict") | |
except UnicodeDecodeError as e: | |
logger.log( | |
TRACE, | |
"LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s", | |
encoding_iana, | |
str(e), | |
) | |
tested_but_hard_failure.append(encoding_iana) | |
continue | |
mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0 | |
if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up: | |
tested_but_soft_failure.append(encoding_iana) | |
logger.log( | |
TRACE, | |
"%s was excluded because of initial chaos probing. Gave up %i time(s). " | |
"Computed mean chaos is %f %%.", | |
encoding_iana, | |
early_stop_count, | |
round(mean_mess_ratio * 100, ndigits=3), | |
) | |
# Preparing those fallbacks in case we got nothing. | |
if ( | |
enable_fallback | |
and encoding_iana in ["ascii", "utf_8", specified_encoding] | |
and not lazy_str_hard_failure | |
): | |
fallback_entry = CharsetMatch( | |
sequences, encoding_iana, threshold, False, [], decoded_payload | |
) | |
if encoding_iana == specified_encoding: | |
fallback_specified = fallback_entry | |
elif encoding_iana == "ascii": | |
fallback_ascii = fallback_entry | |
else: | |
fallback_u8 = fallback_entry | |
continue | |
logger.log( | |
TRACE, | |
"%s passed initial chaos probing. Mean measured chaos is %f %%", | |
encoding_iana, | |
round(mean_mess_ratio * 100, ndigits=3), | |
) | |
if not is_multi_byte_decoder: | |
target_languages: List[str] = encoding_languages(encoding_iana) | |
else: | |
target_languages = mb_encoding_languages(encoding_iana) | |
if target_languages: | |
logger.log( | |
TRACE, | |
"{} should target any language(s) of {}".format( | |
encoding_iana, str(target_languages) | |
), | |
) | |
cd_ratios = [] | |
# We shall skip the CD when its about ASCII | |
# Most of the time its not relevant to run "language-detection" on it. | |
if encoding_iana != "ascii": | |
for chunk in md_chunks: | |
chunk_languages = coherence_ratio( | |
chunk, | |
language_threshold, | |
",".join(target_languages) if target_languages else None, | |
) | |
cd_ratios.append(chunk_languages) | |
cd_ratios_merged = merge_coherence_ratios(cd_ratios) | |
if cd_ratios_merged: | |
logger.log( | |
TRACE, | |
"We detected language {} using {}".format( | |
cd_ratios_merged, encoding_iana | |
), | |
) | |
results.append( | |
CharsetMatch( | |
sequences, | |
encoding_iana, | |
mean_mess_ratio, | |
bom_or_sig_available, | |
cd_ratios_merged, | |
decoded_payload, | |
) | |
) | |
if ( | |
encoding_iana in [specified_encoding, "ascii", "utf_8"] | |
and mean_mess_ratio < 0.1 | |
): | |
logger.debug( | |
"Encoding detection: %s is most likely the one.", encoding_iana | |
) | |
if explain: | |
logger.removeHandler(explain_handler) | |
logger.setLevel(previous_logger_level) | |
return CharsetMatches([results[encoding_iana]]) | |
if encoding_iana == sig_encoding: | |
logger.debug( | |
"Encoding detection: %s is most likely the one as we detected a BOM or SIG within " | |
"the beginning of the sequence.", | |
encoding_iana, | |
) | |
if explain: | |
logger.removeHandler(explain_handler) | |
logger.setLevel(previous_logger_level) | |
return CharsetMatches([results[encoding_iana]]) | |
if len(results) == 0: | |
if fallback_u8 or fallback_ascii or fallback_specified: | |
logger.log( | |
TRACE, | |
"Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.", | |
) | |
if fallback_specified: | |
logger.debug( | |
"Encoding detection: %s will be used as a fallback match", | |
fallback_specified.encoding, | |
) | |
results.append(fallback_specified) | |
elif ( | |
(fallback_u8 and fallback_ascii is None) | |
or ( | |
fallback_u8 | |
and fallback_ascii | |
and fallback_u8.fingerprint != fallback_ascii.fingerprint | |
) | |
or (fallback_u8 is not None) | |
): | |
logger.debug("Encoding detection: utf_8 will be used as a fallback match") | |
results.append(fallback_u8) | |
elif fallback_ascii: | |
logger.debug("Encoding detection: ascii will be used as a fallback match") | |
results.append(fallback_ascii) | |
if results: | |
logger.debug( | |
"Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.", | |
results.best().encoding, # type: ignore | |
len(results) - 1, | |
) | |
else: | |
logger.debug("Encoding detection: Unable to determine any suitable charset.") | |
if explain: | |
logger.removeHandler(explain_handler) | |
logger.setLevel(previous_logger_level) | |
return results | |
def from_fp( | |
fp: BinaryIO, | |
steps: int = 5, | |
chunk_size: int = 512, | |
threshold: float = 0.20, | |
cp_isolation: Optional[List[str]] = None, | |
cp_exclusion: Optional[List[str]] = None, | |
preemptive_behaviour: bool = True, | |
explain: bool = False, | |
language_threshold: float = 0.1, | |
enable_fallback: bool = True, | |
) -> CharsetMatches: | |
""" | |
Same thing than the function from_bytes but using a file pointer that is already ready. | |
Will not close the file pointer. | |
""" | |
return from_bytes( | |
fp.read(), | |
steps, | |
chunk_size, | |
threshold, | |
cp_isolation, | |
cp_exclusion, | |
preemptive_behaviour, | |
explain, | |
language_threshold, | |
enable_fallback, | |
) | |
def from_path( | |
path: Union[str, bytes, PathLike], # type: ignore[type-arg] | |
steps: int = 5, | |
chunk_size: int = 512, | |
threshold: float = 0.20, | |
cp_isolation: Optional[List[str]] = None, | |
cp_exclusion: Optional[List[str]] = None, | |
preemptive_behaviour: bool = True, | |
explain: bool = False, | |
language_threshold: float = 0.1, | |
enable_fallback: bool = True, | |
) -> CharsetMatches: | |
""" | |
Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode. | |
Can raise IOError. | |
""" | |
with open(path, "rb") as fp: | |
return from_fp( | |
fp, | |
steps, | |
chunk_size, | |
threshold, | |
cp_isolation, | |
cp_exclusion, | |
preemptive_behaviour, | |
explain, | |
language_threshold, | |
enable_fallback, | |
) | |
def is_binary( | |
fp_or_path_or_payload: Union[PathLike, str, BinaryIO, bytes], # type: ignore[type-arg] | |
steps: int = 5, | |
chunk_size: int = 512, | |
threshold: float = 0.20, | |
cp_isolation: Optional[List[str]] = None, | |
cp_exclusion: Optional[List[str]] = None, | |
preemptive_behaviour: bool = True, | |
explain: bool = False, | |
language_threshold: float = 0.1, | |
enable_fallback: bool = False, | |
) -> bool: | |
""" | |
Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string. | |
Based on the same main heuristic algorithms and default kwargs at the sole exception that fallbacks match | |
are disabled to be stricter around ASCII-compatible but unlikely to be a string. | |
""" | |
if isinstance(fp_or_path_or_payload, (str, PathLike)): | |
guesses = from_path( | |
fp_or_path_or_payload, | |
steps=steps, | |
chunk_size=chunk_size, | |
threshold=threshold, | |
cp_isolation=cp_isolation, | |
cp_exclusion=cp_exclusion, | |
preemptive_behaviour=preemptive_behaviour, | |
explain=explain, | |
language_threshold=language_threshold, | |
enable_fallback=enable_fallback, | |
) | |
elif isinstance( | |
fp_or_path_or_payload, | |
( | |
bytes, | |
bytearray, | |
), | |
): | |
guesses = from_bytes( | |
fp_or_path_or_payload, | |
steps=steps, | |
chunk_size=chunk_size, | |
threshold=threshold, | |
cp_isolation=cp_isolation, | |
cp_exclusion=cp_exclusion, | |
preemptive_behaviour=preemptive_behaviour, | |
explain=explain, | |
language_threshold=language_threshold, | |
enable_fallback=enable_fallback, | |
) | |
else: | |
guesses = from_fp( | |
fp_or_path_or_payload, | |
steps=steps, | |
chunk_size=chunk_size, | |
threshold=threshold, | |
cp_isolation=cp_isolation, | |
cp_exclusion=cp_exclusion, | |
preemptive_behaviour=preemptive_behaviour, | |
explain=explain, | |
language_threshold=language_threshold, | |
enable_fallback=enable_fallback, | |
) | |
return not guesses | |