"""Field-level and legacy text encoders for job-posting / job-seeker matching.

The module offers two encoding strategies:

* ``JobPostingEncoder`` / ``JobSeekerEncoder`` embed every required field
  separately with a SentenceTransformer model.
* ``LegacyFieldEncoder`` concatenates all fields into one prompt and embeds
  it with a locally stored Hugging Face model using masked mean pooling.

``FIELD_MAPPING`` and ``FIELD_WEIGHTS`` describe which job-posting fields
are paired with which job-seeker fields and the weight of each pair.
"""

import json
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Union, Optional

import numpy as np
import torch
from sentence_transformers import SentenceTransformer
from transformers import AutoModel, AutoTokenizer


@dataclass
class Skill:
    """A single named skill attached to a job posting."""

    skill_name: str


@dataclass
class JobPosting:
    """Job posting record; only the first five fields are used for encoding."""

    # Essential matching fields (from API)
    title: str
    role_description: str
    company_description: str
    primary_skills: List[Skill]
    secondary_skills: List[Skill]

    # Additional API fields (with defaults)
    job_posting_id: str = "test_id"
    status: str = "active"
    location: str = "Test Location"
    workplace_model: str = "hybrid"
    job_engagement: str = "contract-to-hire"
    min_years_of_experience: int = 0
    max_years_of_experience: int = 0
    # default_factory evaluates the timestamp per instance; a plain
    # ``datetime.now()`` default would be frozen at class-definition time.
    project_duration_from: datetime = field(default_factory=datetime.now)
    project_duration_to: datetime = field(default_factory=datetime.now)
    hourly_bill_rate_min: float = 50.0
    hourly_bill_rate_max: float = 100.0
    annual_salary_min: float = 100000.0
    annual_salary_max: float = 150000.0
    day_to_day_job_responsibilities: str = ""
    reason_for_hire: str = ""
    application_of_skills: str = ""
    company_id: str = "test_company"


@dataclass
class IndependentJobSeekerAssessmentRDS:
    """Processed job-seeker assessment data."""

    # Essential matching fields (from API)
    primary_skills: List[str]
    secondary_skills: List[str]
    experiences: List[dict]
    educations: List[dict]
    certifications: List[dict]


@dataclass
class JobseekerInfoRDS:
    """Unprocessed job-seeker data; only the free-text summary is kept."""

    summary: str


def _select_device() -> torch.device:
    """Return the preferred torch device: CUDA, then MPS, then CPU."""
    if torch.cuda.is_available():
        return torch.device('cuda')
    # Guard the attribute lookup so torch builds without the MPS backend
    # fall through to CPU instead of raising AttributeError.
    mps_backend = getattr(torch.backends, 'mps', None)
    if mps_backend is not None and mps_backend.is_available():
        return torch.device('mps')
    return torch.device('cpu')


class BaseFieldEncoder(ABC):
    """Base class for field-specific encoding with a SentenceTransformer.

    Subclasses are expected to set ``self.required_fields`` (a set of field
    names) and implement ``encode_fields``.
    """

    def __init__(self, model_name: str = 'all-mpnet-base-v2'):
        """Load ``model_name`` and move it to the best available device."""
        self.model = SentenceTransformer(model_name)
        self.embedding_dim = self.model.get_sentence_embedding_dimension()

        # Device handling
        self.device = _select_device()
        self.model.to(self.device)

    @abstractmethod
    def encode_fields(self, fields: Dict[str, str]) -> Dict[str, np.ndarray]:
        """Encode each field separately"""

    def encode_field(self, text: str) -> np.ndarray:
        """Encode a single field.

        Falsy input (empty string, ``None``) yields a zero vector of length
        ``self.embedding_dim`` instead of being sent to the model.
        """
        if not text:
            return np.zeros(self.embedding_dim)
        return self.model.encode(text, convert_to_numpy=True)

    def _encode_required_fields(self, fields: Dict[str, str]) -> Dict[str, np.ndarray]:
        """Validate ``fields`` against ``self.required_fields`` and encode them.

        Keys that are not required are ignored. The result keeps the
        iteration order of ``fields``.

        Raises:
            ValueError: if any required field name is missing from ``fields``.
        """
        missing_fields = self.required_fields - set(fields.keys())
        if missing_fields:
            raise ValueError(f"Missing required fields: {missing_fields}")

        return {
            field_name: self.encode_field(content)
            for field_name, content in fields.items()
            if field_name in self.required_fields
        }


class JobPostingEncoder(BaseFieldEncoder):
    """Encoder specifically for job postings"""

    def __init__(self, model_name: str = 'all-mpnet-base-v2'):
        """Load the model and declare the job-posting fields to encode."""
        super().__init__(model_name)
        self.required_fields = {
            'title',
            'role_description',
            'company_description',
            'primary_skills',
            'secondary_skills',
        }

    def encode_fields(self, fields: Dict[str, str]) -> Dict[str, np.ndarray]:
        """Encode all job posting fields.

        Returns a mapping from each required field name to its embedding.

        Raises:
            ValueError: if a required field is missing from ``fields``.
        """
        return self._encode_required_fields(fields)


class JobSeekerEncoder(BaseFieldEncoder):
    """Encoder specifically for job seekers"""

    def __init__(self, model_name: str = 'all-mpnet-base-v2'):
        """Load the model and declare the job-seeker fields to encode."""
        super().__init__(model_name)
        self.required_fields = {
            'summary',
            'experience',
            'primary_skills',
            'secondary_skills',
            'certifications',
            'education',
        }

    def encode_fields(self, fields: Dict[str, str]) -> Dict[str, np.ndarray]:
        """Encode all job seeker fields.

        Returns a mapping from each required field name to its embedding.

        Raises:
            ValueError: if a required field is missing from ``fields``.
        """
        return self._encode_required_fields(fields)


class LegacyFieldEncoder:
    """Legacy encoder that uses the original Qwen2 approach.

    All fields are concatenated into one prompt and embedded with a model
    loaded from a local path.
    """

    # NOTE(review): the default path is machine-specific; callers on other
    # machines must pass ``model_path`` explicitly.
    def __init__(self, model_path: str = "/Users/sebastian_a/jobposting-embedding"):
        """Load model and tokenizer from ``model_path`` in eval mode."""
        # Initialize with local Qwen2 model
        self.model = AutoModel.from_pretrained(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)

        self.device = _select_device()
        self.model.to(self.device)
        self.model.eval()

    def _get_embedding(self, text: str) -> np.ndarray:
        """Return an L2-normalised, mean-pooled embedding of ``text``.

        The text is truncated to 512 tokens. Pooling averages the last
        hidden state over the positions marked by the attention mask.
        """
        with torch.no_grad():
            # Tokenize with attention masks
            encoding = self.tokenizer(
                text,
                return_tensors='pt',
                padding=True,
                truncation=True,
                max_length=512,
                return_attention_mask=True,
            )

            # Move to correct device
            input_ids = encoding['input_ids'].to(self.device)
            attention_mask = encoding['attention_mask'].to(self.device)

            # Only the last hidden state is pooled below, so the full list
            # of per-layer hidden states is not requested.
            outputs = self.model(
                input_ids=input_ids,
                attention_mask=attention_mask,
            )
            last_hidden_state = outputs.last_hidden_state

            # Apply attention mask and mean pooling
            # This is better than just taking CLS token
            mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
            sum_embeddings = torch.sum(last_hidden_state * mask_expanded, 1)
            # clamp avoids division by zero when the mask is all zeros
            sum_mask = torch.clamp(mask_expanded.sum(1), min=1e-9)
            embedding = (sum_embeddings / sum_mask).squeeze()

            # Convert to numpy and normalize
            embedding = embedding.cpu().numpy()
            embedding = embedding / (np.linalg.norm(embedding) + 1e-9)

            return embedding

    def encode_jobposting(self, job_posting: JobPosting) -> np.ndarray:
        """Legacy job posting encoding using Qwen2.

        Skill lists are rendered as comma-separated names, or the literal
        string ``'None'`` when a list is empty or missing.
        """
        # Convert skills lists to strings
        primary_skills_str = (
            ', '.join(skill.skill_name for skill in job_posting.primary_skills)
            if job_posting.primary_skills else 'None'
        )
        secondary_skills_str = (
            ', '.join(skill.skill_name for skill in job_posting.secondary_skills)
            if job_posting.secondary_skills else 'None'
        )

        # Concatenate all fields into one string, maintaining legacy format.
        # The whitespace inside the literal is part of the model input.
        text = f"""
        Title: {job_posting.title}
        About the Role: {job_posting.role_description}
        Company Description: {job_posting.company_description}
        Primary Skills Required: {primary_skills_str}
        Secondary Skills Preferred: {secondary_skills_str}
        """

        return self._get_embedding(text)

    def encode_jobseeker(self, processed_jobseeker: IndependentJobSeekerAssessmentRDS,
                         unprocessed_jobseeker: JobseekerInfoRDS) -> np.ndarray:
        """Legacy job seeker encoding using Qwen2.

        The ``Skills`` line lists primary skills followed by secondary
        skills. Objects that still expose a combined ``skills`` attribute
        are honoured for backward compatibility.
        """
        # IndependentJobSeekerAssessmentRDS has no ``skills`` attribute, so
        # reading it unconditionally would raise AttributeError.
        skills = getattr(processed_jobseeker, 'skills', None)
        if skills is None:
            skills = [
                *(processed_jobseeker.primary_skills or []),
                *(processed_jobseeker.secondary_skills or []),
            ]

        # Create a single string with all relevant information
        text = f"""
        Summary: {unprocessed_jobseeker.summary}
        Skills: {', '.join(skills)}
        Experience: {json.dumps(processed_jobseeker.experiences, indent=2)}
        Education: {json.dumps(processed_jobseeker.educations, indent=2)}
        Certifications: {json.dumps(processed_jobseeker.certifications, indent=2)}
        """

        return self._get_embedding(text)

    def encode_fields(self, fields: Dict[str, str]) -> Optional[Dict[str, np.ndarray]]:
        """Not used in legacy approach; always returns ``None``."""
        return None


def create_encoders(model_name: str = 'all-mpnet-base-v2') -> tuple:
    """Create both job posting and seeker encoders using the same base model.

    Returns:
        A ``(job_encoder, seeker_encoder)`` tuple. Each encoder loads its
        own model instance.
    """
    print(f"Creating encoders using {model_name}...")

    job_encoder = JobPostingEncoder(model_name)
    seeker_encoder = JobSeekerEncoder(model_name)

    print(f"Created encoders with embedding dimension: {job_encoder.embedding_dim}")

    return job_encoder, seeker_encoder


# Job-posting field -> job-seeker fields it is compared against.
FIELD_MAPPING = {
    'title': ['summary'],  # Job title maps to seeker summary
    'primary_skills': ['primary_skills'],  # Primary skills to primary skills
    'secondary_skills': ['secondary_skills'],  # Secondary skills to secondary skills
    'role_description': ['experience', 'certifications']  # Role maps to both experience and certs
}

# Weight per mapped pair, keyed "<job field>_<seeker field>"; the values
# sum to 1.0.
FIELD_WEIGHTS = {
    'primary_skills_primary_skills': 0.5,  # exact skill matches are critical
    'secondary_skills_secondary_skills': 0.1,  # nice to have but less critical
    'role_description_experience': 0.25,
    'role_description_certifications': 0.05,
    'title_summary': 0.1
}

__all__ = ['JobPostingEncoder', 'JobSeekerEncoder', 'LegacyFieldEncoder',
           'create_encoders', 'FIELD_MAPPING', 'FIELD_WEIGHTS']