"""Chat-with-documents service built on Gradio and FastAPI.

The module exposes:

* a Gradio multimodal chat UI (``stream_chat``) mounted at ``/``;
* ``POST /test/`` — a small echo endpoint;
* ``POST /chat/`` — a form endpoint (text + optional file) that streams the
  model's answer as plain text (``simple_chat``).

Uploaded documents (pdf/docx/pptx/plain text) are converted to text and
prepended to the prompt; uploaded images are passed to the model as images.
"""

import io
import os
import tempfile
from io import BytesIO
from threading import Thread
from typing import Optional

import docx
import gradio as gr
import pymupdf
import spaces
import torch
import uvicorn
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import HTMLResponse
from fastapi.responses import PlainTextResponse
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
from PIL import Image
from pptx import Presentation
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

app = FastAPI()


@app.post("/test/")
async def test_endpoint(message: dict):
    """Echo the ``text`` field of the JSON body back to the caller.

    Raises:
        HTTPException: 400 when the body has no ``text`` key.
    """
    if "text" not in message:
        raise HTTPException(status_code=400, detail="Missing 'text' in request body")
    return {"message": f"Received your message: {message['text']}"}


MODEL_LIST = ["nikravan/glm-4vq"]
HF_TOKEN = os.environ.get("HF_TOKEN", None)
MODEL_ID = MODEL_LIST[0]
MODEL_NAME = "GLM-4vq"

# NOTE(review): TITLE/DESCRIPTION contain only text and newlines; any HTML
# markup they may once have had (CSS below styles <h1>) is not present in the
# source — confirm the intended markup.
TITLE = "\n\nAI CHAT DOCS\n\n"

DESCRIPTION = f"\n\n\nUSANDO MODELO: {MODEL_NAME}\n\n"

CSS = """
h1 {
    text-align: center;
    display: block;
}
"""

# File extensions (lower-case, without the dot) handled by mode_load().
DOC_EXTENSIONS = frozenset({"pdf", "txt", "py", "docx", "pptx", "json", "cpp", "md"})
IMAGE_EXTENSIONS = frozenset({"png", "jpg", "jpeg", "bmp", "tiff", "webp"})

# Only this many leading characters of a document are put into the prompt.
MAX_DOC_CHARS = 5000

# Token ids passed to generate() as end-of-sequence markers.
EOS_TOKEN_IDS = [151329, 151336, 151338]

tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)


def extract_text(path):
    """Return the full contents of a plain-text file at *path*."""
    # Explicit encoding and a context manager: the handle is always closed and
    # undecodable bytes do not abort the request.
    with open(path, "r", encoding="utf-8", errors="replace") as handle:
        return handle.read()


def extract_pdf(path):
    """Return the concatenated text of every page of the PDF at *path*."""
    with pymupdf.open(path) as doc:
        return "".join(page.get_text() for page in doc)


def extract_docx(path):
    """Return the paragraphs of the .docx at *path*, separated by blank lines."""
    document = docx.Document(path)
    return "\n\n".join(paragraph.text for paragraph in document.paragraphs)


def extract_pptx(path):
    """Return the text of every shape on every slide, one shape per line."""
    presentation = Presentation(path)
    return "".join(
        shape.text + "\n"
        for slide in presentation.slides
        for shape in slide.shapes
        if hasattr(shape, "text")
    )


def _file_extension(name):
    """Return the lower-cased extension of *name* without the leading dot."""
    return os.path.splitext(name or "")[1].lstrip(".").lower()


def mode_load(path):
    """Load the file at *path* as either a document or an image.

    Args:
        path: Filesystem path; the extension (case-insensitive) selects the
            loader.

    Returns:
        ``("doc", text)`` with the text truncated to ``MAX_DOC_CHARS``
        characters, or ``("image", PIL.Image)`` converted to RGB.

    Raises:
        gr.Error: If the extension is not a supported document or image type.
    """
    file_type = _file_extension(path)
    print(file_type)

    if file_type in DOC_EXTENSIONS:
        if file_type == "pdf":
            content = extract_pdf(path)
        elif file_type == "docx":
            content = extract_docx(path)
        elif file_type == "pptx":
            content = extract_pptx(path)
        else:
            content = extract_text(path)
        print(content[:100])
        return "doc", content[:MAX_DOC_CHARS]

    if file_type in IMAGE_EXTENSIONS:
        return "image", Image.open(path).convert("RGB")

    raise gr.Error("Oops, unsupported files.")


def _load_model():
    """Load the causal LM identified by ``MODEL_ID`` in bfloat16."""
    return AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        torch_dtype=torch.bfloat16,
        low_cpu_mem_usage=True,
        trust_remote_code=True,
    )


def _run_generate(model, gen_kwargs):
    """Thread target: run ``model.generate`` with gradients disabled."""
    # Grad mode is thread-local, so it has to be set inside the worker thread;
    # wrapping Thread.start() in no_grad() does not reach this thread.
    with torch.no_grad():
        model.generate(**gen_kwargs)


def _generate_stream(model, conversation, *, temperature, max_length, top_p, top_k, penalty):
    """Generate a reply for *conversation*, yielding the text produced so far.

    Each yielded value is the cumulative reply (not a delta).
    """
    print(f"Conversation is -\n{conversation}")

    model_inputs = tokenizer.apply_chat_template(
        conversation,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt",
        return_dict=True,
    ).to(model.device)
    streamer = TextIteratorStreamer(
        tokenizer, timeout=60.0, skip_prompt=True, skip_special_tokens=True
    )
    gen_kwargs = {
        **model_inputs,
        "max_length": max_length,
        "streamer": streamer,
        "do_sample": True,
        "top_p": top_p,
        "top_k": top_k,
        "temperature": temperature,
        "repetition_penalty": penalty,
        "eos_token_id": EOS_TOKEN_IDS,
    }

    # generate() blocks until completion, so it runs in a worker thread while
    # this generator drains the streamer.
    worker = Thread(target=_run_generate, args=(model, gen_kwargs))
    worker.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        yield buffer
    worker.join()


def _attachment_turn(choice, contents, text, uploaded_label):
    """Build the user turn for an attachment, or ``None`` for an unknown kind.

    *uploaded_label* is substituted into the ``"{} files uploaded."`` line that
    separates document text from the user's question.
    """
    if choice == "image":
        return {"role": "user", "image": contents, "content": text}
    if choice == "doc":
        format_msg = contents + "\n\n\n" + "{} files uploaded.\n".format(uploaded_label) + text
        return {"role": "user", "content": format_msg}
    return None


def _build_chat_conversation(message, history):
    """Translate a Gradio multimodal message plus history into model turns.

    Only the most recent file (from the message, else from the history) is
    loaded. History entries are unpacked as ``(prompt, answer)`` pairs; an
    entry whose answer is ``None`` is treated as a file upload whose first
    element is the file path.
    """
    conversation = []
    text = message["text"]
    files = message.get("files")

    if files:
        choice, contents = mode_load(files[-1])
        turn = _attachment_turn(choice, contents, text, len(files))
        if turn is not None:
            conversation.append(turn)
        return conversation

    if not history:
        conversation.append({"role": "user", "content": text})
        return conversation

    prompt_files = []
    for prompt, answer in history:
        if answer is None:
            # File-upload entry: remember the path, keep turn alternation.
            prompt_files.append(prompt[0])
            conversation.extend(
                [{"role": "user", "content": ""}, {"role": "assistant", "content": ""}]
            )
        else:
            conversation.extend(
                [{"role": "user", "content": prompt}, {"role": "assistant", "content": answer}]
            )

    if prompt_files:
        choice, contents = mode_load(prompt_files[-1])
        turn = _attachment_turn(choice, contents, text, len(prompt_files))
        if turn is not None:
            conversation.append(turn)
    else:
        conversation.append({"role": "user", "image": "", "content": text})
    return conversation


@spaces.GPU()
def stream_chat(message, history: list, temperature: float, max_length: int, top_p: float, top_k: int, penalty: float):
    """Gradio chat handler: stream the model's reply to *message*.

    Args:
        message: Dict with ``"text"`` and optionally ``"files"`` (list of
            paths); only the last file is used.
        history: Previous turns as ``(prompt, answer)`` pairs.
        temperature, max_length, top_p, top_k, penalty: Sampling parameters
            forwarded to ``generate``.

    Yields:
        The cumulative reply text after each streamed chunk.

    Raises:
        gr.Error: If an attached file has an unsupported extension.
    """
    print(f'message is - {message}')
    print(f'history is - {history}')

    # Build the prompt first so an unsupported file fails before the model load.
    conversation = _build_chat_conversation(message, history)
    model = _load_model()
    yield from _generate_stream(
        model,
        conversation,
        temperature=temperature,
        max_length=max_length,
        top_p=top_p,
        top_k=top_k,
        penalty=penalty,
    )


chatbot = gr.Chatbot(
    # rtl=True,
)
chat_input = gr.MultimodalTextbox(
    interactive=True,
    placeholder="Enter message or upload a file ...",
    show_label=False,
    # rtl=True,
)

EXAMPLES = [
    [{"text": "Resumir Documento"}],
    [{"text": "Explicar la Imagen"}],
    [{"text": "¿De qué es la foto?", "files": ["perro.jpg"]}],
    [{
        "text": (
            "Quiero armar un JSON, solo el JSON sin texto, que contenga los datos de la "
            "primera mitad de la tabla de la imagen (las primeras 10 jurisdicciones 901-910). "
            "Ten en cuenta que los valores numéricos son decimales de cuatro dígitos. "
            "La tabla contiene las siguientes columnas: Codigo, Nombre, Fecha Inicio, "
            "Fecha Cese, Coeficiente Ingresos, Coeficiente Gastos y Coeficiente Unificado. "
            "La tabla puede contener valores vacíos, en ese caso dejarlos como null. "
            "Cada fila de la tabla representa una jurisdicción con sus respectivos valores."
        ),
    }],
]


def _load_upload(file_content, file_name: Optional[str]):
    """Run ``mode_load`` on an upload given as a path or a file-like object.

    ``mode_load`` needs a filesystem path with an extension, so in-memory
    uploads are written to a temporary file that carries *file_name*'s
    extension and is removed afterwards.
    """
    if isinstance(file_content, (str, os.PathLike)):
        return mode_load(os.fspath(file_content))

    data = file_content.read() if hasattr(file_content, "read") else bytes(file_content)
    suffix = os.path.splitext(file_name or "")[1]
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        tmp.write(data)
        tmp_path = tmp.name
    try:
        return mode_load(tmp_path)
    finally:
        os.remove(tmp_path)


@spaces.GPU()
def simple_chat(message, temperature: float = 0.8, max_length: int = 4096, top_p: float = 1, top_k: int = 10, penalty: float = 1.0):
    """Single-turn chat used by the ``/chat/`` endpoint.

    Args:
        message: Dict with ``"text"``, and optionally ``"file_content"`` (a
            path or a binary file-like object) and ``"file_name"``.
        temperature, max_length, top_p, top_k, penalty: Sampling parameters
            forwarded to ``generate``.

    Yields:
        The cumulative reply text after each streamed chunk.

    Raises:
        gr.Error: If the attached file has an unsupported extension.
    """
    print(f'Message is - {message}')

    text = message["text"]
    file_content = message.get("file_content")
    file_name = message.get("file_name")

    conversation = []
    if file_content:
        choice, contents = _load_upload(file_content, file_name)
        turn = _attachment_turn(choice, contents, text, file_name)
        if turn is not None:
            conversation.append(turn)
    else:
        conversation.append({"role": "user", "content": text})

    model = _load_model()
    buffer = ""
    for buffer in _generate_stream(
        model,
        conversation,
        temperature=temperature,
        max_length=max_length,
        top_p=top_p,
        top_k=top_k,
        penalty=penalty,
    ):
        yield buffer

    print("---------")
    print("Text: ")
    print(buffer)
    print("---------")


def _iter_deltas(snapshots):
    """Convert cumulative text snapshots into the newly added pieces."""
    sent = 0
    for snapshot in snapshots:
        delta = snapshot[sent:]
        sent = len(snapshot)
        if delta:
            yield delta


@app.post("/chat/")
async def chat_endpoint(
    text: str = Form(...),
    file: UploadFile = File(None),
):
    """Answer *text*, optionally about an uploaded *file*, as streamed text.

    Raises:
        HTTPException: 400 when the uploaded file's extension is unsupported.
    """
    if file:
        # Reject unsupported uploads up front; once streaming has started the
        # status code can no longer be changed.
        extension = _file_extension(file.filename)
        if extension not in DOC_EXTENSIONS and extension not in IMAGE_EXTENSIONS:
            raise HTTPException(status_code=400, detail="Unsupported file type")
        # Read the upload into memory.
        message = {
            "text": text,
            "file_content": BytesIO(await file.read()),
            "file_name": file.filename,
        }
    else:
        # No upload: text-only message.
        message = {
            "text": text,
            "file_content": None,
            "file_name": None,
        }

    print(message)
    # simple_chat yields cumulative snapshots; stream only the new text so the
    # response body is exactly the answer.
    return StreamingResponse(
        _iter_deltas(simple_chat(message)),
        media_type="text/plain; charset=utf-8",
    )


with gr.Blocks(css=CSS, theme="soft", fill_height=True) as demo:
    gr.HTML(TITLE)
    gr.HTML(DESCRIPTION)
    gr.ChatInterface(
        fn=stream_chat,
        multimodal=True,
        textbox=chat_input,
        chatbot=chatbot,
        fill_height=True,
        additional_inputs_accordion=gr.Accordion(label="⚙️ Parameters", open=False, render=False),
        additional_inputs=[
            gr.Slider(
                minimum=0,
                maximum=1,
                step=0.1,
                value=0.8,
                label="Temperature",
                render=False,
            ),
            gr.Slider(
                minimum=1024,
                maximum=8192,
                step=1,
                value=4096,
                label="Max Length",
                render=False,
            ),
            gr.Slider(
                minimum=0.0,
                maximum=1.0,
                step=0.1,
                value=1.0,
                label="top_p",
                render=False,
            ),
            gr.Slider(
                minimum=1,
                maximum=20,
                step=1,
                value=10,
                label="top_k",
                render=False,
            ),
            gr.Slider(
                minimum=0.0,
                maximum=2.0,
                step=0.1,
                value=1.0,
                label="Repetition penalty",
                render=False,
            ),
        ],
    )
    gr.Examples(EXAMPLES, [chat_input])


if __name__ == "__main__":
    # Configure the queue before mounting: uvicorn.run() blocks until the
    # server shuts down, so nothing placed after it would take effect.
    demo.queue(api_open=False)
    app = gr.mount_gradio_app(app, demo, "/")
    uvicorn.run(app, host="0.0.0.0", port=7860)