from typing import List, Dict, Optional

import chromadb
import numpy as np

from embeddings import EmbeddingManager, MatchResult
from encoder import create_encoders, FIELD_MAPPING


class ChromaMatchingSystem:
    def __init__(self, collection_name: str = "job_seekers"):
        # Initialize the ChromaDB client.
        self.client = chromadb.Client()

        # Initialize the existing embedding system.
        job_encoder, seeker_encoder = create_encoders('all-mpnet-base-v2')
        self.embedding_manager = EmbeddingManager(job_encoder, seeker_encoder)

        # Create (or get) one collection per job-posting field in FIELD_MAPPING.
        self.collections = {}
        job_fields = set(FIELD_MAPPING.keys())
        for field in job_fields:
            self.collections[field] = self.client.get_or_create_collection(
                name=f"{collection_name}_{field}",
                # Explicitly None: embeddings are computed by EmbeddingManager,
                # not by ChromaDB.
                embedding_function=None,
            )

    def add_job_seeker(self, jobseeker_id: str, processed_seeker,
                       unprocessed_seeker, metadata: Optional[Dict] = None):
        """Add a job seeker to the per-field ChromaDB collections."""
        # Compute per-field embeddings with the existing embedding system.
        field_embeddings = self.embedding_manager.embed_jobseeker(
            processed_seeker, unprocessed_seeker
        )

        # Map seeker fields to job-posting fields using FIELD_MAPPING.
        for job_field, seeker_field in FIELD_MAPPING.items():
            if seeker_field in field_embeddings:
                # Ensure metadata is a dictionary.
                safe_metadata = metadata if metadata is not None else {}
                self.collections[job_field].add(
                    embeddings=[field_embeddings[seeker_field].tolist()],
                    metadatas=[safe_metadata],
                    ids=[jobseeker_id],
                    # Store the id as the document so query results carry it.
                    documents=[jobseeker_id],
                )

    def get_matches(self, job_posting, n_results: int = 10,
                    where_conditions: Optional[Dict] = None) -> List[MatchResult]:
        """Rank matches with the existing similarity calculation, using
        ChromaDB only as the vector store."""
        # Embed the job posting with the existing embedding system.
        job_embeddings = self.embedding_manager.embed_jobposting(job_posting)

        matches = []
        field_results = {}

        # Query each field collection and keep the raw results.
        for job_field in FIELD_MAPPING.keys():
            if job_field in job_embeddings:
                try:
                    results = self.collections[job_field].query(
                        query_embeddings=[job_embeddings[job_field].tolist()],
                        n_results=n_results,
                        where=where_conditions,
                        include=["embeddings", "metadatas", "distances", "documents"],
                    )
                    # Compare against None: ChromaDB may return embeddings as
                    # numpy arrays, whose truthiness is ambiguous.
                    if results and results.get('embeddings') is not None:
                        field_results[job_field] = results
                except Exception as e:
                    print(f"Error querying {job_field}: {e}")

        # Collect the unique jobseeker IDs seen across all field queries.
        jobseeker_ids = set()
        for results in field_results.values():
            if results.get('ids'):
                jobseeker_ids.update(results['ids'][0])

        # Score each candidate with the existing similarity calculation.
        for jobseeker_id in jobseeker_ids:
            # Reconstruct the seeker's per-field embeddings from the query results.
            seeker_embeddings = {}
            for job_field, seeker_field in FIELD_MAPPING.items():
                if job_field not in field_results:
                    continue
                results = field_results[job_field]
                ids = results.get('ids')
                embeddings = results.get('embeddings')
                if not ids or embeddings is None:
                    continue
                if jobseeker_id in ids[0]:
                    idx = ids[0].index(jobseeker_id)
                    if idx < len(embeddings[0]):
                        seeker_embeddings[seeker_field] = np.array(embeddings[0][idx])

            # Only score candidates for which we recovered at least one embedding.
            if seeker_embeddings:
                match_result = self.embedding_manager.calculate_similarity(
                    job_embeddings, seeker_embeddings
                )
                matches.append(match_result)

        # Sort by similarity score, best first, and truncate to n_results.
        matches.sort(key=lambda x: x.similarity_score, reverse=True)
        return matches[:n_results]
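
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module). Assumptions: the shapes of
# `processed_seeker`, `unprocessed_seeker`, and `job_posting` are whatever
# EmbeddingManager.embed_jobseeker / embed_jobposting expect (defined in
# embeddings.py / encoder.py) and are passed through untouched; the printed
# field assumes MatchResult exposes `similarity_score`, as get_matches above
# already relies on.
# ---------------------------------------------------------------------------
def demo_match(processed_seeker, unprocessed_seeker, job_posting):
    """Index one seeker, then rank candidate matches for a posting."""
    system = ChromaMatchingSystem(collection_name="demo_seekers")

    system.add_job_seeker(
        jobseeker_id="seeker-001",
        processed_seeker=processed_seeker,
        unprocessed_seeker=unprocessed_seeker,
        metadata={"location": "Berlin"},  # optional, filterable via `where`
    )

    # where_conditions narrows candidates by metadata before scoring,
    # e.g. only seekers whose stored metadata has location == "Berlin".
    matches = system.get_matches(
        job_posting, n_results=5, where_conditions={"location": "Berlin"}
    )
    for match in matches:
        print(match.similarity_score)
    return matches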