radiobee-aligner / radiobee /process_upload.py
freemt
Update before sent-align
4c04f50
raw
history blame
2.85 kB
"""Process uploads."""
# pylint: disable=invalid-name, unused-import
from typing import Union
from pathlib import Path
import tempfile
import cchardet
from logzero import logger
def process_upload(upload: Union[tempfile._TemporaryFileWrapper, bytes]) -> str:
"""Process upload (fileobj or bytes(zip file: io.BytesIO further to zipfile.ZipFile)).
gr.inputs.File("file"): upload normal file
gr.inputs.File("bytes"): upload zip file
"""
if isinstance(upload, bytes):
logger.warning("Not implemented, yet, for zip file")
return "Not implemented, yet, for zip file"
try:
fpath = Path(upload.name)
except Exception as e:
logger.error("Path(upload.name) error: %s", e)
return str(e)
suffixes = [
"",
".txt",
".text",
".md",
"tsv",
]
# check .txt .md ''(no suffix)
if fpath.suffix.lower() not in suffixes:
logger.warning('suffix: [%s] not in %s', fpath.suffix, suffixes)
# return "File type not supported, yet."
try:
data = Path(upload.name).read_bytes()
except Exception as e:
logger.error("Unable to read data from %s, errors: %s", fpath, e)
data = str(e).encode()
# no data, empty file, return ""
if not data:
logger.info("empty file: %s", upload.name)
return ""
encoding = cchardet.detect(data).get("encoding")
if encoding is not None:
try:
text = fpath.read_text(encoding=encoding)
except Exception as e:
logger.error("Unable to retrieve text, error: %s", e)
text = str(e)
# return f"{upload.name} {type(upload)}\n\n{text}"
# return f"{upload.name}\n{text}"
return text
# not able to cchardet: encoding is None, docx, pdf, epub, zip etc
logger.info("Trying docx...to be implemented")
# T ODO .docx .epub .mobi .pdf etc.
_ = Path(upload.name)
msg = f"binary file: {_.stem[:-8]}{_.suffix}"
logger.warning("%s", msg)
return msg
_ = ''' # colab gradio-file-inputs-upload.ipynb
# file_to_text/process_file
def zip_to_text(file_obj):
"""
# zf = zipfile.ZipFile('german-recipes-dataset.zip')
zf = file_obj
namelist = zipfile.ZipFile.namelist(zf);
# filename = zf.open(namelist[0]);
file_contents = []
for filename in namelist:
with zf.open(filename) as fhandle:
file_contents.append(fhandle.read().decode())
"""
# fileobj is <class 'tempfile._TemporaryFileWrapper'>
# gr.inputs.File("bytes")
if isinstance(file_obj, bytes):
data = file_obj.decode()
return f"{type(file_obj)}\n{dir(file_obj)}\n{data}"
# "file"/gr.inputs.File("file") file_obj.name: /tmp/READMEzm8hc5ze.md
data = Path(file_obj.name).read_bytes()
return f"{file_obj.name} {type(file_obj)}\n{dir(file_obj)} \n{data}"
# '''