|
from dataclasses import dataclass |
|
from typing import Tuple |
|
|
|
from dataclasses import dataclass |
|
|
|
|
|
@dataclass
class PartialUTF8:
    """Snapshot of a UTF-8 character whose bytes have only partly arrived.

    A byte stream can be cut in the middle of a multi-byte character. An
    instance of this class records how far decoding of that character got so
    that it can be continued once the following bytes are available.

    Attributes:
        value: Bits of the code point gathered so far. Defaults to ``0``.
        n_remain: How many bytes are still needed to finish the character.
            Defaults to ``-1``, which means no character is currently in
            progress.

    Equality and hashing are both based on the ``(value, n_remain)`` pair, so
    equal states hash alike and instances can be used as dictionary or cache
    keys.
    """

    value: int = 0
    n_remain: int = -1

    def __eq__(self, other):
        # Leave comparison with unrelated types to Python's fallback logic.
        if isinstance(other, PartialUTF8):
            return self.value == other.value and self.n_remain == other.n_remain
        return NotImplemented

    def __hash__(self):
        # Must stay in step with __eq__: hash exactly the compared fields.
        return hash((self.value, self.n_remain))
|
|
|
|
|
from typing import List, Tuple |
|
from functools import lru_cache |
|
|
|
|
|
# Total sequence length implied by the high nibble of a lead byte. A length
# of 0 marks the nibbles 0x8-0xB, i.e. continuation bytes (0b10xx_xxxx),
# which can never start a character.
_UTF8_LEAD_LENGTHS = (1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 2, 2, 3, 4)


@lru_cache(maxsize=3000000)
def decode_utf8(
    src: bytes, partial_start: PartialUTF8
) -> Tuple[List[int], PartialUTF8]:
    """Decode a chunk of UTF-8 bytes into code points, resuming a partial character.

    Args:
        src: The bytes to decode. They may start with the continuation bytes
            of a character begun in an earlier chunk and may stop in the
            middle of a character.
        partial_start: Decoder state carried over from the previous chunk.
            Pass ``PartialUTF8()`` when no character is in progress.

    Returns:
        A ``(code_points, partial)`` pair. ``code_points`` lists every code
        point completed by this call. If the last character was completed,
        ``partial`` is reset to ``PartialUTF8(0, -1)``; if ``src`` stopped
        mid-character it carries the bits accumulated so far and the number
        of continuation bytes still expected.

        For malformed input the result is ``([0], PartialUTF8(0, -1))`` and
        any code points decoded earlier in the call are discarded. Input is
        malformed when a continuation byte appears where a lead byte is
        required, or when a byte that is not a continuation byte appears
        where one is required. No other validation (overlong encodings,
        surrogates, code point range) is performed.

    Note:
        Results are memoised with ``lru_cache``: calls with equal arguments
        return the *same* list and ``PartialUTF8`` objects. Treat both as
        read-only; mutating them would corrupt later cache hits.
    """
    end = len(src)
    pos = 0
    code_points = []
    value = partial_start.value
    n_remain = partial_start.n_remain

    # Finish the character left unfinished by the previous chunk, if any.
    while pos < end and n_remain > 0:
        next_byte = src[pos]
        if (next_byte >> 6) != 2:
            # Expected a continuation byte (0b10xx_xxxx): invalid, abort.
            return [0], PartialUTF8(0, -1)
        value = (value << 6) + (next_byte & 0x3F)
        pos += 1
        n_remain -= 1

    # Emit the resumed character only if this call actually completed it.
    if partial_start.n_remain > 0 and n_remain == 0:
        code_points.append(value)

    # Decode the following characters; the last one may be incomplete.
    while pos < end:
        first_byte = src[pos]
        n_remain = _UTF8_LEAD_LENGTHS[first_byte >> 4] - 1
        if n_remain < 0:
            # A continuation byte in lead position: invalid, abort.
            return [0], PartialUTF8(0, -1)

        # Keep the payload bits of the lead byte. The mask is one bit wider
        # than the payload for multi-byte leads, but that extra bit is the
        # 0 that terminates the length prefix, so the result is unaffected.
        value = first_byte & ((1 << (7 - n_remain)) - 1)
        pos += 1

        while pos < end and n_remain > 0:
            next_byte = src[pos]
            if (next_byte >> 6) != 2:
                # Same rule as when resuming: a non-continuation byte inside
                # a multi-byte sequence makes the input invalid.
                return [0], PartialUTF8(0, -1)
            value = (value << 6) + (next_byte & 0x3F)
            pos += 1
            n_remain -= 1

        if n_remain == 0:
            code_points.append(value)

    # Ended on a character boundary: report "nothing in progress".
    if n_remain == 0:
        n_remain = -1
        value = 0

    return code_points, PartialUTF8(value, n_remain)
|
|
|
|
|
def decode_utf8_leading_char(src: bytes) -> tuple:
    """Split the first UTF-8 encoded character off the front of ``src``.

    The length of the character is taken from the high nibble of the first
    byte, the corresponding prefix is decoded with Python's strict UTF-8
    codec, and the rest of the input is handed back unchanged.

    Args:
        src: Non-empty UTF-8 encoded bytes.

    Returns:
        A ``(code_point, remaining_bytes)`` tuple: the integer code point of
        the first character and the bytes that follow it.

    Raises:
        IndexError: If ``src`` is empty.
        UnicodeDecodeError: If the leading bytes are not one complete, valid
            UTF-8 character.
    """
    # Sequence length keyed by the lead byte's high nibble. Continuation
    # nibbles (0x8-0xB) map to 1, so a stray continuation byte is passed to
    # the codec on its own and rejected there.
    length_by_nibble = (1,) * 12 + (2, 2, 3, 4)
    width = length_by_nibble[src[0] >> 4]

    head, tail = src[:width], src[width:]
    return ord(head.decode("utf-8")), tail
|
|
|
|
|
def decode_utf8_string(utf8_bytes: bytes) -> list:
    """Return the Unicode code points encoded in ``utf8_bytes``.

    The input is decoded in a single pass with Python's strict UTF-8 codec
    instead of peeling off one character at a time and re-slicing the
    remainder, which copied the rest of the input for every character.

    Args:
        utf8_bytes: Complete, valid UTF-8 encoded bytes (may be empty).

    Returns:
        A list of integer code points, one per character, in order. Empty
        input yields an empty list.

    Raises:
        UnicodeDecodeError: If ``utf8_bytes`` is not valid UTF-8, including
            input that ends in the middle of a character.
    """
    return [ord(char) for char in utf8_bytes.decode("utf-8")]
|
|
|
if __name__ == "__main__":
    # Demo 1: one-shot helpers on a string that starts with a 3-byte character.
    sample_text = "€Hello"
    encoded = sample_text.encode("utf-8")
    assert encoded == b"\xe2\x82\xacHello"

    # Split off just the first character.
    code_point, remaining_bytes = decode_utf8_leading_char(encoded)
    print(f"Code Point: {code_point}")
    print(f"Remaining Bytes: {remaining_bytes}")

    # Decode the whole string at once.
    code_points = decode_utf8_string(encoded)
    print(f"Code Points: {code_points}")

    print("-" * 50)

    # Demo 2: the resumable decoder, started with no character in progress.
    decoded, state = decode_utf8(b"\xe2\x82\xacHello", PartialUTF8())
    print("Code Points:", decoded)
    print("Remaining UTF-8 State:", state.value, state.n_remain)