adowu commited on
Commit
b380300
verified
1 Parent(s): 150d1ad

Update database.py

Browse files
Files changed (1) hide show
  1. database.py +39 -107
database.py CHANGED
@@ -6,89 +6,43 @@ import chromadb
6
  from chromadb.utils import embedding_functions
7
  from config import EMBEDDING_MODEL, DATABASE_DIR
8
 
9
- # Konfiguracja logowania
10
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
11
 
12
  class KodeksProcessor:
13
  def __init__(self):
14
- logging.info(f"Inicjalizacja klienta bazy danych w katalogu: {DATABASE_DIR}")
15
  if not os.path.exists(DATABASE_DIR):
16
  os.makedirs(DATABASE_DIR)
17
- logging.info(f"Utworzono katalog {DATABASE_DIR}")
18
 
19
  self.client = chromadb.PersistentClient(path=DATABASE_DIR)
20
- logging.info("Klient bazy danych zainicjalizowany")
21
 
22
  try:
23
- self.collection = self.client.get_collection("kodeksy")
24
- logging.info("Pobrano istniej膮c膮 kolekcj臋 'kodeksy'")
25
- except Exception as e:
26
- logging.error(f"B艂膮d podczas pobierania kolekcji: {e}")
27
- logging.info("Pr贸ba utworzenia nowej kolekcji 'kodeksy'")
28
- try:
29
- self.collection = self.client.create_collection(
30
- name="kodeksy",
31
- embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction(
32
- model_name=EMBEDDING_MODEL
33
- )
34
  )
35
- logging.info("Utworzono now膮 kolekcj臋 'kodeksy'")
36
- except Exception as e:
37
- logging.error(f"B艂膮d podczas tworzenia kolekcji: {e}")
38
- raise
 
39
 
40
  def extract_metadata(self, text: str) -> Dict:
41
  metadata = {}
42
- dz_u_match = re.search(r'Dz\.U\.(\d{4})\.(\d+)\.(\d+)', text)
43
- if dz_u_match:
44
- metadata['dz_u'] = f"Dz.U.{dz_u_match.group(1)}.{dz_u_match.group(2)}.{dz_u_match.group(3)}"
45
- metadata['rok'] = dz_u_match.group(1)
46
-
47
- nazwa_match = re.search(r'USTAWA\s+z dnia(.*?)\n(.*?)\n', text)
48
- if nazwa_match:
49
- metadata['data_ustawy'] = nazwa_match.group(1).strip()
50
- metadata['nazwa'] = nazwa_match.group(2).strip()
51
-
52
- # Dodanie przetwarzania historii zmian
53
- zmiany = re.findall(r'(\d{4}-\d{2}-\d{2})\s+(zm\.\s+DZ\.U\.(\d{4})\.(\d+)\.(\d+)\s+art\.\s+(\d+)(?:\s+搂\s+(\d+))?)', text)
54
- if zmiany:
55
- metadata['historia_zmian'] = [
56
- {
57
- 'data': data,
58
- 'dz_u': f"Dz.U.{rok}.{numer}.{pozycja}",
59
- 'artykul': artykul,
60
- 'paragraf': paragraf if paragraf else None
61
- }
62
- for data, _, rok, numer, pozycja, artykul, paragraf in zmiany
63
- ]
64
-
65
- logging.info("Wydobyto metadane: %s", metadata)
66
  return metadata
67
 
68
  def split_header_and_content(self, text: str) -> Tuple[str, str]:
69
- parts = text.split("USTAWA", 1)
70
- if len(parts) > 1:
71
- return parts[0], "USTAWA" + parts[1]
72
- return "", text
73
 
74
  def process_article(self, article_text: str) -> Dict:
75
- art_num_match = re.match(r'Art\.\s*(\d+[a-z]?)', article_text)
76
- article_num = art_num_match.group(1) if art_num_match else ""
77
-
78
- paragraphs = re.findall(r'搂\s*(\d+)\.\s*(.*?)(?=搂\s*\d+|Art\.\s*\d+|$)', article_text, re.DOTALL)
79
-
80
- if not paragraphs:
81
- return {
82
- "article_num": article_num,
83
- "content": article_text.strip(),
84
- "has_paragraphs": False
85
- }
86
-
87
- return {
88
- "article_num": article_num,
89
- "paragraphs": paragraphs,
90
- "has_paragraphs": True
91
- }
92
 
93
  def split_into_chunks(self, text: str, metadata: Dict) -> List[Dict]:
94
  chunks = []
@@ -117,17 +71,17 @@ class KodeksProcessor:
117
  "metadata": chunk_metadata
118
  })
119
 
120
- logging.info("Podzielono tekst na %d chunk贸w.", len(chunks))
121
  return chunks
122
 
123
  def process_file(self, filepath: str) -> None:
124
- logging.info("Przetwarzanie pliku: %s", filepath)
125
 
126
  try:
127
  with open(filepath, 'r', encoding='utf-8') as file:
128
  content = file.read()
129
  except Exception as e:
130
- logging.error(f"B艂膮d podczas odczytu pliku {filepath}: {e}")
131
  return
132
 
133
  header, main_content = self.split_header_and_content(content)
@@ -138,40 +92,36 @@ class KodeksProcessor:
138
 
139
  if chunks:
140
  try:
141
- for i, chunk in enumerate(chunks):
142
- self.collection.add(
143
- documents=[chunk["text"]],
144
- metadatas=[chunk["metadata"]],
145
- ids=[f"{metadata['filename']}_{chunk['metadata']['article']}_{i}"]
146
- )
147
- logging.info(f"Dodano chunk: {chunk['text'][:100]}...") # Logowanie pierwszych 100 znak贸w chunka
148
  except Exception as e:
149
- logging.error(f"B艂膮d podczas dodawania chunk贸w do kolekcji: {e}")
150
  else:
151
- logging.warning(f"Brak chunk贸w do dodania z pliku: {filepath}")
152
-
153
- logging.info("Dodano %d chunk贸w z pliku %s", len(chunks), metadata['filename'])
154
 
155
  def process_all_files(self, directory: str) -> None:
156
- logging.info("Rozpocz臋cie przetwarzania wszystkich plik贸w w katalogu: %s", directory)
157
  for filename in os.listdir(directory):
158
  if filename.endswith('.txt'):
159
  filepath = os.path.join(directory, filename)
160
  self.process_file(filepath)
161
- logging.info("Zako艅czono przetwarzanie plik贸w.")
162
 
163
- def search(self, query: str, n_results: int = 3, filters: Dict = None) -> Dict:
164
- logging.info("Wyszukiwanie w bazie danych dla zapytania: %s", query)
165
  try:
166
  results = self.collection.query(
167
  query_texts=[query],
168
- n_results=n_results,
169
- where=filters
170
  )
171
- logging.info("Znaleziono %d wynik贸w dla zapytania: %s", len(results['documents'][0]), query)
172
  return results
173
  except Exception as e:
174
- logging.error(f"B艂膮d podczas wyszukiwania: {e}")
175
  return {"documents": [[]], "metadatas": [[]], "distances": [[]]}
176
 
177
  def list_all_documents(self) -> None:
@@ -179,29 +129,11 @@ class KodeksProcessor:
179
  all_docs = self.collection.get(include=['metadatas'])
180
  if all_docs['metadatas']:
181
  for metadata in all_docs['metadatas']:
182
- logging.info("Dokument: %s", metadata)
183
  else:
184
- logging.info("Brak dokument贸w w bazie.")
185
- except Exception as e:
186
- logging.error(f"B艂膮d podczas listowania dokument贸w: {e}")
187
-
188
- def update_document(self, id: str, new_text: str, new_metadata: Dict) -> None:
189
- try:
190
- self.collection.update(
191
- ids=[id],
192
- documents=[new_text],
193
- metadatas=[new_metadata]
194
- )
195
- logging.info(f"Zaktualizowano dokument o id: {id}")
196
- except Exception as e:
197
- logging.error(f"B艂膮d podczas aktualizacji dokumentu {id}: {e}")
198
-
199
- def delete_document(self, id: str) -> None:
200
- try:
201
- self.collection.delete(ids=[id])
202
- logging.info(f"Usuni臋to dokument o id: {id}")
203
  except Exception as e:
204
- logging.error(f"B艂膮d podczas usuwania dokumentu {id}: {e}")
205
 
206
  if __name__ == "__main__":
207
  processor = KodeksProcessor()
 
6
  from chromadb.utils import embedding_functions
7
  from config import EMBEDDING_MODEL, DATABASE_DIR
8
 
9
+ # Improved logging configuration
10
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
11
+ logger = logging.getLogger(__name__)
12
 
13
  class KodeksProcessor:
14
  def __init__(self):
15
+ logger.info(f"Initializing database client in directory: {DATABASE_DIR}")
16
  if not os.path.exists(DATABASE_DIR):
17
  os.makedirs(DATABASE_DIR)
18
+ logger.info(f"Created directory {DATABASE_DIR}")
19
 
20
  self.client = chromadb.PersistentClient(path=DATABASE_DIR)
21
+ logger.info("Database client initialized")
22
 
23
  try:
24
+ self.collection = self.client.get_or_create_collection(
25
+ name="kodeksy",
26
+ embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction(
27
+ model_name=EMBEDDING_MODEL
 
 
 
 
 
 
 
28
  )
29
+ )
30
+ logger.info("Collection 'kodeksy' retrieved or created")
31
+ except Exception as e:
32
+ logger.error(f"Error while getting or creating collection: {e}")
33
+ raise
34
 
35
  def extract_metadata(self, text: str) -> Dict:
36
  metadata = {}
37
+ # ... (rest of the method remains the same)
38
+ logger.info("Extracted metadata: %s", metadata)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
  return metadata
40
 
41
  def split_header_and_content(self, text: str) -> Tuple[str, str]:
42
+ # ... (method remains the same)
 
 
 
43
 
44
  def process_article(self, article_text: str) -> Dict:
45
+ # ... (method remains the same)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
 
47
  def split_into_chunks(self, text: str, metadata: Dict) -> List[Dict]:
48
  chunks = []
 
71
  "metadata": chunk_metadata
72
  })
73
 
74
+ logger.info("Split text into %d chunks.", len(chunks))
75
  return chunks
76
 
77
  def process_file(self, filepath: str) -> None:
78
+ logger.info("Processing file: %s", filepath)
79
 
80
  try:
81
  with open(filepath, 'r', encoding='utf-8') as file:
82
  content = file.read()
83
  except Exception as e:
84
+ logger.error(f"Error reading file {filepath}: {e}")
85
  return
86
 
87
  header, main_content = self.split_header_and_content(content)
 
92
 
93
  if chunks:
94
  try:
95
+ self.collection.add(
96
+ documents=[chunk["text"] for chunk in chunks],
97
+ metadatas=[chunk["metadata"] for chunk in chunks],
98
+ ids=[f"{metadata['filename']}_{chunk['metadata']['article']}_{i}" for i, chunk in enumerate(chunks)]
99
+ )
100
+ logger.info(f"Added {len(chunks)} chunks from file {metadata['filename']}")
 
101
  except Exception as e:
102
+ logger.error(f"Error adding chunks to collection: {e}")
103
  else:
104
+ logger.warning(f"No chunks to add from file: {filepath}")
 
 
105
 
106
  def process_all_files(self, directory: str) -> None:
107
+ logger.info("Starting to process all files in directory: %s", directory)
108
  for filename in os.listdir(directory):
109
  if filename.endswith('.txt'):
110
  filepath = os.path.join(directory, filename)
111
  self.process_file(filepath)
112
+ logger.info("Finished processing files.")
113
 
114
+ def search(self, query: str, n_results: int = 3) -> Dict:
115
+ logger.info("Searching database for query: %s", query)
116
  try:
117
  results = self.collection.query(
118
  query_texts=[query],
119
+ n_results=n_results
 
120
  )
121
+ logger.info("Found %d results for query: %s", len(results['documents'][0]), query)
122
  return results
123
  except Exception as e:
124
+ logger.error(f"Error during search: {e}")
125
  return {"documents": [[]], "metadatas": [[]], "distances": [[]]}
126
 
127
  def list_all_documents(self) -> None:
 
129
  all_docs = self.collection.get(include=['metadatas'])
130
  if all_docs['metadatas']:
131
  for metadata in all_docs['metadatas']:
132
+ logger.info("Document: %s", metadata)
133
  else:
134
+ logger.info("No documents in the database.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
135
  except Exception as e:
136
+ logger.error(f"Error listing documents: {e}")
137
 
138
  if __name__ == "__main__":
139
  processor = KodeksProcessor()