# embeddings.py
# Field-level embedding and similarity scoring for matching job postings
# to job seekers. (Originally published as embeddings-sebastian/embeddings.py.)
# Standard library
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple

# Third-party
import numpy as np

# Local
from encoder import FIELD_MAPPING, FIELD_WEIGHTS
@dataclass
class Skill:
    """A single named skill attached to a job posting."""

    skill_name: str  # human-readable skill label, e.g. "Python"
@dataclass
class JobPosting:
    """A job posting in the shape delivered by the API.

    Only the first five fields participate in embedding-based matching;
    the remaining fields mirror the API payload and carry test-friendly
    defaults so a posting can be constructed with just the essentials.
    """

    # Essential matching fields (from API)
    title: str
    role_description: str
    company_description: str
    primary_skills: List["Skill"]
    secondary_skills: List["Skill"]

    # Additional API fields (with defaults)
    job_posting_id: str = "test_id"
    status: str = "active"
    location: str = "Test Location"
    workplace_model: str = "hybrid"
    job_engagement: str = "contract-to-hire"
    min_years_of_experience: int = 0
    max_years_of_experience: int = 0
    # BUG FIX: a bare `datetime.now()` default is evaluated once, when the
    # class is defined, so every instance silently shared the same stale
    # timestamp. `default_factory` re-evaluates it per instance.
    project_duration_from: datetime = field(default_factory=datetime.now)
    project_duration_to: datetime = field(default_factory=datetime.now)
    hourly_bill_rate_min: float = 50.0
    hourly_bill_rate_max: float = 100.0
    annual_salary_min: float = 100000.0
    annual_salary_max: float = 150000.0
    day_to_day_job_responsibilities: str = ""
    reason_for_hire: str = ""
    application_of_skills: str = ""
    company_id: str = "test_company"
@dataclass
class IndependentJobSeekerAssessmentRDS:
    """Processed job-seeker profile, as produced by the assessment pipeline."""

    # Essential matching fields (from API)
    primary_skills: List[str]
    secondary_skills: List[str]
    experiences: List[dict]      # each dict may carry 'experience_summaries'
    educations: List[dict]       # each dict may carry degree/field/institution
    certifications: List[dict]   # each dict may carry name/organization/dates
@dataclass
class JobseekerInfoRDS:
    """Unprocessed job-seeker info; only the free-text summary is used."""

    summary: str
@dataclass
class MatchResult:
    """Outcome of scoring one job posting against one job seeker."""

    similarity_score: float          # weighted aggregate score in [0, 1]
    field_scores: Dict[str, float]   # per field-pair scores, keyed "job_seeker"
    explanation: str                 # human-readable per-pair breakdown
    status: str = "unseen"           # workflow state; every new match starts unseen
logger = logging.getLogger(__name__)


class EmbeddingManager:
    """Embeds job-posting and job-seeker fields and scores their similarity.

    The two encoders must expose ``encode_fields(dict[str, str]) ->
    dict[str, np.ndarray]``. Field pairing comes from ``FIELD_MAPPING`` and
    aggregation weights from ``FIELD_WEIGHTS`` (both from ``encoder``).
    """

    def __init__(self, job_encoder, seeker_encoder):
        self.job_encoder = job_encoder
        self.seeker_encoder = seeker_encoder

    def get_job_fields(self, job_posting: "JobPosting") -> Dict[str, str]:
        """Flatten the matching-relevant posting fields into plain strings."""

        def join_skills(skills) -> str:
            # Empty/None skill lists collapse to an empty string.
            return ', '.join(s.skill_name for s in skills) if skills else ''

        return {
            'title': job_posting.title,
            'role_description': job_posting.role_description,
            'company_description': job_posting.company_description,
            'primary_skills': join_skills(job_posting.primary_skills),
            'secondary_skills': join_skills(job_posting.secondary_skills),
        }

    def get_seeker_fields(self, processed_seeker: "IndependentJobSeekerAssessmentRDS",
                          unprocessed_seeker: "JobseekerInfoRDS") -> Dict[str, str]:
        """Flatten the matching-relevant seeker fields into plain strings."""
        return {
            'primary_skills': ', '.join(processed_seeker.primary_skills),
            'secondary_skills': ', '.join(processed_seeker.secondary_skills),
            'experience': self._format_experience(processed_seeker.experiences),
            'education': self._format_education(processed_seeker.educations),
            'certifications': self._format_certifications(processed_seeker.certifications),
            'summary': unprocessed_seeker.summary,
        }

    def _format_experience(self, experiences: List[dict]) -> str:
        """Join each experience's summary sentences; separate entries with ' | '."""
        return ' | '.join(
            ' '.join(exp.get('experience_summaries', [])) for exp in experiences
        )

    def _format_education(self, educations: List[dict]) -> str:
        """Format education entries into a single ' | '-separated string."""
        return ' | '.join(
            f"{edu.get('degree', '')} in {edu.get('field', '')} "
            f"from {edu.get('institution', '')}".replace("  ", " ").strip()
            if False else
            f"{edu.get('degree', '')} in {edu.get('field', '')} from {edu.get('institution', '')}"
            for edu in educations
        )

    def _format_certifications(self, certifications: List[dict]) -> str:
        """Format certification entries into a single ' | '-separated string."""
        cert_parts = []
        for cert in certifications:
            name = cert.get('name', '')  # required per schema
            org = cert.get('organization', '')
            start = cert.get('start_date', '')
            end = cert.get('end_date', '')
            # Build "<name> from <org> (<start> - <end>)", omitting missing parts.
            cert_str = name
            if org:
                cert_str += f" from {org}"
            if start or end:
                date_range = [d for d in (start, end) if d]
                cert_str += f" ({' - '.join(date_range)})"
            cert_parts.append(cert_str)
        return ' | '.join(cert_parts)

    def embed_jobposting(self, job_posting: "JobPosting") -> Dict[str, np.ndarray]:
        """Generate embeddings for job posting fields."""
        return self.job_encoder.encode_fields(self.get_job_fields(job_posting))

    def embed_jobseeker(self, processed_seeker: "IndependentJobSeekerAssessmentRDS",
                        unprocessed_seeker: "JobseekerInfoRDS") -> Dict[str, np.ndarray]:
        """Generate embeddings for job seeker fields."""
        fields = self.get_seeker_fields(processed_seeker, unprocessed_seeker)
        # Was a bare print(); downgraded to debug logging.
        logger.debug("Seeker fields: %s", fields)
        return self.seeker_encoder.encode_fields(fields)

    @staticmethod
    def _cosine(a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity with an epsilon guard against zero-norm vectors."""
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-9))

    @staticmethod
    def _discriminate(score: float) -> float:
        """Non-linear curve: boost only the very best, penalize weak scores."""
        if score > 0.9:
            return min(score * 1.1, 1.0)
        if score < 0.7:
            return score * 0.6
        return score

    def calculate_similarity(self, job_embeddings: Dict[str, np.ndarray],
                             seeker_embeddings: Dict[str, np.ndarray]) -> "MatchResult":
        """Score every mapped job/seeker field pair and combine with strict thresholds.

        Per pair: cosine similarity is compressed (x0.8), rescaled to [0, 1],
        then pushed through the discrimination curve. Pairs are combined as a
        weighted average (FIELD_WEIGHTS, with optional critical-pair
        multipliers) and the curve is applied once more to the aggregate.
        """
        field_scores: Dict[str, float] = {}
        explanation_parts: List[str] = []

        for job_field, seeker_fields in FIELD_MAPPING.items():
            if job_field not in job_embeddings:
                continue
            job_emb = job_embeddings[job_field]

            # One job field may map to several seeker fields.
            for seeker_field in seeker_fields:
                if seeker_field not in seeker_embeddings:
                    continue
                similarity = self._cosine(job_emb, seeker_embeddings[seeker_field])

                # Compress the raw cosine range, then rescale to [0, 1].
                raw_score = similarity * 0.8
                field_score = max(0, min(1, (raw_score + 1) / 2))
                field_score = self._discriminate(field_score)

                pair = f"{job_field}_{seeker_field}"
                field_scores[pair] = field_score

                # Strict thresholds for the quality labels.
                match_quality = ("strong" if field_score > 0.9 else
                                 "good" if field_score > 0.8 else
                                 "moderate" if field_score > 0.6 else "weak")
                explanation_parts.append(
                    f"{match_quality.capitalize()} match on {job_field} to {seeker_field} "
                    f"(similarity: {field_score:.2f})"
                )

        # Critical pairs can be boosted; all multipliers are currently 1
        # (subject to tuning).
        critical_fields = {
            'primary_skills_primary_skills': 1,
            'role_description_experience': 1,
            'role_description_certifications': 1,
        }
        final_score = 0.0
        total_weight = 0.0
        for field_pair, score in field_scores.items():
            weight = FIELD_WEIGHTS.get(field_pair, 0.0) * critical_fields.get(field_pair, 1.0)
            final_score += score * weight
            total_weight += weight
        if total_weight > 0:
            final_score = final_score / total_weight

        # Apply the same discrimination curve to the aggregate score.
        final_score = self._discriminate(final_score)

        # Was a bare print(); downgraded to debug logging.
        logger.debug("All field scores: %s", field_scores)
        return MatchResult(
            similarity_score=final_score,
            field_scores=field_scores,
            explanation=" | ".join(explanation_parts),
        )
def initialize_embedding_system(job_encoder, seeker_encoder):
    """Create an EmbeddingManager wired to the given job/seeker encoders."""
    return EmbeddingManager(job_encoder, seeker_encoder)


# Public API. The data models are part of the module's contract (callers
# construct them to call the manager), so export them alongside the manager.
__all__ = [
    'EmbeddingManager',
    'MatchResult',
    'JobPosting',
    'Skill',
    'IndependentJobSeekerAssessmentRDS',
    'JobseekerInfoRDS',
    'initialize_embedding_system',
]