embeddings-sebastian / encoder.py
sebastianalgharaballi's picture
Update encoder.py
29e2c27 verified
raw
history blame
9.96 kB
from sentence_transformers import SentenceTransformer
from transformers import AutoModel, AutoTokenizer
import torch
import numpy as np
from typing import Dict, List, Union, Optional
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
import json
@dataclass
class Skill:
skill_name: str
@dataclass
class JobPosting:
# Essential matching fields (from API)
title: str
role_description: str
company_description: str
primary_skills: List[Skill]
secondary_skills: List[Skill]
# Additional API fields (with defaults)
job_posting_id: str = "test_id"
status: str = "active"
location: str = "Test Location"
workplace_model: str = "hybrid"
job_engagement: str = "contract-to-hire"
min_years_of_experience: int = 0
max_years_of_experience: int = 0
project_duration_from: datetime = datetime.now()
project_duration_to: datetime = datetime.now()
hourly_bill_rate_min: float = 50.0
hourly_bill_rate_max: float = 100.0
annual_salary_min: float = 100000.0
annual_salary_max: float = 150000.0
day_to_day_job_responsibilities: str = ""
reason_for_hire: str = ""
application_of_skills: str = ""
company_id: str = "test_company"
@dataclass
class IndependentJobSeekerAssessmentRDS:
# Essential matching fields (from API)
primary_skills: List[str]
secondary_skills: List[str]
experiences: List[dict]
educations: List[dict]
certifications: List[dict]
@dataclass
class JobseekerInfoRDS:
summary: str
class BaseFieldEncoder(ABC):
"""Base class for field-specific encoding"""
def __init__(self, model_name: str = 'all-mpnet-base-v2'):
self.model = SentenceTransformer(model_name)
self.embedding_dim = self.model.get_sentence_embedding_dimension()
# Device handling
if torch.cuda.is_available():
self.device = torch.device('cuda')
elif torch.backends.mps.is_available():
self.device = torch.device('mps')
else:
self.device = torch.device('cpu')
self.model.to(self.device)
@abstractmethod
def encode_fields(self, fields: Dict[str, str]) -> Dict[str, np.ndarray]:
"""Encode each field separately"""
pass
def encode_field(self, text: str) -> np.ndarray:
"""Encode a single field"""
if not text:
return np.zeros(self.embedding_dim)
return self.model.encode(text, convert_to_numpy=True)
class JobPostingEncoder(BaseFieldEncoder):
"""Encoder specifically for job postings"""
def __init__(self, model_name: str = 'all-mpnet-base-v2'):
super().__init__(model_name)
self.required_fields = {
'title',
'role_description',
'company_description',
'primary_skills',
'secondary_skills'
}
def encode_fields(self, fields: Dict[str, str]) -> Dict[str, np.ndarray]:
"""Encode all job posting fields"""
# Validate required fields
missing_fields = self.required_fields - set(fields.keys())
if missing_fields:
raise ValueError(f"Missing required fields: {missing_fields}")
field_embeddings = {}
# Encode each field separately
for field_name, content in fields.items():
if field_name in self.required_fields:
field_embeddings[field_name] = self.encode_field(content)
return field_embeddings
class JobSeekerEncoder(BaseFieldEncoder):
"""Encoder specifically for job seekers"""
def __init__(self, model_name: str = 'all-mpnet-base-v2'):
super().__init__(model_name)
self.required_fields = {
'summary',
'experience',
'primary_skills',
'secondary_skills',
'certifications',
'education' # Add this line
}
def encode_fields(self, fields: Dict[str, str]) -> Dict[str, np.ndarray]:
"""Encode all job seeker fields"""
# Validate required fields
missing_fields = self.required_fields - set(fields.keys())
if missing_fields:
raise ValueError(f"Missing required fields: {missing_fields}")
field_embeddings = {}
# Encode each field separately
for field_name, content in fields.items():
if field_name in self.required_fields:
field_embeddings[field_name] = self.encode_field(content)
return field_embeddings
class LegacyFieldEncoder:
"""Legacy encoder that uses the original Qwen2 approach"""
def __init__(self, model_path: str = "/Users/sebastian_a/jobposting-embedding"):
# Initialize with local Qwen2 model
self.model = AutoModel.from_pretrained(model_path)
self.tokenizer = AutoTokenizer.from_pretrained(model_path)
if torch.cuda.is_available():
self.device = torch.device('cuda')
elif torch.backends.mps.is_available():
self.device = torch.device('mps')
else:
self.device = torch.device('cpu')
self.model.to(self.device)
self.model.eval()
def _get_embedding(self, text: str) -> np.ndarray:
"""Helper to get embeddings with proper handling"""
with torch.no_grad():
# Tokenize with attention masks
encoding = self.tokenizer(
text,
return_tensors='pt',
padding=True,
truncation=True,
max_length=512,
return_attention_mask=True
)
# Move to correct device
input_ids = encoding['input_ids'].to(self.device)
attention_mask = encoding['attention_mask'].to(self.device)
# Get model output
outputs = self.model(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True # Get all hidden states
)
# Get last hidden state
last_hidden_state = outputs.last_hidden_state
# Apply attention mask and mean pooling
# This is better than just taking CLS token
mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
sum_embeddings = torch.sum(last_hidden_state * mask_expanded, 1)
sum_mask = torch.clamp(mask_expanded.sum(1), min=1e-9)
embedding = (sum_embeddings / sum_mask).squeeze()
# Convert to numpy and normalize
embedding = embedding.cpu().numpy()
embedding = embedding / (np.linalg.norm(embedding) + 1e-9)
return embedding
def encode_jobposting(self, job_posting: JobPosting) -> np.ndarray:
"""Legacy job posting encoding using Qwen2"""
# Convert skills lists to strings
primary_skills_str = ', '.join(skill.skill_name for skill in job_posting.primary_skills) if job_posting.primary_skills else 'None'
secondary_skills_str = ', '.join(skill.skill_name for skill in job_posting.secondary_skills) if job_posting.secondary_skills else 'None'
# Concatenate all fields into one string, maintaining legacy format
text = f"""
Title: {job_posting.title}
About the Role:
{job_posting.role_description}
Company Description:
{job_posting.company_description}
Primary Skills Required:
{primary_skills_str}
Secondary Skills Preferred:
{secondary_skills_str}
"""
return self._get_embedding(text)
def encode_jobseeker(self, processed_jobseeker: IndependentJobSeekerAssessmentRDS,
unprocessed_jobseeker: JobseekerInfoRDS) -> np.ndarray:
"""Legacy job seeker encoding using Qwen2"""
# Create a single string with all relevant information
text = f"""
Summary: {unprocessed_jobseeker.summary}
Skills: {', '.join(processed_jobseeker.skills)}
Experience:
{json.dumps(processed_jobseeker.experiences, indent=2)}
Education:
{json.dumps(processed_jobseeker.educations, indent=2)}
Certifications:
{json.dumps(processed_jobseeker.certifications, indent=2)}
"""
return self._get_embedding(text)
def encode_fields(self, fields: Dict[str, str]) -> Dict[str, np.ndarray]:
"""Not used in legacy approach"""
pass
def create_encoders(model_name: str = 'all-mpnet-base-v2') -> tuple:
"""Create both job posting and seeker encoders using the same base model"""
print(f"Creating encoders using {model_name}...")
job_encoder = JobPostingEncoder(model_name)
seeker_encoder = JobSeekerEncoder(model_name)
print(f"Created encoders with embedding dimension: {job_encoder.embedding_dim}")
return job_encoder, seeker_encoder
FIELD_MAPPING = {
'title': ['summary'], # Job title maps to seeker summary
'primary_skills': ['primary_skills'], # Primary skills to primary skills
'secondary_skills': ['secondary_skills'], # Secondary skills to secondary skills
'role_description': ['experience', 'certifications'] # Role maps to both experience and certs
}
FIELD_WEIGHTS = {
'primary_skills_primary_skills': 0.5, # Increased - exact skill matches are critical
'secondary_skills_secondary_skills': 0.1, # Decreased - nice to have but less critical
'role_description_experience': 0.25,
'role_description_certifications': 0.05,
'title_summary': 0.1
}
__all__ = ['JobPostingEncoder', 'JobSeekerEncoder', 'LegacyFieldEncoder',
'create_encoders', 'FIELD_MAPPING', 'FIELD_WEIGHTS']