import abc import json import os from typing import Any, Generic, List, Optional, Tuple, TypeVar import numpy as np from qdrant_client import QdrantClient from qdrant_client.models import Distance, PointStruct, VectorParams T = TypeVar("T", bound=dict[str, Any]) E = TypeVar("E") class VectorStore(Generic[T, E], abc.ABC): """Abstract base class for a vector store. A vector store is a key-value store that maps an ID to a vector embedding and a payload. The payload can be any JSON-serializable object, e.g. a dictionary. """ INDEX_FILE = "vectors_index.json" EMBEDDINGS_FILE = "vectors_data.npy" PAYLOADS_FILE = "vectors_payloads.json" @abc.abstractmethod def _add(self, embedding: E, payload: T, emb_id: str) -> None: """Save an embedding with payload for a given ID.""" pass @abc.abstractmethod def _get(self, emb_id: str) -> Optional[E]: """Get the embedding for a given ID.""" pass @abc.abstractmethod def clear(self) -> None: """Clear the store.""" pass def _get_emb_id(self, emb_id: Optional[str] = None, payload: Optional[T] = None) -> str: if emb_id is None: if payload is None: raise ValueError("Either emb_id or payload must be provided.") emb_id = json.dumps(payload, sort_keys=True) return emb_id def add(self, embedding: E, payload: T, emb_id: Optional[str] = None) -> None: if emb_id is None: emb_id = json.dumps(payload, sort_keys=True) self._add(embedding=embedding, payload=payload, emb_id=emb_id) def get(self, emb_id: Optional[str] = None, payload: Optional[T] = None) -> Optional[E]: return self._get(emb_id=self._get_emb_id(emb_id=emb_id, payload=payload)) def has(self, emb_id: Optional[str] = None, payload: Optional[T] = None) -> bool: return self.get(emb_id=emb_id, payload=payload) is not None @abc.abstractmethod def _retrieve_similar( self, ref_id: str, top_k: Optional[int] = None, min_similarity: Optional[float] = None ) -> List[Tuple[T, float]]: """Retrieve IDs, payloads and the respective similarity scores with respect to the reference entry. In the case that the reference entry is not in the store itself, an empty list will be returned. Args: ref_id: The ID of the reference entry. top_k: If provided, only the top-k most similar entries will be returned. min_similarity: If provided, only entries with a similarity score greater or equal to this value will be returned. Returns: A list of tuples consisting of the ID and the similarity score, sorted by similarity score in descending order. """ pass def retrieve_similar( self, ref_id: Optional[str] = None, ref_payload: Optional[T] = None, **kwargs ) -> List[Tuple[T, float]]: if not self.has(emb_id=ref_id, payload=ref_payload): return [] return self._retrieve_similar( ref_id=self._get_emb_id(emb_id=ref_id, payload=ref_payload), **kwargs ) @abc.abstractmethod def __len__(self): pass def _add_from_directory(self, directory: str) -> None: with open(os.path.join(directory, self.INDEX_FILE), "r") as f: index = json.load(f) embeddings_np = np.load(os.path.join(directory, self.EMBEDDINGS_FILE)) with open(os.path.join(directory, self.PAYLOADS_FILE), "r") as f: payloads = json.load(f) for emb_id, emb, payload in zip(index, embeddings_np, payloads): self._add(emb_id=emb_id, payload=payload, embedding=emb.tolist()) @abc.abstractmethod def as_indices_vectors_payloads(self) -> Tuple[List[str], np.ndarray, List[T]]: """Return a tuple of indices, vectors and payloads.""" pass def _save_to_directory(self, directory: str) -> None: indices, vectors, payloads = self.as_indices_vectors_payloads() np.save(os.path.join(directory, self.EMBEDDINGS_FILE), vectors) with open(os.path.join(directory, self.PAYLOADS_FILE), "w") as f: json.dump(payloads, f) with open(os.path.join(directory, self.INDEX_FILE), "w") as f: json.dump(indices, f) def save_to_directory(self, directory: str) -> None: """Save the vector store to a directory.""" os.makedirs(directory, exist_ok=True) self._save_to_directory(directory) def load_from_directory(self, directory: str, replace: bool = False) -> None: """Load the vector store from a directory. If `replace` is True, the current content of the store will be replaced. """ if replace: self.clear() self._add_from_directory(directory) def vector_norm(vector: List[float]) -> float: return sum(x**2 for x in vector) ** 0.5 def cosine_similarity(a: List[float], b: List[float]) -> float: return sum(a * b for a, b in zip(a, b)) / (vector_norm(a) * vector_norm(b)) class SimpleVectorStore(VectorStore[T, List[float]]): """Simple in-memory vector store using a dictionary.""" def __init__(self): self.vectors: dict[str, List[float]] = {} self.payloads: dict[str, T] = {} self._cache = {} self._sim = cosine_similarity def _add(self, embedding: E, payload: T, emb_id: str) -> None: self.vectors[emb_id] = embedding self.payloads[emb_id] = payload def _get(self, emb_id: str) -> Optional[E]: return self.vectors.get(emb_id) def delete(self, emb_id: str) -> None: if emb_id in self.vectors: del self.vectors[emb_id] del self.payloads[emb_id] # remove from cache self._cache = {k: v for k, v in self._cache.items() if emb_id not in k} def clear(self) -> None: self.vectors.clear() self._cache.clear() self.payloads.clear() def __len__(self): return len(self.vectors) def _retrieve_similar( self, ref_id: str, top_k: Optional[int] = None, min_similarity: Optional[float] = None ) -> List[Tuple[str, T, float]]: ref_embedding = self.get(emb_id=ref_id) if ref_embedding is None: raise ValueError(f"Reference embedding '{ref_id}' not found.") # calculate similarity to all embeddings similarities = {} for emb_id, embedding in self.vectors.items(): if (emb_id, ref_id) not in self._cache: # use cosine similarity self._cache[(emb_id, ref_id)] = self._sim(ref_embedding, embedding) similarities[emb_id] = self._cache[(emb_id, ref_id)] # sort by similarity similar_entries = sorted(similarities.items(), key=lambda x: x[1], reverse=True) if min_similarity is not None: similar_entries = [ (emb_id, sim) for emb_id, sim in similar_entries if sim >= min_similarity ] if top_k is not None: similar_entries = similar_entries[:top_k] return [(emb_id, self.payloads[emb_id], sim) for emb_id, sim in similar_entries] def _save_to_directory(self, directory: str) -> None: indices = list(self.vectors.keys()) with open(os.path.join(directory, self.INDEX_FILE), "w") as f: json.dump(indices, f) embeddings_np = np.array(list(self.vectors.values())) np.save(os.path.join(directory, self.EMBEDDINGS_FILE), embeddings_np) payloads = [self.payloads[idx] for idx in indices] with open(os.path.join(directory, self.PAYLOADS_FILE), "w") as f: json.dump(payloads, f) def as_indices_vectors_payloads(self) -> Tuple[List[str], np.ndarray, List[T]]: indices = list(self.vectors.keys()) embeddings_np = np.array(list(self.vectors.values())) payloads = [self.payloads[idx] for idx in indices] return indices, embeddings_np, payloads class QdrantVectorStore(VectorStore[T, List[float]]): """Vector store using Qdrant as a backend.""" COLLECTION_NAME = "ADUs" MAX_LIMIT = 100 def __init__( self, location: str = ":memory:", vector_size: int = 768, distance: Distance = Distance.COSINE, ): self.client = QdrantClient(location=location) self.emb_id2point_id = {} self.point_id2emb_id = {} self.client.create_collection( collection_name=self.COLLECTION_NAME, vectors_config=VectorParams(size=vector_size, distance=distance), ) def __len__(self): return self.client.get_collection(collection_name=self.COLLECTION_NAME).points_count def _add(self, emb_id: str, payload: T, embedding: List[float]) -> None: if emb_id in self.emb_id2point_id: # update existing entry point_id = self.emb_id2point_id[emb_id] else: # we use the length of the emb_id2point_id dict as the index, # because we assume that, even when we delete an entry from # the store, we do not delete it from the index point_id = len(self.emb_id2point_id) self.emb_id2point_id[emb_id] = point_id self.point_id2emb_id[point_id] = emb_id self.client.upsert( collection_name=self.COLLECTION_NAME, points=[PointStruct(id=point_id, vector=embedding, payload=payload)], ) def _get(self, emb_id: str) -> Optional[List[float]]: if emb_id not in self.emb_id2point_id: return None points = self.client.retrieve( collection_name=self.COLLECTION_NAME, ids=[self.emb_id2point_id[emb_id]], with_vectors=True, ) if len(points) == 0: return None elif len(points) == 1: return points[0].vector else: raise ValueError(f"Multiple points found for ID '{emb_id}'.") def _retrieve_similar( self, ref_id: str, top_k: Optional[int] = None, min_similarity: Optional[float] = None ) -> List[Tuple[str, T, float]]: similar_entries = self.client.recommend( collection_name=self.COLLECTION_NAME, positive=[self.emb_id2point_id[ref_id]], limit=top_k or self.MAX_LIMIT, score_threshold=min_similarity, ) return [ (self.point_id2emb_id[entry.id], entry.payload, entry.score) for entry in similar_entries ] def clear(self) -> None: vectors_config = self.client.get_collection( collection_name=self.COLLECTION_NAME ).vectors_config self.client.delete_collection(collection_name=self.COLLECTION_NAME) self.client.create_collection( collection_name=self.COLLECTION_NAME, vectors_config=vectors_config, ) self.emb_id2point_id.clear() self.point_id2emb_id.clear() def as_indices_vectors_payloads(self) -> Tuple[List[str], np.ndarray, List[T]]: num_entries = self.client.get_collection(collection_name=self.COLLECTION_NAME).points_count data, point_ids = self.client.scroll( collection_name=self.COLLECTION_NAME, with_vectors=True, limit=num_entries ) vectors_np = np.array([point.vector for point in data]) payloads = [point.payload for point in data] emb_ids = [self.point_id2emb_id[point.id] for point in data] return emb_ids, vectors_np, payloads