import datetime
import hashlib
import os
import re
import stat
import sys
from io import BytesIO
from urllib.parse import unquote, urlparse

import jsonlines
from b2sdk.v2 import B2Api, InMemoryAccountInfo

from agent.log import logger
from agent.utils.doc_parser import parse_doc, parse_html_bs
from agent.utils.utils import print_traceback, save_text_to_file
from schema import Record
def _fix_secure_write_for_code_interpreter(code_interpreter_ws):
    """Work around Jupyter's secure-write check on filesystems that ignore chmod.

    Jupyter refuses to write its runtime files unless they are created with
    0o600 permissions. On some mounted filesystems the mode bits do not stick,
    so probe with a test file and, if the bits are not honored, allow insecure
    writes via the environment variable Jupyter checks.
    """
    if 'linux' in sys.platform.lower():
        fname = os.path.join(code_interpreter_ws, 'test_file_permission.txt')
        if os.path.exists(fname):
            os.remove(fname)
        with os.fdopen(
                os.open(fname, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o0600),
                'w') as f:
            f.write('test')
        file_mode = stat.S_IMODE(os.stat(fname).st_mode) & 0o6677
        if file_mode != 0o0600:
            os.environ['JUPYTER_ALLOW_INSECURE_WRITES'] = '1'
work_space_root = './workspace'
cache_root = f'{work_space_root}/browser_cache/'
download_root = f'{work_space_root}/download/'
code_interpreter_ws = f'{work_space_root}/ci_workspace/'
cache_file_popup_url = os.path.join(cache_root, 'popup_url.jsonl')
cache_file = os.path.join(cache_root, 'browse.jsonl')
max_ref_token = 4000
max_days = 7

os.makedirs(work_space_root, exist_ok=True)
os.makedirs(cache_root, exist_ok=True)
os.makedirs(download_root, exist_ok=True)
os.makedirs(code_interpreter_ws, exist_ok=True)

code_interpreter_work_dir = code_interpreter_ws
os.environ['M6_CODE_INTERPRETER_WORK_DIR'] = code_interpreter_work_dir
os.environ['M6_CODE_INTERPRETER_STATIC_URL'] = f'{os.getenv("DOMAIN")}/static'
os.environ['HF_HOME'] = '.cache/huggingface/'
os.environ['MPLCONFIGDIR'] = '.cache/huggingface/'

_fix_secure_write_for_code_interpreter(code_interpreter_ws)
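# For reference, with DOMAIN set in the environment (e.g. a hypothetical
# DOMAIN=https://my-space.hf.space), the static URL above resolves to
# https://my-space.hf.space/static. If DOMAIN is unset, os.getenv returns
# None and the value becomes the literal string 'None/static'.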
class B2Manager:
    """Thin wrapper around the Backblaze B2 SDK, scoping files by access token."""

    def __init__(self):
        info = InMemoryAccountInfo()
        b2_api = B2Api(info)
        application_key_id = os.environ.get('b2_key_id')
        application_key = os.environ.get('b2_key')
        b2_api.authorize_account('production', application_key_id, application_key)
        self.b2_bucket = b2_api.get_bucket_by_name(os.environ.get('b2_bucket_name'))
        self.b2_api = b2_api
    def gen_file_name(self, access_token, url, need_md5):
        # Key files as '<access_token>/<md5(url)>' so arbitrary URLs become
        # valid, fixed-length B2 file names.
        url_md5 = hashlib.md5(url.encode('utf-8')).hexdigest()
        return f'{access_token}/{url_md5}' if need_md5 else f'{access_token}/{url}'
    def get(self, access_token, url, need_md5=True):
        # Download the stored object into memory and decode it as UTF-8 text.
        in_memory_file = BytesIO()
        self.b2_bucket.download_file_by_name(
            self.gen_file_name(access_token, url, need_md5)).save(in_memory_file)
        in_memory_file.seek(0)
        return str(in_memory_file.read(), 'utf-8')
    def upsert(self, access_token, url, content, need_md5=True):
        # B2 has no in-place update; uploading under the same name creates a
        # new version that shadows the old one.
        self.b2_bucket.upload_bytes(content.encode('utf-8'),
                                    self.gen_file_name(access_token, url, need_md5),
                                    file_infos=None)
    def delete(self, access_token, url, need_md5=True):
        # Hide rather than hard-delete: the file disappears from listings but
        # its versions remain until lifecycle rules clean them up.
        file_version_info = self.b2_bucket.get_file_info_by_name(
            self.gen_file_name(access_token, url, need_md5))
        self.b2_bucket.hide_file(file_version_info.file_name)
    def list_files(self, access_token):
        files = []
        for file_version_info, folder_name in self.b2_bucket.ls(folder_to_list=f'{access_token}/'):
            # The upload timestamp is in milliseconds; divide by 1000 for seconds.
            upload_timestamp = datetime.datetime.fromtimestamp(file_version_info.upload_timestamp / 1000.0)
            files.append(f'File Name: {file_version_info.file_name}, \n'
                         f'Upload timestamp: {upload_timestamp}, \n'
                         f'Metadata: {file_version_info.file_info}')
        return files
    def exists(self, access_token, url=None, need_md5=True):
        try:
            self.b2_bucket.get_file_info_by_name(self.gen_file_name(access_token, url, need_md5))
            return True
        except Exception:
            # get_file_info_by_name raises when the file is absent or hidden.
            return False
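# A minimal usage sketch for B2Manager (assumes the b2_key_id, b2_key and
# b2_bucket_name environment variables point at a real bucket; the token and
# url below are hypothetical):
#   manager = B2Manager()
#   manager.upsert('tok', 'https://example.com/page', '{"raw": []}')
#   if manager.exists('tok', 'https://example.com/page'):
#       print(manager.get('tok', 'https://example.com/page'))
#   manager.delete('tok', 'https://example.com/page')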
def update_pop_url(data, cache_file_popup_url, access_token):
    new_line = {'url': data['url'], 'access_token': access_token}
    lines = []
    for line in jsonlines.open(cache_file_popup_url):
        # Keep every existing entry (including other users') except a stale
        # one for this token and url, which the new line below replaces.
        if not (line['access_token'] == access_token and line['url'] == data['url']):
            lines.append(line)
    lines.append(new_line)
    with jsonlines.open(cache_file_popup_url, mode='w') as writer:
        for new_line in lines:
            writer.write(new_line)
    response = 'Update URL'
    return response
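# Each line of popup_url.jsonl is a small JSON object, e.g. (illustrative):
#   {"url": "https://example.com/page", "access_token": "tok"}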
def change_checkbox_state(text, cache_file, access_token):
    if not os.path.exists(cache_file):
        return {'result': 'no file'}
    lines = []
    for line in jsonlines.open(cache_file):
        # text carries a three-character UI prefix before the url, which
        # text[3:] strips off.
        if line['access_token'] == access_token and line['url'] == text[3:]:
            line['checked'] = not line['checked']
        lines.append(line)
    with jsonlines.open(cache_file, mode='w') as writer:
        for new_line in lines:
            writer.write(new_line)
    return {'result': 'changed'}
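# For example, with a hypothetical 'cb_' prefix:
#   change_checkbox_state('cb_https://example.com/page', cache_file, 'tok')
# flips the 'checked' flag on that user's cached entry for the url.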
def is_local_path(path):
    return not (path.startswith('https://') or path.startswith('http://'))
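# Quick sanity check of the helper above (illustrative values):
#   is_local_path('https://example.com/a.pdf')  -> False
#   is_local_path('/home/user/a.pdf')           -> True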
def sanitize_chrome_file_path(file_path: str) -> str:
    # For Linux and macOS.
    if os.path.exists(file_path):
        return file_path

    # For native Windows, drop the leading '/' in '/C:/'.
    win_path = file_path
    if win_path.startswith('/'):
        win_path = win_path[1:]
    if os.path.exists(win_path):
        return win_path

    # For Windows + WSL.
    if re.match(r'^[A-Za-z]:/', win_path):
        wsl_path = f'/mnt/{win_path[0].lower()}/{win_path[3:]}'
        if os.path.exists(wsl_path):
            return wsl_path

    # For native Windows, replace / with \.
    win_path = win_path.replace('/', '\\')
    if os.path.exists(win_path):
        return win_path

    return file_path
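# Conversions attempted above, in order (paths are hypothetical):
#   '/C:/Users/me/a.pdf' -> 'C:/Users/me/a.pdf'      (native Windows)
#   'C:/Users/me/a.pdf'  -> '/mnt/c/Users/me/a.pdf'  (Windows + WSL)
#   'C:/Users/me/a.pdf'  -> 'C:\\Users\\me\\a.pdf'   (native Windows, backslashes)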
def extract_and_cache_document(data, cache_root, access_token):
    logger.info('Starting cache pages...')
    if data['url'].split('.')[-1].lower() in ['pdf', 'docx', 'pptx']:
        date1 = datetime.datetime.now()
        # Write a placeholder record so concurrent requests see the document
        # as "processing".
        new_record = Record(url=data['url'],
                            time='',
                            type=data['type'],
                            raw=[],
                            extract='',
                            access_token=access_token,
                            topic='',
                            checked=False,
                            session=[])
        service.upsert(access_token, data['url'], new_record.model_dump_json())
        if data['url'].startswith('https://') or data['url'].startswith('http://'):
            pdf_path = data['url']
        else:
            parsed_url = urlparse(data['url'])
            pdf_path = unquote(parsed_url.path)
            pdf_path = sanitize_chrome_file_path(pdf_path)
        try:
            pdf_content = parse_doc(pdf_path)
        except Exception:
            print_traceback()
            # Parsing failed: remove the placeholder record.
            service.delete(access_token, data['url'])
            return 'failed'
        date2 = datetime.datetime.now()
        logger.info('Parsing pdf time: ' + str(date2 - date1))
        data['content'] = pdf_content
        data['type'] = 'pdf'
        # Use the bare file name (no directories, no extension) as the title.
        extract = pdf_path.split('/')[-1].split('\\')[-1].split('.')[0]
    elif data['content'] and data['type'] == 'html':
        new_record = Record(url=data['url'],
                            time='',
                            type=data['type'],
                            raw=[],
                            extract='',
                            access_token=access_token,
                            topic='',
                            checked=False,
                            session=[])
        service.upsert(access_token, data['url'], new_record.model_dump_json())
        try:
            tmp_html_file = os.path.join(cache_root, 'tmp.html')
            save_text_to_file(tmp_html_file, data['content'])
            data['content'] = parse_html_bs(tmp_html_file)
        except Exception:
            print_traceback()
        extract = data['content'][0]['metadata']['title']
    else:
        logger.error(
            "Only Support the Following File Types: ['.html', '.pdf', '.docx', '.pptx']")
        raise NotImplementedError
    today = datetime.date.today()
    # Replace the placeholder with the fully parsed record.
    new_record = Record(url=data['url'],
                        time=str(today),
                        type=data['type'],
                        raw=data['content'],
                        extract=extract,
                        access_token=access_token,
                        topic='',
                        checked=True,
                        session=[])
    service.upsert(access_token, data['url'], new_record.model_dump_json())
    response = 'Cached'
    return response
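# A fully cached record round-trips through B2 as JSON, e.g. (illustrative,
# assuming Record is a pydantic model serialized by model_dump_json):
#   {"url": "https://example.com/a.pdf", "time": "2024-01-01", "type": "pdf",
#    "raw": [...], "extract": "a", "access_token": "tok", "topic": "",
#    "checked": true, "session": []}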
service = B2Manager()

if __name__ == '__main__':
    # print(service.gen_file_name("test", "settings.xml"))
    # print(service.get("test", "settings.xml"))
    # print(service.upsert("test", "settings.xml", b"1111"))
    print(service.list_files('test'))
    print(service.exists('test', 'https://tree-iad1-0003.secure.backblaze.com/b2_browse_files2.htm1'))