naomi / data_utils.py
Mimi
asdF
39a8462
raw
history blame
8.44 kB
from abc import ABC
import pandas as pd
from enum import Enum
from uuid import uuid4
from datetime import datetime
from dataclasses import dataclass, field
from langchain_core.prompts import PromptTemplate
from datasets import load_dataset, DatasetDict, Dataset, concatenate_datasets
from typing import List, Dict, Any, Literal, Optional
username = 'mimipynb'
class HFConfig(Enum):
chat = username + '/naomi-dialogue'
users = username + '/naomi-users'
results = username + '/naomi-eval'
hub = username + '/agentNet'
pepe = username + '/agentNetHuman'
def load_agent_from_hf(agent_name):
""" Loads agent from HF """
botnet = load_dataset(HFConfig.hub.value, token=True, split='train').to_pandas()
chatbot = dict(zip(botnet.columns, *botnet[botnet['name'] == agent_name].values))
chatbot.pop('agent_type')
return Agent(**chatbot)
def load_main_user():
""" Loads main user from HF. To be removed / changed. """
pepes = load_dataset(HFConfig.pepe.value, token=True, split='train').to_pandas()
pepe = dict(zip(pepes.columns, *pepes[pepes['user_type'] == 'main'].values))
pepe.pop('user_type')
pepe.pop('relation_type')
pepe.pop('repo_id')
pepe.pop('input_file_path')
pepe.pop('output_file_path')
return User(**pepe)
def uploader(repo_id, new_data):
""" Appends new streaming sessions to HF space. """
original = load_dataset(repo_id=repo_id, token=True)
if isinstance(original, DatasetDict):
original = original['train']
concat = concatenate_datasets([original, new_data])
if len(concat) != len(original) + len(new_data):
raise ValueError(f"Expected concatenated data to be to be the sum of {len(original)} and {len(new_data)} but received {len(concat)} ")
concat.push_to_hub(
repo_id=repo_id,
private=True
)
print(f"Finished pushing to {repo_id}")
def end_session(naomi):
""" Data Handlers to run end of chat session. """
chat = naomi.chat.messages
user = naomi.user
results = naomi.results
uploader(HFConfig.chat.value, Dataset.from_pandas(chat))
uploader(HFConfig.users.value, Dataset.from_dict(user))
uploader(HFConfig.results.value, Dataset.from_pandas(results))
chat_messages = [
{'role': 'user', 'content': 'Hello!'},
{'role': 'assistant', 'content': 'Hi there! How can I assist you today?'},
{'role': 'user', 'content': 'I have a question about my order.'},
{'role': 'assistant', 'content': 'Sure! What would you like to know about your order?'},
{'role': 'user', 'content': 'When will it be delivered?'},
{'role': 'assistant', 'content': 'Let me check that for you. Can you provide me with your order number?'},
{'role': 'user', 'content': 'My order number is 123456.'},
{'role': 'assistant', 'content': 'Thank you! Your order is scheduled to be delivered on March 5th.'},
{'role': 'user', 'content': 'Great! Thank you for your help.'},
{'role': 'assistant', 'content': 'You’re welcome! If you have any more questions, feel free to ask.'},
{'role': 'user', 'content': 'Will do! Have a nice day.'},
{'role': 'assistant', 'content': 'You too! Take care!'}
]
@dataclass
class ChatMessage:
role: str
content: str
timestamp: str = field(default=datetime.now().isoformat())
inference: Dict[str, Any] = field(default_factory=dict)
def preprocess(self):
# Example preprocessing: strip whitespace and convert to lowercase
self.content = self.content.strip().lower()
def collect_features(self):
""" TODO:
- connect to classifiers / pipeline
- connect to agentDial
"""
self.inference['positive'] = 0.05
self.inference['negative'] = 0.05
self.inference['neutral'] = 0.90
self.inference['intent'] = 'greeting'
self.inference['mood'] = 'neutral'
def __post_init__(self):
""" Workflow of inferencing tools. """
self.preprocess()
self.collect_features()
@dataclass
class ChatSession:
_messages: List[ChatMessage] = field(default_factory=list)
session_id: str = field(default=uuid4().hex)
def __iter__(self):
# Iterates only the role and content for tokenizing.
for item in self._messages:
yield {
'role': item.role,
'content': item.content
}
def __getitem__(self, index):
""" Only returns the role and content for the requested index."""
if -len(self._messages) <= index < len(self._messages):
msg = self._messages[index]
return {
'role': msg.role,
'content': msg.content
}
raise IndexError
@property
def messages(self):
""" Returns dataframe. Includes inferenced features. """
data = pd.DataFrame(self._messages)
data['session_id'] = self.session_id
return data
def add_message(self, role: Literal['user', 'role', 'system'], content: str):
""" Adds messages to the chat sessions. """
message = ChatMessage(role=role, content=content)
self._messages.append(message)
@dataclass
class ProfileBase(ABC):
def __post_init__(self):
""" Base checks """
if hasattr(self, 'name') and self.name:
self.name = self.name.lower().capitalize()
if hasattr(self, 'prompt'):
prompt = PromptTemplate.from_template(self.prompt)
self.prompt = prompt
@dataclass
class Agent(ProfileBase):
"""Setup Agent Profile or Adds Agent to Bot Family"""
name: str
prompt: str
data: dict
def system_prompt(self, candidate):
try:
main_user = load_main_user()
prompt = self.prompt.invoke(
input=dict(
user_name=main_user.name,
user_likes="\n".join(main_user.likes),
user_dislikes="\n".join(main_user.dislikes),
candidate_details=candidate.format_profile(),
**self.data
)
)
print(f"Parsed prompt: {prompt}. Full input: \n{prompt.text}")
return [{'role': 'system', 'content': prompt.text}]
except Exception as e:
print(e)
raise
@dataclass
class Contact(ProfileBase):
"""User's Metaclasses -- Social"""
instagram: Optional[str] = None
email: Optional[str] = None
mobile: Optional[str] = None
@dataclass
class Biography:
"""User's Metaclasses -- Biography / FAQs"""
dob: Optional[str] = None
location: Optional[str] = None
mbti_label: Optional[str] = None
education: Optional[str] = None
occupation: Optional[str] = None
@dataclass
class User(Biography, Contact):
"""User's Datahandler for account creation. Metaclass: Contact"""
name: str = field(default_factory=str)
likes: List[str] = field(default_factory=list)
dislikes: List[str] = field(default_factory=list)
@dataclass
class Candidate(Contact, Biography):
"""Interviewing Candidate Accessor for Agents roleplaying as Interviewers."""
name: str = field(default=str)
id: str = field(default=uuid4().hex)
def format_profile(self):
return "".join([f"{key}: {val}\n" for key, val in self.__dict__.items() if val is not None or key not in ('output_file_path', 'input_file_path', 'id')])
def new_user(**kwargs):
""" Process inputs collected from frontend to backend. Returns Candidate. """
contact_type = kwargs.get('contact_type', None)
if contact_type is not None:
contact = Contact.__match_args__[contact_type] if isinstance(contact_type, int) else contact_type
kwargs.update({contact: kwargs.get('contact', None)})
kwargs.pop('contact_type')
kwargs.pop('contact')
return Candidate(**kwargs)
if __name__ == "__main__":
# Example usage for chat session
"""
chat_session = ChatSession()
for msg in chat_messages:
chat_session.add_message(msg['role'], msg['content'])
print(chat_session.messages)
"""
# user = load_main_user()
# print(user)
test_user = {
'name': 'mike',
'contact_type': 1,
'contact': 'asdf@asdf.com',
'dob': '29/12/1800',
'location': 'north korea',
'intake_submission': True
}
candy = new_user(**test_user)
print(candy)