"""HTTP backend for the Qwen server: token-guarded caching endpoints.

The module builds a FastAPI application that

* authenticates every request with an access token (admin token from the
  ``ACCESS_TOKEN`` environment variable, or any token stored in
  ``access_token.jsonl`` under the cache root),
* exposes endpoints for registering tokens, reading cached JSON Lines files
  and reacting to browser-side events (``/endpoint``),
* serves the code-interpreter workspace under ``/static`` and mounts the
  Gradio assistant UI at ``/``.
"""
import json
import multiprocessing
import os
import secrets
from pathlib import Path
from typing import Optional

import add_qwen_libs  # NOQA
import jsonlines
import uvicorn
from fastapi import FastAPI, Request, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from qwen_agent.log import logger
from qwen_agent.utils.utils import get_local_ip
from qwen_server.schema import GlobalConfig
from qwen_server.utils import extract_and_cache_document
from starlette.middleware.sessions import SessionMiddleware

# Read config
with open(Path(__file__).resolve().parent / 'server_config.json', 'r', encoding='utf-8') as f:
    server_config = json.load(f)
    server_config = GlobalConfig(**server_config)

app = FastAPI()

local_ip = get_local_ip()
logger.info(local_ip)

workstation_port = str(server_config.server.workstation_port)
# NOTE(review): `origins` is currently unused because `allow_origins` is
# commented out in the CORS setup below -- confirm whether that is intended.
origins = [
    'http://127.0.0.1:' + workstation_port,
    'http://localhost:' + workstation_port,
    'http://0.0.0.0:' + workstation_port,
    'http://' + local_ip + ':' + workstation_port,
]

access_token_file = os.path.join(server_config.path.cache_root, 'access_token.jsonl')

app.add_middleware(
    CORSMiddleware,
    # allow_origins=origins,
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)

app.mount('/static', StaticFiles(directory=server_config.path.code_interpreter_ws), name='static')


def _read_jsonl(path):
    """Return all records of the JSON Lines file at *path*.

    A missing file is treated as empty so first-time use does not crash.
    The reader is always closed, unlike a bare ``for ... in jsonlines.open()``.
    """
    if not os.path.exists(path):
        return []
    with jsonlines.open(path) as reader:
        return list(reader)


def _write_jsonl(path, records):
    """Overwrite the JSON Lines file at *path* with *records*."""
    with jsonlines.open(path, mode='w') as writer:
        writer.write_all(records)


def _tokens_equal(expected, provided) -> bool:
    """Compare two tokens in constant time; non-string values never match."""
    if not isinstance(expected, str) or not isinstance(provided, str):
        return False
    # Encode first: compare_digest rejects non-ASCII ``str`` arguments.
    return secrets.compare_digest(expected.encode('utf-8'), provided.encode('utf-8'))


def _extract_access_token(request: Request) -> Optional[str]:
    """Return the caller's token from header, query string or session.

    Lookup order: ``Authorization`` header, ``access_token`` query
    parameter, then the ``access_token`` value stored in the session.
    """
    return (request.headers.get("Authorization") or request.query_params.get("access_token") or
            request.session.get("access_token"))


def _is_admin_token(access_token: Optional[str]) -> bool:
    """Return True when *access_token* equals the ``ACCESS_TOKEN`` env var."""
    return bool(access_token) and _tokens_equal(os.getenv("ACCESS_TOKEN"), access_token)


def update_pop_url(data, cache_file_popup_url, access_token):
    """Record ``data['url']`` as the newest popup URL for *access_token*.

    Any previous entry with the same token and URL is removed so the new
    entry ends up last in the file. Entries that belong to other tokens are
    preserved (previously they were silently discarded on every update).

    Args:
        data: Request payload; must contain the key ``'url'``.
        cache_file_popup_url: Path of the popup-URL JSON Lines file.
        access_token: Token of the caller that owns the entry.

    Returns:
        The string ``'Update URL'``.
    """
    new_line = {'url': data['url'], "access_token": access_token}
    kept = [
        line for line in _read_jsonl(cache_file_popup_url)
        if not (line.get('access_token') == access_token and line.get('url') == data['url'])
    ]
    kept.append(new_line)
    _write_jsonl(cache_file_popup_url, kept)
    return 'Update URL'


def change_checkbox_state(text, cache_file, access_token):
    """Toggle the ``checked`` flag of the caller's cached record for a URL.

    Args:
        text: Checkbox id sent by the client. The first three characters are
            skipped and the remainder is compared with the record's ``url``
            (presumably a fixed 3-character id prefix -- verify against the
            client).
        cache_file: Path of the browse-cache JSON Lines file.
        access_token: Token of the caller; only its records are toggled.

    Returns:
        ``{'result': 'no file'}`` when *cache_file* does not exist, otherwise
        ``{'result': 'changed'}`` (even if no record matched).
    """
    if not os.path.exists(cache_file):
        return {'result': 'no file'}

    target_url = text[3:]
    lines = _read_jsonl(cache_file)
    for line in lines:
        if line.get('access_token') == access_token and line.get('url') == target_url:
            line['checked'] = not line.get('checked')
    _write_jsonl(cache_file, lines)
    return {'result': 'changed'}


@app.middleware("http")
async def access_token_auth(request: Request, call_next):
    """Reject any request that does not carry a valid access token.

    A token is valid when it equals the ``ACCESS_TOKEN`` environment variable
    or appears as ``access_token`` in a record of ``access_token_file``. A
    valid token is remembered in the session so later requests may omit it.

    Returns:
        A 401 response for a missing/unknown token, otherwise the downstream
        response.
    """
    access_token = _extract_access_token(request)

    is_valid = False
    if access_token:
        if _is_admin_token(access_token):
            is_valid = True
        else:
            # A missing token file simply means no extra tokens are registered.
            is_valid = any(
                isinstance(line, dict) and _tokens_equal(line.get('access_token'), access_token)
                for line in _read_jsonl(access_token_file))

    if not is_valid:
        return Response(status_code=401, content="the token is not valid")

    request.session.setdefault("access_token", access_token)
    return await call_next(request)


@app.get('/healthz')
async def healthz(request: Request):
    """Liveness probe; always answers ``{"healthz": true}``.

    NOTE: the auth middleware still applies, so the probe needs a valid token.
    """
    return JSONResponse({"healthz": True})


@app.post('/token/add')
async def add_token(request: Request):
    """Register a new access token (admin only).

    The JSON body is stored as one record in ``access_token_file``. It must
    be an object with a non-empty string ``access_token``; otherwise the
    stored record would break token validation for every later request.

    Returns:
        401 when the caller is not the admin, otherwise ``{"success": true}``.

    Raises:
        HTTPException: 400 when the body is not a valid token record.
    """
    if not _is_admin_token(_extract_access_token(request)):
        return Response(status_code=401, content="the token is not valid")

    data = await request.json()
    new_token = data.get('access_token') if isinstance(data, dict) else None
    if not isinstance(new_token, str) or not new_token:
        raise HTTPException(status_code=400,
                            detail="body must be a JSON object with a non-empty string 'access_token'")

    lines = _read_jsonl(access_token_file)
    lines.append(data)
    _write_jsonl(access_token_file, lines)
    return JSONResponse({"success": True})


@app.get('/cachedata/{file_name}')
async def cache_data(request: Request, file_name: str):
    """Return the records of a JSON Lines file in the cache root (admin only).

    Args:
        request: Incoming request, used to authenticate the caller.
        file_name: Bare file name inside ``server_config.path.cache_root``.

    Returns:
        401 when the caller is not the admin, otherwise the file's records as
        a JSON array.

    Raises:
        HTTPException: 404 when *file_name* is not a plain file name or the
            file does not exist.
    """
    if not _is_admin_token(_extract_access_token(request)):
        return Response(status_code=401, content="the token is not valid")

    # Only bare names are accepted so the lookup cannot leave the cache root.
    if file_name in ('', '.', '..') or os.path.basename(file_name) != file_name:
        raise HTTPException(status_code=404, detail='file not found')
    cache_file = os.path.join(server_config.path.cache_root, file_name)
    if not os.path.isfile(cache_file):
        raise HTTPException(status_code=404, detail='file not found')

    return JSONResponse(_read_jsonl(cache_file))


@app.post('/endpoint')
async def web_listening(request: Request):
    """Dispatch a browser-side event described by the JSON body's ``task``.

    Supported tasks:
        * ``change_checkbox`` -- toggle the record identified by ``ckid``.
        * ``cache`` -- start ``extract_and_cache_document`` in a child process
          and answer immediately with ``'caching'``.
        * ``pop_url`` -- remember ``url`` as the caller's current popup URL.

    Raises:
        HTTPException: 400 when ``task`` is missing or not supported.
    """
    data = await request.json()
    msg_type = data.get('task') if isinstance(data, dict) else None
    access_token = request.session.get("access_token")

    cache_file_popup_url = os.path.join(server_config.path.cache_root, 'popup_url.jsonl')
    cache_file = os.path.join(server_config.path.cache_root, 'browse.jsonl')

    if msg_type == 'change_checkbox':
        rsp = change_checkbox_state(data['ckid'], cache_file, access_token)
    elif msg_type == 'cache':
        # Extraction runs in a separate process so the request returns at once.
        cache_obj = multiprocessing.Process(
            target=extract_and_cache_document,
            args=(data, cache_file, server_config.path.cache_root, access_token))
        cache_obj.start()
        rsp = 'caching'
    elif msg_type == 'pop_url':
        # What a misleading name! pop_url actually means add_url. pop is referring to the pop_up ui.
        rsp = update_pop_url(data, cache_file_popup_url, access_token)
    else:
        raise HTTPException(status_code=400, detail=f'Unsupported task: {msg_type!r}')

    return JSONResponse(content=rsp)


# These imports stay below the route definitions to preserve the original
# import order of the module.
import gradio as gr  # noqa: E402
# from qwen_server.workstation_server import demo as workstation_app
from qwen_server.assistant_server import demo as assistant_app  # noqa: E402

# app = gr.mount_gradio_app(app, workstation_app, path="/workstation")
app = gr.mount_gradio_app(app, assistant_app, path="/")

# SessionMiddleware stringifies its key, so an unset SECRET_KEY would sign
# cookies with the guessable literal "None". Fall back to a random secret.
session_secret = os.getenv("SECRET_KEY")
if not session_secret:
    logger.warning('SECRET_KEY is not set; using a random per-process session secret '
                   '(sessions will not survive a restart).')
    session_secret = secrets.token_hex(32)

# Added last so it wraps the auth middleware, which reads `request.session`.
app.add_middleware(SessionMiddleware, secret_key=session_secret, max_age=25200)

if __name__ == '__main__':
    uvicorn.run(app='database_server:app',
                host=server_config.server.server_host,
                port=server_config.server.fast_api_port,
                reload=False,
                workers=1)