File size: 4,210 Bytes
8d0f9c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
863ffbb
 
 
8d0f9c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bb04cbe
 
8d0f9c9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import gradio as gr
import spaces
import subprocess
import os
import shutil
import string
import random
import glob
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer

model_name = os.environ.get("MODEL", "Snowflake/snowflake-arctic-embed-m")
chunk_size = int(os.environ.get("CHUNK_SIZE", 128))
default_max_characters = int(os.environ.get("DEFAULT_MAX_CHARACTERS", 258))

model = SentenceTransformer(model_name)
# model.to(device="cuda")

@spaces.GPU
def embed(queries, chunks) -> dict[str, list[tuple[str, float]]]:
    query_embeddings = model.encode(queries, prompt_name="query")
    document_embeddings = model.encode(chunks)

    scores = query_embeddings @ document_embeddings.T
    results = {}
    for query, query_scores in zip(queries, scores):
        chunk_idxs = [i for i in range(len(chunks))]
        # Get a structure like {query: [(chunk_idx, score), (chunk_idx, score), ...]}
        results[query] = list(zip(chunk_idxs, query_scores))

    return results


def extract_text_from_pdf(reader):
    full_text = ""
    for idx, page in enumerate(reader.pages):
        text = page.extract_text()
        if len(text) > 0:
            full_text += f"---- Page {idx} ----\n" + page.extract_text() + "\n\n"

    return full_text.strip()

def convert(filename) -> str:
    plain_text_filetypes = [
        ".txt",
        ".csv",
        ".tsv",
        ".md",
        ".yaml",
        ".toml",
        ".json",
        ".json5",
        ".jsonc",
    ]
    # Already a plain text file that wouldn't benefit from pandoc so return the content
    if any(filename.endswith(ft) for ft in plain_text_filetypes):
        with open(filename, "r") as f:
            return f.read()

    if filename.endswith(".pdf"):
        return extract_text_from_pdf(PdfReader(filename))

    raise ValueError(f"Unsupported file type: {filename}")


def chunk_to_length(text, max_length=512):
    chunks = []
    while len(text) > max_length:
        chunks.append(text[:max_length])
        text = text[max_length:]
    chunks.append(text)
    return chunks

@spaces.GPU
def predict(query, max_characters) -> str:
    # Embed the query
    query_embedding = model.encode(query, prompt_name="query")

    # Initialize a list to store all chunks and their similarities across all documents
    all_chunks = []

    # Iterate through all documents
    for filename, doc in docs.items():
        # Calculate dot product between query and document embeddings
        similarities = doc["embeddings"] @ query_embedding.T
        
        # Add chunks and similarities to the all_chunks list
        all_chunks.extend([(filename, chunk, sim) for chunk, sim in zip(doc["chunks"], similarities)])

    # Sort all chunks by similarity
    all_chunks.sort(key=lambda x: x[2], reverse=True)

    # Initialize a dictionary to store relevant chunks for each document
    relevant_chunks = {}

    # Add most relevant chunks until max_characters is reached
    total_chars = 0
    for filename, chunk, _ in all_chunks:
        if total_chars + len(chunk) <= max_characters:
            if filename not in relevant_chunks:
                relevant_chunks[filename] = []
            relevant_chunks[filename].append(chunk)
            total_chars += len(chunk)
        else:
            break

    return relevant_chunks



docs = {}

for filename in glob.glob("sources/*"):
    if filename.endswith("add_your_files_here"):
        continue

    converted_doc = convert(filename)

    chunks = chunk_to_length(converted_doc, chunk_size)
    embeddings = model.encode(chunks)

    docs[filename] = {
        "chunks": chunks,
        "embeddings": embeddings,
    }


gr.Interface(
    predict,
    inputs=[
        gr.Textbox(label="Query asked about the documents"),
        gr.Number(label="Max output characters", value=default_max_characters),
    ],
    outputs=[gr.JSON(label="Relevant chunks")],
    title="RAG Community Tool Template demo",
    description="This is a demo of the RAG Community Tool Template. To use RAG in HuggingChat with your own documents, start by cloning this space, add your documents to the `sources` folder, and then create a community tool with this space!",
).launch()