import shutil import os import gradio as gr import torch from uuid import uuid4 from huggingface_hub.file_download import http_get from langchain.document_loaders import ( CSVLoader, EverNoteLoader, PDFMinerLoader, TextLoader, UnstructuredEmailLoader, UnstructuredEPubLoader, UnstructuredHTMLLoader, UnstructuredMarkdownLoader, UnstructuredODTLoader, UnstructuredPowerPointLoader, UnstructuredWordDocumentLoader, ) from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.docstore.document import Document from sentence_transformers import SentenceTransformer from sentence_transformers.util import cos_sim from llama_cpp import Llama SYSTEM_PROMPT = "Welcome to the CNCF Bot" LOADER_MAPPING = { ".csv": (CSVLoader, {}), ".doc": (UnstructuredWordDocumentLoader, {}), ".docx": (UnstructuredWordDocumentLoader, {}), ".enex": (EverNoteLoader, {}), ".epub": (UnstructuredEPubLoader, {}), ".html": (UnstructuredHTMLLoader, {}), ".md": (UnstructuredMarkdownLoader, {}), ".odt": (UnstructuredODTLoader, {}), ".pdf": (PDFMinerLoader, {}), ".ppt": (UnstructuredPowerPointLoader, {}), ".pptx": (UnstructuredPowerPointLoader, {}), ".txt": (TextLoader, {"encoding": "utf8"}), } def load_model( directory: str = ".", model_name: str = "ggml-model-Q4_K_M.gguf", model_url: str = "https://huggingface.co/julioc-p/CNCF/blob/main/ggml-model-Q4_K_M.gguf" ): final_model_path = os.path.join(directory, model_name) print("Downloading all files...") if not os.path.exists(final_model_path): with open(final_model_path, "wb") as f: http_get(model_url, f) os.chmod(final_model_path, 0o777) print("Files downloaded!") model = Llama( model_path=final_model_path, n_ctx=2000, n_parts=1, ) print("Model loaded!") return model EMBEDDER = SentenceTransformer("sentence-transformers/paraphrase-multilingual-mpnet-base-v2") MODEL = load_model() def get_uuid(): return str(uuid4()) def load_single_document(file_path: str) -> Document: ext = "." + file_path.rsplit(".", 1)[-1] assert ext in LOADER_MAPPING loader_class, loader_args = LOADER_MAPPING[ext] loader = loader_class(file_path, **loader_args) return loader.load()[0] def get_message_tokens(model, role, content): content = f"{role}\n{content}\n" content = content.encode("utf-8") return model.tokenize(content, special=True) def get_system_tokens(model): system_message = {"role": "system", "content": SYSTEM_PROMPT} return get_message_tokens(model, **system_message) def process_text(text): lines = text.split("\n") lines = [line for line in lines if len(line.strip()) > 2] text = "\n".join(lines).strip() if len(text) < 10: return None return text def upload_files(files, file_paths): file_paths = [f.name for f in files] return file_paths def build_index(file_paths, db, chunk_size, chunk_overlap, file_warning): documents = [load_single_document(path) for path in file_paths] text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) documents = text_splitter.split_documents(documents) print("Documents after split:", len(documents)) fixed_documents = [] for doc in documents: doc.page_content = process_text(doc.page_content) if not doc.page_content: continue fixed_documents.append(doc) print("Documents after processing:", len(fixed_documents)) texts = [doc.page_content for doc in fixed_documents] embeddings = EMBEDDER.encode(texts, convert_to_tensor=True) db = {"docs": texts, "embeddings": embeddings} print("Embeddings calculated!") file_warning = f"Загружено {len(fixed_documents)} фрагментов! Можно задавать вопросы." return db, file_warning def retrieve(history, db, retrieved_docs, k_documents): retrieved_docs = "" if db: last_user_message = history[-1][0] query_embedding = EMBEDDER.encode(last_user_message, convert_to_tensor=True) scores = cos_sim(query_embedding, db["embeddings"])[0] top_k_idx = torch.topk(scores, k=k_documents)[1] top_k_documents = [db["docs"][idx] for idx in top_k_idx] retrieved_docs = "\n\n".join(top_k_documents) return retrieved_docs def user(message, history, system_prompt): new_history = history + [[message, None]] return "", new_history def bot( history, system_prompt, conversation_id, retrieved_docs, top_p, top_k, temp ): model = MODEL if not history: return tokens = get_system_tokens(model)[:] for user_message, bot_message in history[:-1]: message_tokens = get_message_tokens(model=model, role="user", content=user_message) tokens.extend(message_tokens) if bot_message: message_tokens = get_message_tokens(model=model, role="bot", content=bot_message) tokens.extend(message_tokens) last_user_message = history[-1][0] if retrieved_docs: last_user_message = f"Контекст: {retrieved_docs}\n\nИспользуя контекст, ответь на вопрос: {last_user_message}" message_tokens = get_message_tokens(model=model, role="user", content=last_user_message) tokens.extend(message_tokens) role_tokens = model.tokenize("bot\n".encode("utf-8"), special=True) tokens.extend(role_tokens) generator = model.generate( tokens, top_k=top_k, top_p=top_p, temp=temp ) partial_text = "" for i, token in enumerate(generator): if token == model.token_eos(): break partial_text += model.detokenize([token]).decode("utf-8", "ignore") history[-1][1] = partial_text yield history with gr.Blocks( theme=gr.themes.Soft() ) as demo: db = gr.State(None) conversation_id = gr.State(get_uuid) favicon = '' gr.Markdown( f"""