"""Embedding-based matching between job postings and job seekers.

Encodes selected fields of a job posting and a job seeker profile,
computes per-field cosine similarities according to ``FIELD_MAPPING``,
and combines them into a single weighted ``MatchResult``.
"""

import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple

import numpy as np

from encoder import FIELD_MAPPING, FIELD_WEIGHTS

logger = logging.getLogger(__name__)


@dataclass
class Skill:
    """A single named skill attached to a job posting."""

    skill_name: str


@dataclass
class JobPosting:
    """Job posting record as received from the API.

    Only the first five fields participate in matching; the remaining
    fields carry API metadata and have test-friendly defaults.
    """

    # Essential matching fields (from API)
    title: str
    role_description: str
    company_description: str
    primary_skills: List[Skill]
    secondary_skills: List[Skill]
    # Additional API fields (with defaults)
    job_posting_id: str = "test_id"
    status: str = "active"
    location: str = "Test Location"
    workplace_model: str = "hybrid"
    job_engagement: str = "contract-to-hire"
    min_years_of_experience: int = 0
    max_years_of_experience: int = 0
    # default_factory so the timestamp is evaluated per instance; a plain
    # `datetime.now()` default is computed once at class-definition time
    # and silently shared by every instance.
    project_duration_from: datetime = field(default_factory=datetime.now)
    project_duration_to: datetime = field(default_factory=datetime.now)
    hourly_bill_rate_min: float = 50.0
    hourly_bill_rate_max: float = 100.0
    annual_salary_min: float = 100000.0
    annual_salary_max: float = 150000.0
    day_to_day_job_responsibilities: str = ""
    reason_for_hire: str = ""
    application_of_skills: str = ""
    company_id: str = "test_company"


@dataclass
class IndependentJobSeekerAssessmentRDS:
    """Processed (assessed) job-seeker profile used for matching."""

    # Essential matching fields (from API)
    primary_skills: List[str]
    secondary_skills: List[str]
    experiences: List[dict]
    educations: List[dict]
    certifications: List[dict]


@dataclass
class JobseekerInfoRDS:
    """Unprocessed job-seeker info; only the free-text summary is used."""

    summary: str


@dataclass
class MatchResult:
    """Stores the result of a job-seeker match with explanation."""

    similarity_score: float
    field_scores: Dict[str, float]
    explanation: str
    status: str = "unseen"


class EmbeddingManager:
    """Encodes job/seeker fields and scores their pairwise similarity."""

    def __init__(self, job_encoder, seeker_encoder):
        """Store the two encoders (each must expose ``encode_fields``)."""
        self.job_encoder = job_encoder
        self.seeker_encoder = seeker_encoder

    def get_job_fields(self, job_posting: JobPosting) -> Dict[str, str]:
        """Extract the matching-relevant fields from a job posting.

        Skill lists are flattened to comma-separated strings so they can
        be embedded as plain text.
        """
        primary_skills_str = ', '.join(
            skill.skill_name for skill in job_posting.primary_skills
        ) if job_posting.primary_skills else ''
        secondary_skills_str = ', '.join(
            skill.skill_name for skill in job_posting.secondary_skills
        ) if job_posting.secondary_skills else ''
        return {
            'title': job_posting.title,
            'role_description': job_posting.role_description,
            'company_description': job_posting.company_description,
            'primary_skills': primary_skills_str,
            'secondary_skills': secondary_skills_str,
        }

    def get_seeker_fields(
        self,
        processed_seeker: IndependentJobSeekerAssessmentRDS,
        unprocessed_seeker: JobseekerInfoRDS,
    ) -> Dict[str, str]:
        """Extract the matching-relevant fields from a job seeker."""
        return {
            'primary_skills': ', '.join(processed_seeker.primary_skills),
            'secondary_skills': ', '.join(processed_seeker.secondary_skills),
            'experience': self._format_experience(processed_seeker.experiences),
            'education': self._format_education(processed_seeker.educations),
            'certifications': self._format_certifications(
                processed_seeker.certifications
            ),
            'summary': unprocessed_seeker.summary,
        }

    def _format_experience(self, experiences: List[dict]) -> str:
        """Join each experience's summary sentences; entries separated by ' | '."""
        exp_parts = []
        for exp in experiences:
            summaries = exp.get('experience_summaries', [])
            exp_parts.append(' '.join(summaries))
        return ' | '.join(exp_parts)

    def _format_education(self, educations: List[dict]) -> str:
        """Format education entries into a single string."""
        edu_parts = []
        for edu in educations:
            degree = edu.get('degree', '')
            field_of_study = edu.get('field', '')
            institution = edu.get('institution', '')
            edu_parts.append(f"{degree} in {field_of_study} from {institution}")
        return ' | '.join(edu_parts)

    def _format_certifications(self, certifications: List[dict]) -> str:
        """Format certification entries into a single string."""
        cert_parts = []
        for cert in certifications:
            name = cert.get('name', '')  # Required per schema
            org = cert.get('organization', '')
            start = cert.get('start_date', '')
            end = cert.get('end_date', '')
            # Build "name from org (start - end)", omitting missing pieces.
            cert_str = name
            if org:
                cert_str += f" from {org}"
            if start or end:
                date_range = []
                if start:
                    date_range.append(start)
                if end:
                    date_range.append(end)
                cert_str += f" ({' - '.join(date_range)})"
            cert_parts.append(cert_str)
        return ' | '.join(cert_parts)

    def embed_jobposting(self, job_posting: JobPosting) -> Dict[str, np.ndarray]:
        """Generate embeddings for job posting fields."""
        fields = self.get_job_fields(job_posting)
        return self.job_encoder.encode_fields(fields)

    def embed_jobseeker(
        self,
        processed_seeker: IndependentJobSeekerAssessmentRDS,
        unprocessed_seeker: JobseekerInfoRDS,
    ) -> Dict[str, np.ndarray]:
        """Generate embeddings for job seeker fields."""
        fields = self.get_seeker_fields(processed_seeker, unprocessed_seeker)
        logger.debug("Seeker fields: %s", fields)
        return self.seeker_encoder.encode_fields(fields)

    def calculate_similarity(
        self,
        job_embeddings: Dict[str, np.ndarray],
        seeker_embeddings: Dict[str, np.ndarray],
    ) -> MatchResult:
        """Calculate a weighted match score with strict thresholds.

        For every (job field -> seeker fields) pair in ``FIELD_MAPPING``,
        computes cosine similarity, compresses/penalizes it into [0, 1],
        then averages the per-pair scores weighted by ``FIELD_WEIGHTS``
        (critical pairs may receive an extra multiplier).
        """
        field_scores: Dict[str, float] = {}
        explanation_parts: List[str] = []

        # Calculate similarity for each mapped field pair.
        for job_field, seeker_fields in FIELD_MAPPING.items():
            if job_field not in job_embeddings:
                continue
            job_emb = job_embeddings[job_field]
            # One job field may map to several seeker fields.
            for seeker_field in seeker_fields:
                if seeker_field not in seeker_embeddings:
                    continue
                seeker_emb = seeker_embeddings[seeker_field]
                # Raw cosine similarity; epsilon guards a zero-norm vector.
                similarity = np.dot(job_emb, seeker_emb) / (
                    np.linalg.norm(job_emb) * np.linalg.norm(seeker_emb) + 1e-9
                )
                # Compress the range, then rescale from [-1, 1] to [0, 1].
                raw_score = similarity * 0.8
                field_score = max(0, min(1, (raw_score + 1) / 2))
                # Non-linear transformation for more discrimination:
                # boost only the very best, penalize the mediocre.
                if field_score > 0.9:
                    field_score = min(field_score * 1.1, 1.0)
                elif field_score < 0.7:
                    field_score = field_score * 0.6

                field_pair_name = f"{job_field}_{seeker_field}"
                field_scores[field_pair_name] = field_score

                # Strict thresholds for the human-readable quality label.
                match_quality = "strong" if field_score > 0.9 else \
                    "good" if field_score > 0.8 else \
                    "moderate" if field_score > 0.6 else "weak"
                explanation_parts.append(
                    f"{match_quality.capitalize()} match on {job_field} to {seeker_field} "
                    f"(similarity: {field_score:.2f})"
                )

        # Weighted average with extra emphasis on critical field pairs.
        final_score = 0.0
        total_weight = 0.0
        # Critical fields get extra weight (multipliers subject to change).
        critical_fields = {
            'primary_skills_primary_skills': 1,
            'role_description_experience': 1,
            'role_description_certifications': 1,
        }
        for field_pair, score in field_scores.items():
            base_weight = FIELD_WEIGHTS.get(field_pair, 0.0)
            weight = base_weight * critical_fields.get(field_pair, 1.0)
            final_score += score * weight
            total_weight += weight
        if total_weight > 0:
            final_score = final_score / total_weight

        # Final adjustment mirroring the per-field discrimination step.
        if final_score > 0.9:
            final_score = min(final_score * 1.1, 1.0)
        elif final_score < 0.7:
            final_score = final_score * 0.6

        explanation = " | ".join(explanation_parts)
        logger.debug("All field scores: %s", field_scores)
        return MatchResult(
            similarity_score=final_score,
            field_scores=field_scores,
            explanation=explanation,
        )


def initialize_embedding_system(job_encoder, seeker_encoder):
    """Initialize the embedding system."""
    return EmbeddingManager(job_encoder, seeker_encoder)


__all__ = ['EmbeddingManager', 'MatchResult', 'initialize_embedding_system']