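# LLMBB-Agent / utils.py
# Residue from the hosting page's file view, kept as comments so the module parses:
#   uploader: ff_li
#   commit message: "directory restructuring"
#   commit: f67d239
#   page links: raw / history / blame
#   size: 9.64 kB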
import os
import re
import stat
import sys
from urllib.parse import unquote, urlparse
import jsonlines
from agent.log import logger
from agent.utils.doc_parser import parse_doc, parse_html_bs
from agent.utils.utils import print_traceback, save_text_to_file
from schema import Record
from b2sdk.v2 import B2Api
from b2sdk.v2 import InMemoryAccountInfo
import hashlib
import datetime
from io import BytesIO
def _fix_secure_write_for_code_interpreter(code_interpreter_ws):
    """Probe whether owner-only (0600) file creation works in the workspace.

    On Linux, a probe file is created inside ``code_interpreter_ws`` with mode
    0600 and its resulting permission bits are read back.  If they differ from
    0600, ``JUPYTER_ALLOW_INSECURE_WRITES`` is set to ``'1'`` in the process
    environment.  On other platforms the function does nothing.

    The probe file is left in place after the check.
    """
    if 'linux' not in sys.platform.lower():
        return

    probe_path = os.path.join(code_interpreter_ws, 'test_file_permission.txt')
    # Start from a clean slate so the mode we read back is the one we asked for.
    if os.path.exists(probe_path):
        os.remove(probe_path)

    open_flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
    descriptor = os.open(probe_path, open_flags, 0o0600)
    with os.fdopen(descriptor, 'w') as probe:
        probe.write('test')

    observed_mode = stat.S_IMODE(os.stat(probe_path).st_mode) & 0o6677
    if observed_mode != 0o0600:
        os.environ['JUPYTER_ALLOW_INSECURE_WRITES'] = '1'
# ---------------------------------------------------------------------------
# Workspace layout.  All paths are relative to the process working directory.
# ---------------------------------------------------------------------------
work_space_root = "./workspace"
cache_root = f"{work_space_root}/browser_cache/"
download_root = f"{work_space_root}/download/"
code_interpreter_ws = f"{work_space_root}/ci_workspace/"

# JSON-lines cache files kept under the browser cache directory.
cache_file_popup_url = os.path.join(cache_root, 'popup_url.jsonl')
cache_file = os.path.join(cache_root, 'browse.jsonl')

# Limits exported for other modules; nothing in this file reads them.
max_ref_token = 4000
max_days = 7

# Create every workspace directory at import time (no error if it exists).
for _workspace_dir in (work_space_root, cache_root, download_root, code_interpreter_ws):
    os.makedirs(_workspace_dir, exist_ok=True)

code_interpreter_work_dir = code_interpreter_ws

# Environment consumed by the code interpreter and by third-party libraries.
# The static URL is built from the DOMAIN environment variable as-is.
os.environ.update({
    'M6_CODE_INTERPRETER_WORK_DIR': code_interpreter_work_dir,
    'M6_CODE_INTERPRETER_STATIC_URL': f'{os.getenv("DOMAIN")}/static',
    "HF_HOME": ".cache/huggingface/",
    "MPLCONFIGDIR": ".cache/huggingface/",
})

_fix_secure_write_for_code_interpreter(code_interpreter_ws)
class B2Manager:
    """Small wrapper around a single Backblaze B2 bucket.

    Objects are addressed by ``(access_token, url)``.  The object name is
    ``"<access_token>/<md5 of url>"`` by default, or ``"<access_token>/<url>"``
    when ``need_md5`` is false.
    """

    def __init__(self):
        """Authorize with B2 and resolve the bucket.

        Credentials and the bucket name are read from the environment
        variables ``b2_key_id``, ``b2_key`` and ``b2_bucket_name``.
        """
        info = InMemoryAccountInfo()
        b2_api = B2Api(info)
        application_key_id = os.environ.get("b2_key_id")
        application_key = os.environ.get("b2_key")
        b2_api.authorize_account("production", application_key_id, application_key)
        self.b2_bucket = b2_api.get_bucket_by_name(os.environ.get("b2_bucket_name"))
        self.b2_api = b2_api
        # Not read by any method of this class; kept so the attribute still exists.
        self.file_name = None

    def gen_file_name(self, access_token, url, need_md5=True):
        """Return the bucket object name for ``url`` under ``access_token``.

        Args:
            access_token: Prefix ("folder") of the object name.
            url: Key identifying the object; must be a string.
            need_md5: When true (default), the UTF-8 MD5 hex digest of ``url``
                is used instead of the raw ``url``.

        Raises:
            ValueError: If ``url`` is ``None``.
        """
        if url is None:
            raise ValueError("url is required to build a file name")
        if not need_md5:
            return f"{access_token}/{url}"
        # MD5 is used only to derive a fixed-length object name from the URL.
        url_md5 = hashlib.md5(url.encode('utf-8')).hexdigest()
        return f"{access_token}/{url_md5}"

    def get(self, access_token, url, need_md5=True):
        """Download the object and return its content decoded as UTF-8 text."""
        in_memory_file = BytesIO()
        file_name = self.gen_file_name(access_token, url, need_md5)
        self.b2_bucket.download_file_by_name(file_name).save(in_memory_file)
        return in_memory_file.getvalue().decode("utf-8")

    def upsert(self, access_token, url, content, need_md5=True):
        """Upload ``content`` under the object name derived from ``url``.

        ``content`` may be ``str`` (encoded as UTF-8) or already-encoded
        ``bytes``/``bytearray``.
        """
        if isinstance(content, (bytes, bytearray)):
            payload = bytes(content)
        else:
            payload = content.encode('utf-8')
        self.b2_bucket.upload_bytes(
            payload,
            self.gen_file_name(access_token, url, need_md5),
            file_infos=None,
        )

    def delete(self, access_token, url, need_md5=True):
        """Hide the object in the bucket.

        This calls ``hide_file`` only; it does not delete individual file
        versions.
        """
        file_version_info = self.b2_bucket.get_file_info_by_name(
            self.gen_file_name(access_token, url, need_md5))
        self.b2_bucket.hide_file(file_version_info.file_name)

    def list_files(self, access_token):
        """Return one human-readable description per file under ``access_token/``."""
        files = []
        for file_version_info, _folder_name in self.b2_bucket.ls(folder_to_list=f"{access_token}/"):
            # The upload timestamp is in milliseconds, so divide by 1000 to get seconds.
            upload_timestamp = datetime.datetime.fromtimestamp(file_version_info.upload_timestamp / 1000.0)
            files.append(f"File Name: {file_version_info.file_name}, \nUpload timestamp: {upload_timestamp}, \nMetadata: {file_version_info.file_info}")
        return files

    def exists(self, access_token, url=None, need_md5=True):
        """Return True if file info can be fetched for the object, else False.

        Any ordinary error while looking the object up (missing file, network
        or authorization failure) is reported as ``False``.
        """
        if url is None:
            # NOTE(review): the original also yielded False here, but only
            # because an AttributeError was swallowed — confirm whether a
            # "does this token have any files" check was intended instead.
            return False
        try:
            self.b2_bucket.get_file_info_by_name(self.gen_file_name(access_token, url, need_md5))
        except Exception:
            # Deliberately best-effort, but no longer traps KeyboardInterrupt/SystemExit.
            return False
        return True
def update_pop_url(data, cache_file_popup_url, access_token):
    """Record ``data['url']`` as a popup URL for ``access_token``.

    The JSON-lines file at ``cache_file_popup_url`` is rewritten with the
    retained entries followed by a new ``{'url', 'access_token'}`` entry.
    A missing file is treated as empty.

    Args:
        data: Mapping with at least a ``'url'`` key.
        cache_file_popup_url: Path of the JSON-lines popup-URL cache.
        access_token: Token the new entry is stored under.

    Returns:
        The string ``'Update URL'``.
    """
    target_url = data['url']

    retained = []
    if os.path.exists(cache_file_popup_url):
        # Close the reader before the same path is reopened for writing below.
        with jsonlines.open(cache_file_popup_url) as reader:
            for entry in reader:
                # NOTE(review): this keeps only entries of the SAME token with a
                # different URL, so entries belonging to other tokens are
                # dropped on every update — confirm that this is intended.
                if entry['access_token'] == access_token and entry['url'] != target_url:
                    retained.append(entry)

    retained.append({'url': target_url, "access_token": access_token})

    with jsonlines.open(cache_file_popup_url, mode='w') as writer:
        for entry in retained:
            writer.write(entry)

    return 'Update URL'
def change_checkbox_state(text, cache_file, access_token):
    """Toggle the ``checked`` flag of matching entries in the browse cache.

    An entry matches when its ``access_token`` equals ``access_token`` and its
    ``url`` equals ``text[3:]``.  All entries, matching or not, are written
    back to ``cache_file``.

    Args:
        text: Identifier whose first three characters are skipped before the
            comparison (presumably a UI prefix — verify against the caller).
        cache_file: Path of the JSON-lines browse cache.
        access_token: Token the entry must belong to.

    Returns:
        ``{'result': 'no file'}`` if ``cache_file`` does not exist, otherwise
        ``{'result': 'changed'}`` (even when no entry matched).
    """
    if not os.path.exists(cache_file):
        return {'result': 'no file'}

    target_url = text[3:]
    entries = []
    # Close the reader before the same path is reopened for writing below.
    with jsonlines.open(cache_file) as reader:
        for entry in reader:
            if entry['access_token'] == access_token and entry['url'] == target_url:
                entry['checked'] = not entry['checked']
            entries.append(entry)

    with jsonlines.open(cache_file, mode='w') as writer:
        for entry in entries:
            writer.write(entry)

    return {'result': 'changed'}
def is_local_path(path):
    """Return False for ``http://`` / ``https://`` URLs and True for anything else.

    The scheme check is case-sensitive.
    """
    remote_prefixes = ('https://', 'http://')
    return not path.startswith(remote_prefixes)
def sanitize_chrome_file_path(file_path: str) -> str:
    """Map a path taken from a Chrome ``file://`` URL to one that exists locally.

    Candidates are tried in order and the first one that exists is returned:

    1. ``file_path`` unchanged (Linux and macOS).
    2. ``file_path`` without a leading ``/`` (native Windows: ``/C:/...``).
    3. ``/mnt/<drive>/...`` when the path starts with a drive letter (WSL).
    4. The slash-stripped path with ``/`` replaced by ``\\`` (native Windows).

    If none exists, ``file_path`` is returned unchanged.
    """
    without_leading_slash = file_path[1:] if file_path.startswith('/') else file_path

    candidates = [file_path, without_leading_slash]
    if re.match(r'^[A-Za-z]:/', without_leading_slash):
        drive_letter = without_leading_slash[0].lower()
        candidates.append(f'/mnt/{drive_letter}/{without_leading_slash[3:]}')
    candidates.append(without_leading_slash.replace('/', '\\'))

    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return file_path
def _build_record(data, access_token, *, time='', raw=None, extract='', checked=False):
    """Build a ``Record`` for ``data``; defaults describe an in-progress placeholder."""
    return Record(url=data['url'],
                  time=time,
                  type=data['type'],
                  raw=[] if raw is None else raw,
                  extract=extract,
                  access_token=access_token,
                  topic='',
                  checked=checked,
                  session=[])


def extract_and_cache_document(data, cache_root, access_token):
    """Parse a document or HTML page and store the result via ``service``.

    A placeholder record is stored first; it is overwritten by the final
    record on success and deleted if parsing fails.

    Args:
        data: Mapping with ``'url'``, ``'type'`` and (for HTML) ``'content'``.
            It is mutated: ``data['content']`` is replaced by the parsed
            content and, for pdf/docx/pptx URLs, ``data['type']`` is set to
            ``'pdf'``.
        cache_root: Directory where the temporary ``tmp.html`` is written.
        access_token: Token under which the record is stored.

    Returns:
        ``'Cached'`` on success, ``'failed'`` if parsing raised.

    Raises:
        NotImplementedError: If the input is neither a pdf/docx/pptx URL nor
            non-empty HTML content.
    """
    logger.info('Starting cache pages...')
    url = data['url']

    if url.split('.')[-1].lower() in ['pdf', 'docx', 'pptx']:
        started_at = datetime.datetime.now()
        # Store a placeholder record while the document is being processed.
        service.upsert(access_token, url, _build_record(data, access_token).model_dump_json())

        if url.startswith(('https://', 'http://')):
            pdf_path = url
        else:
            # Local file URL: decode the path and map it to one that exists here.
            pdf_path = sanitize_chrome_file_path(unquote(urlparse(url).path))

        try:
            pdf_content = parse_doc(pdf_path)
        except Exception:
            print_traceback()
            # Remove the placeholder record.
            service.delete(access_token, url)
            return 'failed'

        logger.info('Parsing pdf time: ' + str(datetime.datetime.now() - started_at))
        data['content'] = pdf_content
        data['type'] = 'pdf'
        # File name without directory and without anything after the first dot.
        extract = pdf_path.split('/')[-1].split('\\')[-1].split('.')[0]
    elif data['content'] and data['type'] == 'html':
        service.upsert(access_token, url, _build_record(data, access_token).model_dump_json())
        try:
            tmp_html_file = os.path.join(cache_root, 'tmp.html')
            save_text_to_file(tmp_html_file, data['content'])
            parsed_pages = parse_html_bs(tmp_html_file)
            extract = parsed_pages[0]['metadata']['title']
        except Exception:
            # Previously the error was swallowed and the title lookup then
            # failed on the unparsed content, leaving the placeholder behind.
            # Clean up and report failure, as the document branch does.
            print_traceback()
            service.delete(access_token, url)
            return 'failed'
        data['content'] = parsed_pages
    else:
        logger.error(
            'Only Support the Following File Types: [\'.html\', \'.pdf\', \'.docx\', \'.pptx\']'
        )
        raise NotImplementedError

    final_record = _build_record(data,
                                 access_token,
                                 time=str(datetime.date.today()),
                                 raw=data['content'],
                                 extract=extract,
                                 checked=True)
    service.upsert(access_token, url, final_record.model_dump_json())
    return 'Cached'
# Module-wide storage client, created at import time.
service = B2Manager()

if __name__ == '__main__':
    # Manual smoke test against the configured bucket.
    listed_files = service.list_files("test")
    print(listed_files)
    probe_url = "https://tree-iad1-0003.secure.backblaze.com/b2_browse_files2.htm1"
    print(service.exists("test", probe_url))