from flask import Flask,request,render_template,send_file,jsonify import os from transformers import AutoTokenizer, AutoModel import anvil.server import pathlib import textwrap import google.generativeai as genai import import_ipynb from library import call_gpt, call_gemini from background_service import BackgroundTaskService anvil.server.connect('PLMOIU5VCGGUOJH2XORIBWV3-ZXZVFLWX7QFIIAF4') app=Flask(__name__) MESSAGED={'title':'API Server', 'messageL':['published server functions:','encode(text)', 'call_gemini(text)']} tokenizer = AutoTokenizer.from_pretrained('allenai/specter') encoder = AutoModel.from_pretrained('allenai/specter') # GOOGLE_API_KEY=os.getenv('GOOGLE_API_KEY') # genai.configure(api_key=GOOGLE_API_KEY) service=BackgroundTaskService(max_tasks=10) service.register(call_gpt) service.register(call_gemini) @anvil.server.callable def launch(func_name,*args): global service # Launch task task_id = service.launch_task(func_name, *args) print(f"Task launched with ID: {task_id}") return task_id @anvil.server.callable def poll(task_id): global service # Poll for completion; if not complete return "In Progress" else return result result = service.get_result(task_id) if result=='No such task': return str(result) elif result!='In Progress': del service.results[task_id] if isinstance(result, (int, float, str, list, dict, tuple)): return result else: print(str(result)) return str(result) else: return str(result) # @anvil.server.callable # def call_gemini(text): # model = genai.GenerativeModel('gemini-pro') # response = model.generate_content(text) # return response.text @anvil.server.callable def encode_anvil(text): inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt", max_length=512) result = encoder(**inputs) embeddings = result.last_hidden_state[:, 0, :] emb_array = embeddings.detach().numpy() embedding=emb_array.tolist() return embedding @app.route('/encode',methods=['GET','POST']) def encode(): print(request) if request.method=='GET': text=request.args.get('text') elif request.method=='POST': data=request.get_json() if 'text' in data: text=data["text"] if text=='' or text is None: return -1 inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt", max_length=512) result = encoder(**inputs) embeddings = result.last_hidden_state[:, 0, :] emb_array = embeddings.detach().numpy() embedding=emb_array.tolist() return jsonify({'embedding': embedding}) @app.route("/file/") def return_file(filename): return send_file('./data/'+filename) @app.route('/run',methods=['GET','POST']) def run_script(): script='' # print(request.method) print(request) if request.method=='GET': script=request.args.get('script') print('I am in get') elif request.method=='POST': print('I am in post') data=request.get_json() if 'script' in data: script=data['script'] if script=='' or script is None: return 'INVALID' os.system(script+' > ./out.txt') with open('./out.txt','r') as f: output=f.read() return output @app.route('/',methods=['GET', 'POST']) def home(): return render_template('home.html',messageD=MESSAGED) if __name__=='__main__': app.run(host="0.0.0.0", port=7860)