# Requirements
# scikit-learn umap-learn nltk joblib
from itertools import chain
from typing import List, Dict
from App_Function_Libraries.RAG.ChromaDB_Library import store_in_chroma, create_embedding, vector_search, chroma_client
from App_Function_Libraries.Chunk_Lib import improved_chunking_process, recursive_summarize_chunks
import logging
from sklearn.mixture import GaussianMixture
import umap
from nltk.corpus import wordnet
# Logging setup
logging.basicConfig(filename='raptor.log', level=logging.DEBUG)
# FIXME
MAX_LEVELS = 3
def log_and_summarize(text, prompt):
    logging.debug(f"Summarizing text: {text[:100]} with prompt: {prompt}")
    return dummy_summarize(text, prompt)
# 1. Data Preparation
def prepare_data(content: str, media_id: int, chunk_options: dict):
    chunks = improved_chunking_process(content, chunk_options)
    embeddings = [create_embedding(chunk['text']) for chunk in chunks]
    return chunks, embeddings
# 2. Recursive Summarization
def recursive_summarization(chunks, summarize_func, custom_prompt):
    summarized_chunks = recursive_summarize_chunks(
        [chunk['text'] for chunk in chunks],
        summarize_func=summarize_func,
        custom_prompt=custom_prompt
    )
    return summarized_chunks
# Initial gen
# 3. Tree Organization
#def build_tree_structure(chunks, embeddings, collection_name, level=0):
#    if len(chunks) <= 1:
#        return chunks  # Base case: if chunks are small enough, return as is
#    # Recursive case: cluster and summarize
#    summarized_chunks = recursive_summarization(chunks, summarize_func=dummy_summarize, custom_prompt="Summarize:")
#    new_chunks, new_embeddings = prepare_data(' '.join(summarized_chunks), media_id, chunk_options)
#    # Store in ChromaDB
#    ids = [f"{media_id}_L{level}_chunk_{i}" for i in range(len(new_chunks))]
#    store_in_chroma(collection_name, [chunk['text'] for chunk in new_chunks], new_embeddings, ids)
#    # Recursively build tree
#    return build_tree_structure(new_chunks, new_embeddings, collection_name, level+1)
# Second iteration
def build_tree_structure(chunks, collection_name, level=0):
    # Dynamic clustering
    clustered_texts = dynamic_clustering([chunk['text'] for chunk in chunks])
    # Summarize each cluster
    summarized_clusters = {}
    for cluster_id, cluster_texts in clustered_texts.items():
        summary = dummy_summarize(' '.join(cluster_texts), custom_prompt="Summarize:")
        summarized_clusters[cluster_id] = summary
    # Store summaries at the current level
    ids = []
    embeddings = []
    summaries = []
    for cluster_id, summary in summarized_clusters.items():
        ids.append(f"{collection_name}_L{level}_C{cluster_id}")
        embeddings.append(create_embedding(summary))
        summaries.append(summary)
    store_in_chroma(collection_name, summaries, embeddings, ids)
    # Recursively build the next level; re-wrap the raw texts as chunk dicts so
    # the recursive call receives the same shape as the initial call
    if level < MAX_LEVELS:
        for cluster_id, cluster_texts in clustered_texts.items():
            if len(cluster_texts) > 1:  # a singleton cluster cannot be clustered further
                build_tree_structure([{'text': t} for t in cluster_texts], collection_name, level + 1)
# Dummy summarize function (replace with actual summarization)
def dummy_summarize(text, custom_prompt, temp=None, system_prompt=None):
    return text  # Replace this with an actual call to a summarization model (like GPT-3.5-turbo)
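# A minimal sketch of what dummy_summarize could be swapped for, assuming the
# `openai` package (>= 1.0) and an OPENAI_API_KEY in the environment; the model
# name and prompts are illustrative, not something this pipeline prescribes.
def openai_summarize(text, custom_prompt, temp=None, system_prompt=None):
    from openai import OpenAI  # lazy import so the rest of the file runs without the package
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        temperature=temp if temp is not None else 0.7,
        messages=[
            {"role": "system", "content": system_prompt or "You are a concise summarizer."},
            {"role": "user", "content": f"{custom_prompt}\n\n{text}"},
        ],
    )
    return response.choices[0].message.content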
# 4. Retrieval
def raptor_retrieve(query, collection_name, level=0):
    # All levels live in one collection (the level is encoded in the stored ids),
    # so a flat vector search covers the whole tree; `level` is currently unused.
    results = vector_search(collection_name, query, k=5)
    return results
# Main function integrating RAPTOR
def raptor_pipeline(media_id, content, chunk_options):
    collection_name = f"media_{media_id}_raptor"
    # Step 1: Prepare Data
    chunks, embeddings = prepare_data(content, media_id, chunk_options)
    # Step 2: Build Tree (build_tree_structure computes its own embeddings)
    build_tree_structure(chunks, collection_name)
    # Step 3: Retrieve Information
    query = "Your query here"
    result = raptor_retrieve(query, collection_name)
    print(result)
# Example usage
content = "Your long document content here"
chunk_options = {
    'method': 'sentences',
    'max_size': 300,
    'overlap': 50
}
media_id = 1
raptor_pipeline(media_id, content, chunk_options)
#
#
###################################################################################################################
#
# Additions:
def dynamic_clustering(texts, n_components=2):
    # Too few texts to cluster meaningfully: return them as a single cluster
    if len(texts) < 3:
        return {0: list(texts)}
    # Step 1: Convert text to embeddings
    embeddings = [create_embedding(text) for text in texts]
    # Step 2: Dimensionality reduction (UMAP)
    reducer = umap.UMAP(n_components=n_components)
    reduced_embeddings = reducer.fit_transform(embeddings)
    # Step 3: Find the optimal number of clusters using BIC
    best_gmm = None
    best_bic = float('inf')
    for n in range(2, min(10, len(texts))):  # cannot have more clusters than samples
        gmm = GaussianMixture(n_components=n, covariance_type='full')
        gmm.fit(reduced_embeddings)
        bic = gmm.bic(reduced_embeddings)
        if bic < best_bic:
            best_bic = bic
            best_gmm = gmm
    # Step 4: Cluster the reduced embeddings
    cluster_labels = best_gmm.predict(reduced_embeddings)
    clustered_texts = {i: [] for i in range(best_gmm.n_components)}
    for label, text in zip(cluster_labels, texts):
        clustered_texts[label].append(text)
    return clustered_texts
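# Example: clustering a toy corpus. Left commented out because it needs the
# embedding backend behind create_embedding to be reachable; the texts are
# illustrative only.
# toy_clusters = dynamic_clustering([
#     "Cats are small domesticated mammals.",
#     "Dogs have been bred as companions for millennia.",
#     "Equity markets fell sharply on rate fears.",
#     "Bond yields climbed as stocks slid.",
# ])
# for cluster_id, cluster_texts in toy_clusters.items():
#     print(cluster_id, cluster_texts)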
def tree_traversal_retrieve(query, collection_name, max_depth=3):
    # Assumes per-level/per-node collections exist; the builders above store
    # everything in one collection, so adapt the naming scheme as needed
    logging.info(f"Starting tree traversal for query: {query}")
    results = []
    current_level = 0
    current_nodes = [collection_name + '_L0']
    while current_level <= max_depth and current_nodes:
        next_level_nodes = []
        for node_id in current_nodes:
            documents = vector_search(node_id, query, k=5)
            results.extend(documents)
            next_level_nodes.extend([doc['id'] for doc in documents])  # Assuming the doc structure includes an 'id' field
        current_nodes = next_level_nodes
        current_level += 1
    logging.info(f"Tree traversal completed with {len(results)} results")
    return results
def collapsed_tree_retrieve(query, collection_name):
    # Assumes each level was stored in its own collection named f"{collection_name}_L{level}"
    all_layers = [f"{collection_name}_L{level}" for level in range(MAX_LEVELS)]
    all_results = []
    for layer in all_layers:
        all_results.extend(vector_search(layer, query, k=5))
    # Sort and rank results by relevance
    sorted_results = sorted(all_results, key=lambda x: x['relevance'], reverse=True)  # Assuming 'relevance' is a key
    return sorted_results[:5]  # Return the top 5 results
# Test collapsed tree retrieval
query = "Your broad query here"
results = collapsed_tree_retrieve(query, collection_name=f"media_{media_id}_raptor")
print(results)
# Parallel processing
# pip install joblib
from joblib import Parallel, delayed
def parallel_process_chunks(chunks):
    return Parallel(n_jobs=-1)(delayed(create_embedding)(chunk['text']) for chunk in chunks)
# Third iteration: same tree build, but cluster summaries are embedded in parallel
def build_tree_structure(chunks, collection_name, level=0):
    clustered_texts = dynamic_clustering([chunk['text'] for chunk in chunks])
    summarized_clusters = {}
    for cluster_id, cluster_texts in clustered_texts.items():
        summary = dummy_summarize(' '.join(cluster_texts), custom_prompt="Summarize:")
        summarized_clusters[cluster_id] = summary
    # Parallel processing of embeddings
    embeddings = parallel_process_chunks([{'text': summary} for summary in summarized_clusters.values()])
    ids = [f"{collection_name}_L{level}_C{cluster_id}" for cluster_id in summarized_clusters.keys()]
    store_in_chroma(collection_name, list(summarized_clusters.values()), embeddings, ids)
    if len(summarized_clusters) > 1 and level < MAX_LEVELS:
        # Re-wrap the summaries as chunk dicts for the recursive call
        build_tree_structure([{'text': s} for s in summarized_clusters.values()], collection_name, level + 1)
# Asynchronous processing
import asyncio
async def async_create_embedding(text):
    # Wraps the synchronous create_embedding; swap in a truly async embedding client if one is available
    return create_embedding(text)

async def build_tree_structure_async(chunks, collection_name, level=0):
    clustered_texts = dynamic_clustering([chunk['text'] for chunk in chunks])
    summarized_clusters = {}
    for cluster_id, cluster_texts in clustered_texts.items():
        summary = dummy_summarize(' '.join(cluster_texts), custom_prompt="Summarize:")
        summarized_clusters[cluster_id] = summary
    embeddings = await asyncio.gather(*[async_create_embedding(summary) for summary in summarized_clusters.values()])
    ids = [f"{collection_name}_L{level}_C{cluster_id}" for cluster_id in summarized_clusters.keys()]
    store_in_chroma(collection_name, list(summarized_clusters.values()), embeddings, ids)
    if len(summarized_clusters) > 1 and level < MAX_LEVELS:
        await build_tree_structure_async([{'text': s} for s in summarized_clusters.values()], collection_name, level + 1)
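# Convenience wrapper so synchronous callers can drive the async builder; a
# sketch assuming no event loop is already running (asyncio.run raises inside
# a running loop, e.g. in a notebook).
def build_tree_structure_async_entrypoint(chunks, collection_name):
    asyncio.run(build_tree_structure_async(chunks, collection_name))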
# User feedback loop
def get_user_feedback(results):
    print("Please review the following results:")
    for i, result in enumerate(results):
        print(f"{i + 1}: {result['text'][:100]}...")
    feedback = input("Enter the numbers of the results that were relevant (comma-separated): ")
    relevant_indices = [int(i.strip()) - 1 for i in feedback.split(",") if i.strip()]
    return relevant_indices
def raptor_pipeline_with_feedback(media_id, content, chunk_options):
    # ... Existing pipeline steps ...
    query = "Your query here"
    initial_results = tree_traversal_retrieve(query, collection_name=f"media_{media_id}_raptor")
    relevant_indices = get_user_feedback(initial_results)
    if relevant_indices:
        relevant_results = [initial_results[i] for i in relevant_indices]
        refined_query = " ".join([res['text'] for res in relevant_results])
        try:
            final_results = tree_traversal_retrieve(refined_query, collection_name=f"media_{media_id}_raptor")
        except Exception as e:
            logging.error(f"Error during retrieval: {str(e)}")
            raise
        print("Refined Results:", final_results)
    else:
        print("No relevant results were found in the initial search.")
def identify_uncertain_results(results):
    threshold = 0.5  # Confidence threshold below which a result counts as uncertain
    uncertain_results = [res for res in results if res['confidence'] < threshold]
    return uncertain_results
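# identify_uncertain_results assumes each result carries a 'confidence' key,
# which vector_search may not provide. A minimal sketch for deriving one from
# a Chroma-style 'distance' field, assuming cosine distance in [0, 2] where
# smaller means more similar:
def attach_confidence(results):
    for res in results:
        distance = res.get('distance')
        if distance is not None:
            res['confidence'] = max(0.0, 1.0 - distance / 2.0)
        else:
            res.setdefault('confidence', 0.0)  # no signal: treat as fully uncertain
    return results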
def raptor_pipeline_with_active_learning(media_id, content, chunk_options):
    # ... Existing pipeline steps ...
    query = "Your query here"
    initial_results = tree_traversal_retrieve(query, collection_name=f"media_{media_id}_raptor")
    uncertain_results = identify_uncertain_results(initial_results)
    if uncertain_results:
        print("The following results are uncertain. Please provide feedback:")
        feedback_indices = get_user_feedback(uncertain_results)
        # Use feedback to adjust retrieval or refine the query
        refined_query = " ".join([uncertain_results[i]['text'] for i in feedback_indices])
        final_results = tree_traversal_retrieve(refined_query, collection_name=f"media_{media_id}_raptor")
        print("Refined Results:", final_results)
    else:
        print("No uncertain results were found.")
# Query Expansion
def expand_query_with_synonyms(query):
    # Requires the WordNet corpus: run nltk.download('wordnet') once beforehand
    words = query.split()
    expanded_query = []
    for word in words:
        synonyms = wordnet.synsets(word)
        lemmas = set(chain.from_iterable([syn.lemma_names() for syn in synonyms]))
        # Keep the original word when WordNet has no synsets for it
        expanded_query.append(" ".join(lemmas) if lemmas else word)
    return " ".join(expanded_query)
def contextual_query_expansion(query, context):
    # FIXME: Replace with actual contextual model
    expanded_terms = some_contextual_model.get_expansions(query, context)
    return query + " " + " ".join(expanded_terms)
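# One possible stand-in for the FIXME above: rank candidate terms from the
# context by embedding similarity to the query and append the closest ones.
# This reuses create_embedding rather than a dedicated contextual model, so
# treat it as an approximation of the intended behavior.
def embedding_based_query_expansion(query, context, top_n=5):
    import numpy as np
    query_emb = np.asarray(create_embedding(query))
    candidates = {w for w in context.split() if len(w) > 3}  # skip short stopword-like tokens
    scored = []
    for term in candidates:
        term_emb = np.asarray(create_embedding(term))
        denom = np.linalg.norm(query_emb) * np.linalg.norm(term_emb)
        scored.append((float(query_emb @ term_emb / denom) if denom else 0.0, term))
    top_terms = [term for _, term in sorted(scored, reverse=True)[:top_n]]
    return query + " " + " ".join(top_terms)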
def raptor_pipeline_with_query_expansion(media_id, content, chunk_options):
    # ... Existing pipeline steps ...
    query = "Your initial query"
    expanded_query = expand_query_with_synonyms(query)
    initial_results = tree_traversal_retrieve(expanded_query, collection_name=f"media_{media_id}_raptor")
    # ... Continue with feedback loop ...
def generate_summary_with_citations(query: str, collection_name: str):
    results = vector_search_with_citation(collection_name, query)
    # FIXME: swap dummy_summarize for a real summarizer
    summary = dummy_summarize(" ".join(res['text'] for res in results), custom_prompt="Summarize:")
    # Deduplicate sources
    sources = list(set(res['source'] for res in results))
    return f"{summary}\n\nCitations:\n" + "\n".join(sources)
def vector_search_with_citation(collection_name: str, query: str, k: int = 10) -> List[Dict[str, str]]:
    query_embedding = create_embedding(query)
    collection = chroma_client.get_collection(name=collection_name)
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=k
    )
    # Chroma returns one list per query embedding, so unwrap the first (only) query;
    # assumes each document was stored with a 'source' metadata field
    return [{'text': doc, 'source': meta['source']}
            for doc, meta in zip(results['documents'][0], results['metadatas'][0])]
def generate_summary_with_footnotes(query: str, collection_name: str):
    results = vector_search_with_citation(collection_name, query)
    summary_parts = []
    citations = []
    for i, res in enumerate(results):
        summary_parts.append(f"{res['text']} [{i + 1}]")
        citations.append(f"[{i + 1}] {res['source']}")
    return " ".join(summary_parts) + "\n\nFootnotes:\n" + "\n".join(citations)
def generate_summary_with_hyperlinks(query: str, collection_name: str):
    results = vector_search_with_citation(collection_name, query)
    summary_parts = []
    for res in results:
        summary_parts.append(f'<a href="{res["source"]}">{res["text"][:100]}...</a>')
    return " ".join(summary_parts)
#
# End of Additions
###################################################################################################################