Daniel Foley commited on
Commit
5c86e88
·
1 Parent(s): 0716283

back to n workers set to 1. rate limiting

Browse files
Files changed (1) hide show
  1. RAG.py +6 -25
RAG.py CHANGED
@@ -14,8 +14,6 @@ import requests
14
  from typing import Dict, Any, Optional, List, Tuple
15
  import logging
16
  import concurrent.futures
17
- import json
18
- from threading import Lock
19
 
20
  def retrieve(query: str,vectorstore:PineconeVectorStore, k: int = 100) -> Tuple[List[Document], List[float]]:
21
  start = time.time()
@@ -86,20 +84,8 @@ def process_single_document(doc: Document) -> Optional[Document]:
86
  )
87
  return None
88
 
89
- # Global state to track alternating behavior
90
- _use_two_workers = False
91
- _worker_lock = Lock()
92
-
93
- def get_current_worker_count() -> int:
94
- """Thread-safe way to get and toggle the worker count between 1 and 2."""
95
- global _use_two_workers
96
- with _worker_lock:
97
- current_workers = 2 if _use_two_workers else 1
98
- _use_two_workers = not _use_two_workers # Toggle for next time
99
- return current_workers
100
-
101
- def rerank(documents: List[Document], query: str) -> List[Document]:
102
- """Ingest more metadata and rerank documents using BM25 with alternating worker counts."""
103
  start = time.time()
104
  if not documents:
105
  return []
@@ -107,12 +93,8 @@ def rerank(documents: List[Document], query: str) -> List[Document]:
107
  meta_start = time.time()
108
  full_docs = []
109
 
110
- # Get the worker count for this specific call
111
- worker_count = get_current_worker_count()
112
- logging.info(f"Processing with {worker_count} worker{'s' if worker_count > 1 else ''}")
113
-
114
  # Process documents in parallel using ThreadPoolExecutor
115
- with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
116
  # Submit all document processing tasks
117
  future_to_doc = {
118
  executor.submit(process_single_document, doc): doc
@@ -122,7 +104,7 @@ def rerank(documents: List[Document], query: str) -> List[Document]:
122
  # Collect results as they complete
123
  for future in concurrent.futures.as_completed(future_to_doc):
124
  processed_doc = future.result()
125
- if processed_doc:
126
  full_docs.append(processed_doc)
127
 
128
  logging.info(f"Took {time.time()-meta_start} seconds to retrieve all metadata")
@@ -135,8 +117,7 @@ def rerank(documents: List[Document], query: str) -> List[Document]:
135
  reranker = BM25Retriever.from_documents(full_docs, k=min(10, len(full_docs)))
136
  reranked_docs = reranker.invoke(query)
137
  logging.info(f"Finished reranking: {time.time()-start}")
138
- return reranked_docs
139
-
140
 
141
  def parse_xml_and_query(query:str,xml_string:str) -> str:
142
  """parse xml and return rephrased query"""
@@ -222,7 +203,7 @@ def RAG(llm: Any, query: str,vectorstore:PineconeVectorStore, top: int = 10, k:
222
  First, reason about the answer between <REASONING></REASONING> headers,
223
  based on the context determine if there is sufficient material for answering the exact question,
224
  return either <VALID>YES</VALID> or <VALID>NO</VALID>
225
- then return a response between <RESPONSE></RESPONSE> headers:
226
  Here is an example
227
  <EXAMPLE>
228
  <QUERY>Are pineapples a good fuel for cars?</QUERY>
 
14
  from typing import Dict, Any, Optional, List, Tuple
15
  import logging
16
  import concurrent.futures
 
 
17
 
18
  def retrieve(query: str,vectorstore:PineconeVectorStore, k: int = 100) -> Tuple[List[Document], List[float]]:
19
  start = time.time()
 
84
  )
85
  return None
86
 
87
+ def rerank(documents: List[Document], query: str, max_workers: int = 1) -> List[Document]:
88
+ """Ingest more metadata and rerank documents using BM25 with parallel processing."""
 
 
 
 
 
 
 
 
 
 
 
 
89
  start = time.time()
90
  if not documents:
91
  return []
 
93
  meta_start = time.time()
94
  full_docs = []
95
 
 
 
 
 
96
  # Process documents in parallel using ThreadPoolExecutor
97
+ with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
98
  # Submit all document processing tasks
99
  future_to_doc = {
100
  executor.submit(process_single_document, doc): doc
 
104
  # Collect results as they complete
105
  for future in concurrent.futures.as_completed(future_to_doc):
106
  processed_doc = future.result()
107
+ if processed_doc:extract_text_from_json():
108
  full_docs.append(processed_doc)
109
 
110
  logging.info(f"Took {time.time()-meta_start} seconds to retrieve all metadata")
 
117
  reranker = BM25Retriever.from_documents(full_docs, k=min(10, len(full_docs)))
118
  reranked_docs = reranker.invoke(query)
119
  logging.info(f"Finished reranking: {time.time()-start}")
120
+ return full_docs
 
121
 
122
  def parse_xml_and_query(query:str,xml_string:str) -> str:
123
  """parse xml and return rephrased query"""
 
203
  First, reason about the answer between <REASONING></REASONING> headers,
204
  based on the context determine if there is sufficient material for answering the exact question,
205
  return either <VALID>YES</VALID> or <VALID>NO</VALID>
206
+ then return a response between <RESPONSE></RESPONSE> headers, your response should be well formatted and an individual summary of each piece of relevant context:
207
  Here is an example
208
  <EXAMPLE>
209
  <QUERY>Are pineapples a good fuel for cars?</QUERY>