from typing import Dict, List, Any
from scipy.special import softmax
from collections import Counter
import numpy as np
import weakref
import re
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')
from utils import clean_str, clean_str_nopunct
import torch
from utils import MultiHeadModel, BertInputBuilder, get_num_words, MATH_PREFIXES, MATH_WORDS, plural_to_singular
import transformers
from transformers import BertTokenizer, BertForSequenceClassification
from transformers.utils import logging
transformers.logging.set_verbosity_debug()

UPTAKE_MODEL = 'ddemszky/uptake-model'
REASONING_MODEL = 'ddemszky/student-reasoning'
QUESTION_MODEL = 'ddemszky/question-detection'
FOCUSING_QUESTION_MODEL = 'ddemszky/focusing-questions'


def _prepare_model_inputs(instance, device):
    """Turn the builder output into batched tensors placed on ``device``.

    Adds an all-ones ``attention_mask`` entry, then replaces the
    ``input_ids``, ``token_type_ids`` and ``attention_mask`` entries of
    ``instance`` (in place) with tensors that have an extra leading
    dimension.

    Args:
        instance: mapping produced by ``input_builder.build_inputs``; must
            contain ``input_ids`` and ``token_type_ids``.
        device: target device passed to ``Tensor.to``.

    Returns:
        The same ``instance`` mapping, for convenience.
    """
    instance["attention_mask"] = [[1] * len(instance["input_ids"])]
    for key in ("input_ids", "token_type_ids", "attention_mask"):
        # Batch size = 1. Tensor.to() returns a new tensor rather than moving
        # the existing one, so the result has to be stored back.
        instance[key] = torch.tensor(instance[key]).unsqueeze(0).to(device)
    return instance


def _top_by_value(entries, limit=50):
    """Return at most ``limit`` entries ordered by descending ``'value'``."""
    return sorted(entries, key=lambda x: x['value'], reverse=True)[:limit]


class Utterance:
    """A single utterance of a transcript plus the annotations computed for it."""

    def __init__(self, speaker, text, uid=None,
                 transcript=None, starttime=None, endtime=None, **kwargs):
        """Store the utterance fields and initialise all annotations to None.

        Args:
            speaker: speaker identifier.
            text: raw utterance text.
            uid: optional utterance identifier.
            transcript: optional owning transcript; only a weak reference
                is kept.
            starttime: optional start time.
            endtime: optional end time.
            **kwargs: extra properties, stored in ``props`` and merged into
                ``to_dict()``.
        """
        self.speaker = speaker
        self.text = text
        self.uid = uid
        self.starttime = starttime
        self.endtime = endtime
        self.transcript = weakref.ref(transcript) if transcript else None
        self.props = kwargs
        self.role = None
        self.word_count = self.get_num_words()
        self.timestamp = [starttime, endtime]
        # The unit measure is the duration when both timestamps are known;
        # Transcript.calculate_aggregate_word_count() falls back to word
        # counts when any utterance lacks it.
        if starttime is not None and endtime is not None:
            self.unit_measure = endtime - starttime
        else:
            self.unit_measure = None
        self.aggregate_unit_measure = endtime
        self.num_math_terms = None
        self.math_terms = None

        # moments
        self.uptake = None
        self.reasoning = None
        self.question = None
        self.focusing_question = None

    def get_clean_text(self, remove_punct=False):
        """Return the text passed through ``clean_str_nopunct`` or ``clean_str``."""
        if remove_punct:
            return clean_str_nopunct(self.text)
        return clean_str(self.text)

    def get_num_words(self):
        """Return the word count of the raw text as computed by ``get_num_words``."""
        return get_num_words(self.text)

    def to_dict(self):
        """Return the utterance, its annotations and its extra props as a dict."""
        return {
            'speaker': self.speaker,
            'text': self.text,
            'uid': self.uid,
            'starttime': self.starttime,
            'endtime': self.endtime,
            'uptake': self.uptake,
            'reasoning': self.reasoning,
            'question': self.question,
            'focusingQuestion': self.focusing_question,
            'numMathTerms': self.num_math_terms,
            'mathTerms': self.math_terms,
            **self.props
        }

    def to_talk_timeline_dict(self):
        """Return the timeline view of the utterance, with moments as booleans."""
        return {
            'speaker': self.speaker,
            'text': self.text,
            'uid': self.uid,
            'role': self.role,
            'timestamp': self.timestamp,
            'moments': {
                'reasoning': bool(self.reasoning),
                'questioning': bool(self.question),
                'uptake': bool(self.uptake),
                'focusingQuestion': bool(self.focusing_question),
            },
            'unitMeasure': self.unit_measure,
            'aggregateUnitMeasure': self.aggregate_unit_measure,
            'wordCount': self.word_count,
            'numMathTerms': self.num_math_terms,
            'mathTerms': self.math_terms,
        }

    def __repr__(self):
        return f"Utterance(speaker='{self.speaker}'," \
               f"text='{self.text}', uid={self.uid}," \
               f"starttime={self.starttime}, endtime={self.endtime}, props={self.props})"


class Transcript:
    """An ordered collection of ``Utterance`` objects plus custom parameters."""

    def __init__(self, **kwargs):
        self.utterances = []
        self.params = kwargs

    def add_utterance(self, utterance):
        """Append ``utterance`` and point its weak back-reference at this transcript."""
        utterance.transcript = weakref.ref(self)
        self.utterances.append(utterance)

    def get_idx(self, idx):
        """Return the utterance at ``idx``, or None when ``idx`` is past the end."""
        if idx >= len(self.utterances):
            return None
        return self.utterances[idx]

    def get_uid(self, uid):
        """Return the first utterance whose ``uid`` equals ``uid``, or None."""
        for utt in self.utterances:
            if utt.uid == uid:
                return utt
        return None

    def length(self):
        """Return the number of utterances."""
        return len(self.utterances)

    def update_utterance_roles(self, uptake_speaker):
        """Label utterances by ``uptake_speaker`` as 'teacher', all others 'student'."""
        for utt in self.utterances:
            utt.role = 'teacher' if utt.speaker == uptake_speaker else 'student'

    def get_talk_distribution_and_length(self, uptake_speaker):
        """Compute word-share percentages and mean utterance length per role.

        Also (re)assigns ``role`` on every utterance.

        Args:
            uptake_speaker: speaker treated as the teacher.

        Returns:
            None when ``uptake_speaker`` is None; otherwise a tuple
            ``(distribution, lengths)`` of two dicts keyed by ``'teacher'``
            and ``'student'``.
        """
        if uptake_speaker is None:
            return None
        teacher_words = 0
        teacher_utt_count = 0
        student_words = 0
        student_utt_count = 0
        for utt in self.utterances:
            if utt.speaker == uptake_speaker:
                utt.role = 'teacher'
                teacher_words += utt.get_num_words()
                teacher_utt_count += 1
            else:
                utt.role = 'student'
                student_words += utt.get_num_words()
                student_utt_count += 1
        total_words = teacher_words + student_words
        if total_words > 0:
            teacher_percentage = round((teacher_words / total_words) * 100)
            # Derived from the rounded teacher share so the two sum to 100.
            student_percentage = 100 - teacher_percentage
        else:
            teacher_percentage = student_percentage = 0
        avg_teacher_length = teacher_words / teacher_utt_count if teacher_utt_count > 0 else 0
        avg_student_length = student_words / student_utt_count if student_utt_count > 0 else 0
        return ({'teacher': teacher_percentage, 'student': student_percentage},
                {'teacher': avg_teacher_length, 'student': avg_student_length})

    def get_word_clouds(self):
        """Build word-frequency lists for the transcript.

        Stop words and the markers 'inaudible' / 'crosstalk' are dropped.
        Words that were recorded as math terms on an utterance are excluded
        from the general teacher/student counts (but not from the uptake
        counts).

        Returns:
            A 4-tuple of lists (combined, teacher-uptake, teacher, student),
            each holding at most 50 ``{'text', 'value', 'category'}`` dicts
            sorted by descending value.
        """
        teacher_dict = Counter()
        student_dict = Counter()
        uptake_teacher_dict = Counter()
        # Sets give constant-time membership tests for the per-word filter.
        stop_words = set(stopwords.words('english'))
        ignored_words = {'inaudible', 'crosstalk'}

        for utt in self.utterances:
            clean_text = utt.get_clean_text(remove_punct=True)
            words = [word for word in clean_text.split()
                     if word not in stop_words and word not in ignored_words]

            # Handle uptake case
            if utt.role == 'teacher' and utt.uptake == 1:
                uptake_teacher_dict.update(words)

            general_text = ' '.join(words)
            # math_terms stays None until run_math_density() has been run.
            for math_term in utt.math_terms or []:
                # Remove whole-word occurrences only; a plain substring
                # replace would also mangle longer words containing the term.
                general_text = re.sub(
                    rf"\b{re.escape(math_term)}\b", '', general_text)
            # split() discards the extra whitespace left by the removals.
            general_words = general_text.split()

            if utt.role == 'teacher':
                teacher_dict.update(general_words)
            else:
                student_dict.update(general_words)

        def dict_to_list(d, category):
            # Merge counts of words that share a singular form.
            combined_dict = Counter()
            for word, count in d.items():
                combined_dict[plural_to_singular(word)] += count
            return [{'text': word, 'value': count, 'category': category}
                    for word, count in combined_dict.items()]

        teacher_dict_list = dict_to_list(teacher_dict, 'general')
        student_dict_list = dict_to_list(student_dict, 'general')
        uptake_dict_list = dict_to_list(uptake_teacher_dict, 'teacher')
        dict_list = (dict_to_list(teacher_dict, 'general')
                     + dict_to_list(student_dict, 'general'))
        return (_top_by_value(dict_list),
                _top_by_value(uptake_dict_list),
                _top_by_value(teacher_dict_list),
                _top_by_value(student_dict_list))

    def get_talk_timeline(self):
        """Return the timeline dict of every utterance, in order."""
        return [utterance.to_talk_timeline_dict() for utterance in self.utterances]

    def calculate_aggregate_word_count(self):
        """Fall back to word counts as unit measure when any duration is missing.

        If at least one utterance has ``unit_measure`` of None, every
        utterance gets its word count as ``unit_measure`` and the running
        word total as ``aggregate_unit_measure``. Otherwise nothing changes.
        """
        if any(utt.unit_measure is None for utt in self.utterances):
            aggregate_word_count = 0
            for utt in self.utterances:
                num_words = utt.get_num_words()
                aggregate_word_count += num_words
                utt.unit_measure = num_words
                utt.aggregate_unit_measure = aggregate_word_count

    def to_dict(self):
        """Return all utterance dicts together with the custom parameters."""
        return {
            'utterances': [utterance.to_dict() for utterance in self.utterances],
            **self.params
        }

    def __repr__(self):
        return f"Transcript(utterances={self.utterances}, custom_params={self.params})"


class QuestionModel:
    """Sets ``question`` on each utterance using the question-detection model."""

    def __init__(self, device, tokenizer, input_builder, max_length=300, path=QUESTION_MODEL):
        print("Loading models...")
        self.device = device
        self.tokenizer = tokenizer
        self.input_builder = input_builder
        self.max_length = max_length
        self.model = MultiHeadModel.from_pretrained(
            path, head2size={"is_question": 2})
        self.model.to(self.device)

    def run_inference(self, transcript):
        """Set ``utt.question`` (int) for every utterance of ``transcript``.

        Utterances containing a literal '?' are marked 1 without running the
        model; the rest get the argmax of the ``is_question_logits`` head.
        """
        self.model.eval()
        with torch.no_grad():
            for utt in transcript.utterances:
                if "?" in utt.text:
                    utt.question = 1
                    continue
                text = utt.get_clean_text(remove_punct=True)
                instance = self.input_builder.build_inputs(
                    [], text, max_length=self.max_length, input_str=True)
                output = self.get_prediction(instance)
                # Cast to a plain int so the value matches the shortcut
                # branch above and stays JSON-serializable.
                utt.question = int(np.argmax(
                    output["is_question_logits"][0].tolist()))

    def get_prediction(self, instance):
        """Run the model on a single built ``instance`` and return its output."""
        instance = _prepare_model_inputs(instance, self.device)
        return self.model(input_ids=instance["input_ids"],
                          attention_mask=instance["attention_mask"],
                          token_type_ids=instance["token_type_ids"],
                          return_pooler_output=False)


class ReasoningModel:
    """Sets ``reasoning`` on utterances using the student-reasoning classifier."""

    def __init__(self, device, tokenizer, input_builder, max_length=128, path=REASONING_MODEL):
        print("Loading models...")
        self.device = device
        self.tokenizer = tokenizer
        self.input_builder = input_builder
        self.max_length = max_length
        self.model = BertForSequenceClassification.from_pretrained(path)
        self.model.to(self.device)

    def run_inference(self, transcript, min_num_words=8, uptake_speaker=None):
        """Set ``utt.reasoning`` (int) on qualifying utterances.

        Only utterances with at least ``min_num_words`` words whose speaker
        differs from ``uptake_speaker`` are classified; others are left
        untouched.
        """
        self.model.eval()
        with torch.no_grad():
            for utt in transcript.utterances:
                if utt.get_num_words() >= min_num_words and utt.speaker != uptake_speaker:
                    instance = self.input_builder.build_inputs(
                        [], utt.text, max_length=self.max_length, input_str=True)
                    output = self.get_prediction(instance)
                    utt.reasoning = int(np.argmax(output["logits"][0].tolist()))

    def get_prediction(self, instance):
        """Run the model on a single built ``instance`` and return its output."""
        instance = _prepare_model_inputs(instance, self.device)
        return self.model(input_ids=instance["input_ids"],
                          attention_mask=instance["attention_mask"],
                          token_type_ids=instance["token_type_ids"])


class UptakeModel:
    """Sets ``uptake`` on utterances using the uptake model's ``nsp`` head."""

    def __init__(self, device, tokenizer, input_builder, max_length=120, path=UPTAKE_MODEL):
        print("Loading models...")
        self.device = device
        self.tokenizer = tokenizer
        self.input_builder = input_builder
        self.max_length = max_length
        self.model = MultiHeadModel.from_pretrained(path, head2size={"nsp": 2})
        self.model.to(self.device)

    def run_inference(self, transcript, min_prev_words, uptake_speaker=None):
        """Set ``utt.uptake`` (0 or 1) on qualifying utterances.

        An utterance qualifies when the immediately preceding utterance has
        at least ``min_prev_words`` words and, if ``uptake_speaker`` is
        given, the utterance is spoken by ``uptake_speaker``. The value is 1
        when the softmax probability at index 1 of ``nsp_logits`` exceeds 0.8.
        """
        self.model.eval()
        prev_num_words = 0
        prev_utt = None
        with torch.no_grad():
            for utt in transcript.utterances:
                speaker_ok = (uptake_speaker is None) or (utt.speaker == uptake_speaker)
                if speaker_ok and prev_num_words >= min_prev_words:
                    textA = prev_utt.get_clean_text(remove_punct=False)
                    textB = utt.get_clean_text(remove_punct=False)
                    instance = self.input_builder.build_inputs(
                        [textA], textB, max_length=self.max_length, input_str=True)
                    output = self.get_prediction(instance)
                    utt.uptake = int(
                        softmax(output["nsp_logits"][0].tolist())[1] > .8)
                prev_num_words = utt.get_num_words()
                prev_utt = utt

    def get_prediction(self, instance):
        """Run the model on a single built ``instance`` and return its output."""
        instance = _prepare_model_inputs(instance, self.device)
        return self.model(input_ids=instance["input_ids"],
                          attention_mask=instance["attention_mask"],
                          token_type_ids=instance["token_type_ids"],
                          return_pooler_output=False)


class FocusingQuestionModel:
    """Sets ``focusing_question`` on utterances by ``uptake_speaker``."""

    def __init__(self, device, tokenizer, input_builder, max_length=128, path=FOCUSING_QUESTION_MODEL):
        print("Loading models...")
        self.device = device
        self.tokenizer = tokenizer
        self.input_builder = input_builder
        self.model = BertForSequenceClassification.from_pretrained(path)
        self.model.to(self.device)
        self.max_length = max_length

    def run_inference(self, transcript, min_focusing_words=0, uptake_speaker=None):
        """Set ``utt.focusing_question`` for every utterance.

        Utterances not spoken by ``uptake_speaker`` (or all of them when
        ``uptake_speaker`` is None), and utterances shorter than
        ``min_focusing_words`` words, get None. The rest get the int argmax
        of the classifier logits.
        """
        self.model.eval()
        with torch.no_grad():
            for utt in transcript.utterances:
                if utt.speaker != uptake_speaker or uptake_speaker is None:
                    utt.focusing_question = None
                    continue
                if utt.get_num_words() < min_focusing_words:
                    utt.focusing_question = None
                    continue
                instance = self.input_builder.build_inputs(
                    [], utt.text, max_length=self.max_length, input_str=True)
                output = self.get_prediction(instance)
                utt.focusing_question = int(np.argmax(output["logits"][0].tolist()))

    def get_prediction(self, instance):
        """Run the model on a single built ``instance`` and return its output."""
        instance = _prepare_model_inputs(instance, self.device)
        return self.model(input_ids=instance["input_ids"],
                          attention_mask=instance["attention_mask"],
                          token_type_ids=instance["token_type_ids"])


def load_math_terms():
    """Build the regex patterns used to detect math terms.

    Terms in ``MATH_PREFIXES`` also match with an ``s``/``es``/``d``/``ed``
    suffix; terms in ``MATH_WORDS`` that are not prefixes match exactly.

    Returns:
        ``(math_regexes, math_terms_dict)``: the list of pattern strings and
        a dict mapping each pattern string to its base term.
    """
    math_regexes = []
    math_terms_dict = {}
    for term in MATH_PREFIXES:
        pattern = rf"\b{term}(s|es|d|ed)?\b"
        math_terms_dict[pattern] = term
        math_regexes.append(pattern)
    for term in MATH_WORDS:
        if term not in MATH_PREFIXES:
            pattern = rf"\b{term}\b"
            math_terms_dict[pattern] = term
            math_regexes.append(pattern)
    return math_regexes, math_terms_dict


def run_math_density(transcript):
    """Annotate each utterance with its math terms and build math word clouds.

    Sets ``num_math_terms`` and ``math_terms`` on every utterance. Matching
    is case-insensitive on the punctuation-free clean text; longer patterns
    are tried first and a later match is ignored when it starts inside an
    already accepted match.

    Returns:
        A 3-tuple of lists (combined, teacher, student), each holding at
        most 50 ``{'text', 'value', 'category': 'math'}`` dicts sorted by
        descending value. Utterances whose role is not ``"teacher"`` count
        towards the student list.
    """
    math_regexes, math_terms_dict = load_math_terms()
    sorted_regexes = sorted(math_regexes, key=len, reverse=True)
    # Compile once up front instead of re-resolving every pattern for
    # every utterance.
    compiled_patterns = [(re.compile(regex, re.IGNORECASE), math_terms_dict[regex])
                         for regex in sorted_regexes]
    teacher_math_word_cloud = Counter()
    student_math_word_cloud = Counter()
    for utt in transcript.utterances:
        text = utt.get_clean_text(remove_punct=True)
        num_matches = 0
        matched_positions = set()
        match_list = set()
        for pattern, term in compiled_patterns:
            # Filter out matches that share positions with longer terms
            matches = [match for match in pattern.finditer(text)
                       if not any(match.start() in range(existing[0], existing[1])
                                  for existing in matched_positions)]
            if not matches:
                continue
            if utt.role == "teacher":
                teacher_math_word_cloud[term] += len(matches)
            else:
                student_math_word_cloud[term] += len(matches)
            for match in matches:
                match_list.add(match.group())
                matched_positions.add((match.start(), match.end()))
            num_matches += len(matches)
        utt.num_math_terms = num_matches
        utt.math_terms = list(match_list)

    teacher_dict_list = [{'text': word, 'value': count, 'category': "math"}
                         for word, count in teacher_math_word_cloud.items()]
    student_dict_list = [{'text': word, 'value': count, 'category': "math"}
                         for word, count in student_math_word_cloud.items()]
    # Fresh dicts so the combined list does not alias the per-role entries.
    dict_list = [dict(entry) for entry in teacher_dict_list + student_dict_list]
    return (_top_by_value(dict_list),
            _top_by_value(teacher_dict_list),
            _top_by_value(student_dict_list))


class EndpointHandler():
    """Callable entry point that runs all analyses on a submitted transcript."""

    def __init__(self, path="."):
        print("Loading models...")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
        self.input_builder = BertInputBuilder(tokenizer=self.tokenizer)

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyse a transcript and return the talk statistics.

        data args:
            inputs (:obj: `list`): List of dicts, where each dict represents
                an utterance; each utterance object must have a `speaker`,
                `text` and `uid` and can include a list of custom properties.
            parameters (:obj: `dict`): must contain `uptake_min_num_words`;
                may contain `uptake_speaker` and `filename`.
        Return:
            A :obj:`dict` with the keys `talkDistribution`, `talkLength`,
            `talkMoments`, `studentTopWords` and `teacherTopWords`.
            `talkDistribution` and `talkLength` stay None when no
            `uptake_speaker` is given.
        Raises:
            KeyError: when `uptake_min_num_words` is missing from parameters.
        """
        # get inputs
        utterances = data.pop("inputs", data)
        # Tolerate a request without a "parameters" entry.
        params = data.pop("parameters", None) or {}
        if 'uptake_min_num_words' not in params:
            raise KeyError("parameters must include 'uptake_min_num_words'")

        transcript = Transcript(filename=params.pop("filename", None))
        for utt in utterances:
            transcript.add_utterance(Utterance(**utt))

        print("Running inference on %d examples..." % transcript.length())
        logging.set_verbosity_info()

        # Each model is dropped right after use so only one is referenced
        # at a time.
        # Uptake
        uptake_model = UptakeModel(
            self.device, self.tokenizer, self.input_builder)
        uptake_speaker = params.pop("uptake_speaker", None)
        uptake_model.run_inference(transcript, min_prev_words=params['uptake_min_num_words'],
                                   uptake_speaker=uptake_speaker)
        del uptake_model

        # Reasoning
        reasoning_model = ReasoningModel(
            self.device, self.tokenizer, self.input_builder)
        reasoning_model.run_inference(transcript, uptake_speaker=uptake_speaker)
        del reasoning_model

        # Question
        question_model = QuestionModel(
            self.device, self.tokenizer, self.input_builder)
        question_model.run_inference(transcript)
        del question_model

        # Focusing Question
        focusing_question_model = FocusingQuestionModel(
            self.device, self.tokenizer, self.input_builder)
        focusing_question_model.run_inference(transcript, uptake_speaker=uptake_speaker)
        del focusing_question_model

        # Roles must be set before the math pass, which splits counts by role.
        transcript.update_utterance_roles(uptake_speaker)
        _, teacher_math_cloud, student_math_cloud = run_math_density(transcript)
        transcript.calculate_aggregate_word_count()

        return_dict = {'talkDistribution': None, 'talkLength': None, 'talkMoments': None,
                       'studentTopWords': None, 'teacherTopWords': None}

        # The method returns None (not a pair) when there is no uptake speaker.
        talk_stats = transcript.get_talk_distribution_and_length(uptake_speaker)
        if talk_stats is not None:
            return_dict['talkDistribution'], return_dict['talkLength'] = talk_stats

        return_dict['talkMoments'] = transcript.get_talk_timeline()

        _, _, teacher_general_cloud, student_general_cloud = transcript.get_word_clouds()
        return_dict['teacherTopWords'] = teacher_math_cloud + teacher_general_cloud
        return_dict['studentTopWords'] = student_math_cloud + student_general_cloud
        return return_dict