|
from typing import List, Dict, Optional |
|
from opensearch_client import OpenSearchClient |
|
from chroma_storage import ChromaMatchingSystem |
|
from embeddings import JobPosting |
|
|
|
class TwoPhaseSearchSystem:
    """Candidate search that combines OpenSearch filtering with ChromaDB matching.

    Phase 1 asks the OpenSearch client for jobseekers that satisfy a set of
    search parameters; phase 2 asks the Chroma matcher for embedding matches
    restricted to the jobseeker ids returned by phase 1.
    """

    def __init__(self, chroma_matcher: ChromaMatchingSystem, opensearch_client: OpenSearchClient):
        """Store the two backends used by the search phases.

        Args:
            chroma_matcher: Object exposing ``get_matches(job_posting=...,
                n_results=..., where_conditions=...)``.
            opensearch_client: Object exposing ``search_jobseekers(params)``.
        """
        self.chroma_matcher = chroma_matcher
        self.opensearch_client = opensearch_client

    def search_candidates(self,
                          job_posting: JobPosting,
                          search_params: Dict,
                          n_results: int = 10) -> List[Dict]:
        """Run the two-phase search and merge the results.

        1. OpenSearch boolean filtering
        2. ChromaDB embedding matching

        Args:
            job_posting: Posting handed to the Chroma matcher.
            search_params: Parameters forwarded unchanged to
                ``opensearch_client.search_jobseekers``.
            n_results: Forwarded to the Chroma matcher as ``n_results``.

        Returns:
            One dict per Chroma match whose ``jobseeker_id`` also appears in
            the OpenSearch results, in the order the matcher returned them.
            Each dict has the keys ``jobseeker_id``, ``similarity_score``,
            ``field_scores``, ``explanation`` and ``opensearch_score`` (the
            hit's ``_score``, or ``0.0`` when the hit has none). Returns an
            empty list when OpenSearch yields nothing.
        """
        opensearch_results = self.opensearch_client.search_jobseekers(search_params)
        if not opensearch_results:
            return []

        # Index the hits once so each match is resolved with a dict lookup
        # instead of a fresh scan over every hit. setdefault keeps the FIRST
        # hit for an id, matching what a linear "first match" scan would pick.
        # NOTE(review): assumes jobseeker ids are hashable (str/int) - confirm.
        hits_by_id: Dict = {}
        for hit in opensearch_results:
            hits_by_id.setdefault(hit['jobseeker_id'], hit)

        matches = self.chroma_matcher.get_matches(
            job_posting=job_posting,
            n_results=n_results,
            # Restrict the embedding search to ids that passed phase 1.
            where_conditions={"jobseeker_id": {"$in": list(hits_by_id)}}
        )

        final_results = []
        for match in matches:
            opensearch_result = hits_by_id.get(match.jobseeker_id)
            if opensearch_result is None:
                # The matcher returned an id that phase 1 did not; drop it.
                continue
            final_results.append({
                'jobseeker_id': match.jobseeker_id,
                'similarity_score': match.similarity_score,
                'field_scores': match.field_scores,
                'explanation': match.explanation,
                'opensearch_score': opensearch_result.get('_score', 0.0)
            })

        return final_results

    def build_search_params(self,
                            job_posting: JobPosting,
                            location: Optional[Dict] = None,
                            certifications: Optional[Dict] = None,
                            tags: Optional[List[Dict]] = None,
                            minimum_skills_match: int = 3,
                            minimum_results: int = 10) -> Dict:
        """Build OpenSearch query parameters with fallback logic.

        Tries each variation from ``_generate_param_variations`` in order and
        returns the first one for which OpenSearch returns at least
        ``minimum_results`` hits.

        Args:
            job_posting: Source of the title and primary skills.
            location: Optional dict read for ``country`` and ``state``.
            certifications: Optional dict read for ``name`` and
                ``organization``.
            tags: Optional tag list passed through as ``tags``.
            minimum_skills_match: Skills threshold for the strictest query.
            minimum_results: Number of hits a variation must reach to be
                accepted.

        Returns:
            The first sufficiently productive parameter dict, or the baseline
            skills-only parameters when no variation reaches
            ``minimum_results``.
        """
        params_list = self._generate_param_variations(
            job_posting=job_posting,
            location=location,
            certifications=certifications,
            tags=tags,
            minimum_skills_match=minimum_skills_match
        )

        # When optional filters are absent, several variations are identical
        # dicts. Re-sending an identical query cannot change the outcome
        # (assuming the client is deterministic for equal params), so skip it.
        tried: List[Dict] = []
        for params in params_list:
            if params in tried:
                continue
            tried.append(params)

            results = self.opensearch_client.search_jobseekers(params)
            if results and len(results) >= minimum_results:
                return params

        # Nothing reached the threshold: fall back to the baseline query.
        return self._baseline_params(
            [skill.skill_name for skill in job_posting.primary_skills]
        )

    @staticmethod
    def _baseline_params(primary_skills: List[str]) -> Dict:
        """Return the skills-only parameters used as the last-resort query."""
        return {
            "skills": primary_skills,
            "minimum_skills_should_match": 1,
            "size": 100,
            "sort_by": ["score"]
        }

    def _generate_param_variations(self,
                                   job_posting: JobPosting,
                                   location: Optional[Dict] = None,
                                   certifications: Optional[Dict] = None,
                                   tags: Optional[List[Dict]] = None,
                                   minimum_skills_match: int = 3) -> List[Dict]:
        """Generate variations of search parameters from strict to relaxed.

        Each variation relaxes one aspect of the strict query (they are not
        cumulative); the final entry is the skills-only baseline.

        Returns:
            Six parameter dicts, in order: strict; lowered skills threshold;
            without certification filters; without location filters; without
            tags; baseline. Entries may be equal to one another when the
            corresponding optional filter was not supplied.
        """
        primary_skills = [skill.skill_name for skill in job_posting.primary_skills]

        strict_params = {
            "boolean_search_query": job_posting.title,
            "skills": primary_skills,
            "minimum_skills_should_match": minimum_skills_match,
            "size": 100,
            "sort_by": ["score"]
        }

        if location:
            strict_params.update({
                "country_filter": location.get("country"),
                "state_filter": location.get("state")
            })

        if certifications:
            strict_params.update({
                "certifications_name": certifications.get("name"),
                "certifications_organization": certifications.get("organization")
            })

        if tags:
            strict_params["tags"] = tags

        # Cap at the caller's own threshold: a fixed 2 would make this
        # "relaxed" step stricter than the strict query whenever the caller
        # asked for fewer than 2 matching skills.
        relaxed_skills_match = min(2, minimum_skills_match)

        variations = [
            strict_params,
            {**strict_params, "minimum_skills_should_match": relaxed_skills_match},
            {k: v for k, v in strict_params.items()
             if k not in ("certifications_name", "certifications_organization")},
            {k: v for k, v in strict_params.items()
             if k not in ("state_filter", "country_filter")},
            {k: v for k, v in strict_params.items()
             if k != "tags"},
            self._baseline_params(primary_skills)
        ]

        return variations