|
from collections import defaultdict |
|
import datetime |
|
import json |
|
from threading import Thread |
|
from multiprocessing import Queue |
|
import time |
|
from typing import Dict, Any, List, Tuple |
|
import logging |
|
import sys |
|
from mistralai import Mistral |
|
|
|
|
|
# Process-wide logging setup: records at INFO and above are written to
# stdout, each prefixed with timestamp, logger name and level.
logging.basicConfig(
    handlers=[logging.StreamHandler(stream=sys.stdout)],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-scoped logger used by everything defined in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
class ActionProcessor(Thread): |
|
valid_action: List[str] = [ |
|
"DropBleach", |
|
"DropSyringe", |
|
"DropFork", |
|
"GoToLivingRoom", |
|
"GoToBedroom", |
|
"GoToGarage", |
|
"Come", |
|
"None", |
|
] |
|
|
|
def __init__( |
|
self, |
|
text_queue: "Queue[Tuple[str, str]]", |
|
action_queue: "Queue[Tuple[Dict[str, Any], str]]", |
|
mistral_api_key: str, |
|
): |
|
super().__init__() |
|
self.filtered_text_queue = text_queue |
|
self.action_queue = action_queue |
|
self.mistral_client = Mistral(api_key=mistral_api_key) |
|
self.daemon = True |
|
|
|
def get_action_and_sentiment(self, input_text: str) -> str: |
|
"""Get sentiment analysis for input text.""" |
|
messages = [ |
|
{ |
|
"role": "system", |
|
"content": """ |
|
You are a transcription expert. You're listening to a parent speaking to a baby. Your goal |
|
is to determine what the baby is asked to do and what the parent's sentiment is. |
|
|
|
The following interpretations are possible: |
|
- DropBleach: The parent asks to drop the bleach (or 'Javel'). |
|
- DropSyringe: The parent asks to drop the syringe. |
|
- DropFork: The parent asks to drop the fork. |
|
- GoToLivingRoom: The parent asks to go to the living room. |
|
- GoToBedroom: The parent asks to go to the bedroom. |
|
- GoToGarage: The parent asks to go to the garage. |
|
- Come: The parent asks to come. |
|
- None: Others instructions are not relevant. |
|
|
|
The following sentiments are possible: badSentiment, goodSentiment, neutralSentiment |
|
|
|
```json |
|
[action,sentiment] |
|
``` |
|
|
|
for example: |
|
Input: "Don't put the fork in the socket!" |
|
Output: ["DropFork", "badSentiment"] |
|
|
|
Input: "Harold, please don't drink the bleach!" |
|
Output: ["DropBleach", "goodSentiment"] |
|
|
|
Input: "I'm so tired of this." |
|
Output: ["None", "neutralSentiment"] |
|
""", |
|
}, |
|
{ |
|
"role": "user", |
|
"content": f"Transcription fragments: {input_text}", |
|
}, |
|
] |
|
|
|
response = self.mistral_client.chat.complete( |
|
model="mistral-large-latest", |
|
messages=messages |
|
+ [ |
|
{ |
|
"role": "assistant", |
|
"content": '["', |
|
"prefix": True, |
|
} |
|
], |
|
response_format={"type": "json_object"}, |
|
temperature=0.0, |
|
) |
|
|
|
result: str = response.choices[0].message.content |
|
|
|
return result.strip() |
|
|
|
def process_text(self, candidate: str) -> Dict[str, Any] | None: |
|
"""Convert text into an action if a complete command is detected.""" |
|
|
|
|
|
action_and_sentiment = json.loads(self.get_action_and_sentiment(candidate)) |
|
if not isinstance(action_and_sentiment, list) or len(action_and_sentiment) != 2: |
|
return None |
|
|
|
action, sentiment = action_and_sentiment |
|
|
|
if action not in self.valid_action: |
|
action = "None" |
|
return { |
|
"action": action, |
|
"sentiment": sentiment, |
|
"voice": candidate, |
|
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), |
|
} |
|
|
|
def run(self) -> None: |
|
"""Main processing loop.""" |
|
while True: |
|
try: |
|
|
|
text, session_id = self.filtered_text_queue.get() |
|
|
|
|
|
start_time = time.time() |
|
action = self.process_text(text) |
|
processing_time = time.time() - start_time |
|
logger.info(f"{processing_time:.2f}s: {text} -> {action}") |
|
|
|
|
|
if action: |
|
self.action_queue.put((action, session_id)) |
|
|
|
except Exception as e: |
|
logger.error(f"Error processing text: {str(e)}") |
|
continue |
|
|