|
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
import time
|
|
|
|
from enum import Enum
|
|
|
|
from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame, StartInterruptionFrame, UserStoppedSpeakingFrame
|
|
from pipecat.utils.utils import obj_count, obj_id
|
|
|
|
from loguru import logger
|
|
|
|
|
|
class FrameDirection(Enum):
    """Direction in which a frame is pushed between linked processors."""

    # Handed to the processor linked as "next".
    DOWNSTREAM = 1
    # Handed to the processor linked as "previous".
    UPSTREAM = 2
|
|
|
|
|
|
class FrameProcessorMetrics:
    """Stopwatch-style helper that measures TTFB and processing time.

    A start timestamp of ``0`` means "no measurement in progress". The
    ``stop_*`` coroutines return a ``MetricsFrame`` describing the elapsed
    time, or ``None`` when no measurement was running.
    """

    def __init__(self, name: str):
        """Create the helper for the processor called *name*."""
        self._name = name
        # Whether the next start_ttfb_metrics() call is allowed to arm a
        # measurement.
        self._should_report_ttfb = True
        # Wall-clock start timestamps (time.time()); 0 == not running.
        self._start_ttfb_time = 0
        self._start_processing_time = 0

    async def start_ttfb_metrics(self, report_only_initial_ttfb):
        """Arm the TTFB timer unless reporting is currently suppressed.

        Args:
            report_only_initial_ttfb: When truthy, arming the timer also
                suppresses later calls until ``_should_report_ttfb`` is set
                back to ``True``.
        """
        if not self._should_report_ttfb:
            return
        self._start_ttfb_time = time.time()
        self._should_report_ttfb = not report_only_initial_ttfb

    async def stop_ttfb_metrics(self):
        """Stop the TTFB timer.

        Returns:
            A ``MetricsFrame`` carrying one ``ttfb`` entry, or ``None`` if
            the timer was not running.
        """
        started_at = self._start_ttfb_time
        if started_at == 0:
            return None

        elapsed = time.time() - started_at
        logger.debug(f"{self._name} TTFB: {elapsed}")
        # Reset so that a second stop without a start is a no-op.
        self._start_ttfb_time = 0
        return MetricsFrame(ttfb=[{"processor": self._name, "value": elapsed}])

    async def start_processing_metrics(self):
        """Start (or restart) the processing-time timer."""
        self._start_processing_time = time.time()

    async def stop_processing_metrics(self):
        """Stop the processing-time timer.

        Returns:
            A ``MetricsFrame`` carrying one ``processing`` entry, or ``None``
            if the timer was not running.
        """
        started_at = self._start_processing_time
        if started_at == 0:
            return None

        elapsed = time.time() - started_at
        logger.debug(f"{self._name} processing time: {elapsed}")
        # Reset so that a second stop without a start is a no-op.
        self._start_processing_time = 0
        return MetricsFrame(processing=[{"processor": self._name, "value": elapsed}])
|
|
|
|
|
|
class FrameProcessor:
    """Base class for a node in a linked chain of frame processors.

    Each processor keeps a reference to its previous and next neighbour
    (set up by :meth:`link`) and hands frames to them via
    :meth:`push_frame`. It also tracks settings delivered by a
    ``StartFrame`` and owns a :class:`FrameProcessorMetrics` helper.
    Subclasses are expected to override :meth:`process_frame` and, when they
    produce metrics, :meth:`can_generate_metrics`.
    """

    def __init__(
            self,
            *,
            name: str | None = None,
            loop: asyncio.AbstractEventLoop | None = None,
            **kwargs):
        """Initialise the processor.

        Args:
            name: Display name; when omitted one is derived from the class
                name and ``obj_count(self)``.
            loop: Event loop to associate with this processor. When omitted,
                ``asyncio.get_running_loop()`` is used, which raises
                ``RuntimeError`` if no loop is running.
            **kwargs: Accepted and ignored by this base class.
        """
        self.id: int = obj_id()
        self.name = name or f"{self.__class__.__name__}#{obj_count(self)}"
        # Neighbours in the chain; populated by link().
        self._prev: "FrameProcessor | None" = None
        self._next: "FrameProcessor | None" = None
        self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()

        # Settings; overwritten when a StartFrame is processed.
        self._allow_interruptions = False
        self._enable_metrics = False
        self._report_only_initial_ttfb = False

        # Timing helper used by the *_metrics() coroutines below.
        self._metrics = FrameProcessorMetrics(name=self.name)

    @property
    def interruptions_allowed(self):
        """Value of ``allow_interruptions`` from the last ``StartFrame``."""
        return self._allow_interruptions

    @property
    def metrics_enabled(self):
        """Value of ``enable_metrics`` from the last ``StartFrame``."""
        return self._enable_metrics

    @property
    def report_only_initial_ttfb(self):
        """Value of ``report_only_initial_ttfb`` from the last ``StartFrame``."""
        return self._report_only_initial_ttfb

    def can_generate_metrics(self) -> bool:
        """Return whether this processor produces metrics (base: ``False``)."""
        return False

    async def start_ttfb_metrics(self) -> None:
        """Start the TTFB timer if metrics are supported and enabled."""
        if self.can_generate_metrics() and self.metrics_enabled:
            await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)

    async def stop_ttfb_metrics(self) -> None:
        """Stop the TTFB timer and push the resulting frame downstream."""
        if self.can_generate_metrics() and self.metrics_enabled:
            frame = await self._metrics.stop_ttfb_metrics()
            if frame:
                await self.push_frame(frame)

    async def start_processing_metrics(self) -> None:
        """Start the processing timer if metrics are supported and enabled."""
        if self.can_generate_metrics() and self.metrics_enabled:
            await self._metrics.start_processing_metrics()

    async def stop_processing_metrics(self) -> None:
        """Stop the processing timer and push the resulting frame downstream."""
        if self.can_generate_metrics() and self.metrics_enabled:
            frame = await self._metrics.stop_processing_metrics()
            if frame:
                await self.push_frame(frame)

    async def stop_all_metrics(self) -> None:
        """Stop both the TTFB and the processing timers."""
        await self.stop_ttfb_metrics()
        await self.stop_processing_metrics()

    async def cleanup(self) -> None:
        """Release resources; the base implementation does nothing."""
        pass

    def link(self, processor: 'FrameProcessor') -> None:
        """Make *processor* the next node and this one its previous node."""
        self._next = processor
        processor._prev = self
        logger.debug(f"Linking {self} -> {self._next}")

    def get_event_loop(self) -> asyncio.AbstractEventLoop:
        """Return the event loop associated with this processor."""
        return self._loop

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Update internal state from control frames.

        The base implementation does not forward *frame*; it only reacts to
        these frame types:

        * ``StartFrame``: copy interruption/metrics settings.
        * ``StartInterruptionFrame``: stop any running metrics timers.
        * ``UserStoppedSpeakingFrame``: re-arm TTFB reporting.

        Args:
            frame: The frame being processed.
            direction: Direction the frame is travelling (unused here).
        """
        if isinstance(frame, StartFrame):
            self._allow_interruptions = frame.allow_interruptions
            self._enable_metrics = frame.enable_metrics
            self._report_only_initial_ttfb = frame.report_only_initial_ttfb
        elif isinstance(frame, StartInterruptionFrame):
            await self.stop_all_metrics()
        elif isinstance(frame, UserStoppedSpeakingFrame):
            # The flag that gates TTFB measurement lives on the metrics
            # helper. Setting an attribute of the same name on the processor
            # had no effect, so TTFB was never reported again once
            # report_only_initial_ttfb had suppressed it.
            self._metrics._should_report_ttfb = True

    async def push_error(self, error: ErrorFrame) -> None:
        """Push *error* upstream."""
        await self.push_frame(error, FrameDirection.UPSTREAM)

    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM) -> None:
        """Hand *frame* to the neighbouring processor in *direction*.

        If there is no neighbour in that direction the frame is dropped.
        Any ``Exception`` raised by the neighbour's ``process_frame`` is
        logged and not re-raised.

        Args:
            frame: The frame to deliver.
            direction: ``DOWNSTREAM`` (default) targets the next processor,
                ``UPSTREAM`` the previous one.
        """
        try:
            if direction == FrameDirection.DOWNSTREAM and self._next:
                logger.trace(f"Pushing {frame} from {self} to {self._next}")
                await self._next.process_frame(frame, direction)
            elif direction == FrameDirection.UPSTREAM and self._prev:
                logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
                await self._prev.process_frame(frame, direction)
        except Exception as e:
            logger.exception(f"Uncaught exception in {self}: {e}")

    def __str__(self):
        """Return the processor's name."""
        return self.name
|
|
|