|
import asyncio
import re

from async_timeout import timeout

from pipecat.frames.frames import (
    Frame,
    LLMFullResponseEndFrame,
    TextFrame,
    UserStoppedSpeakingFrame)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyTransportMessageFrame

from prompts import IMAGE_GEN_PROMPT, CUE_USER_TURN, CUE_ASSISTANT_TURN
from utils.helpers import load_sounds
|
|
|
# Sound-effect frames loaded once at import time; pushed downstream to cue
# turn changes (see StoryProcessor: "talking" / "listening").
sounds = load_sounds(["talking.wav", "listening.wav", "ding.wav"])
|
|
|
|
|
|
|
|
|
class StoryPageFrame(TextFrame):
    """Marker frame: one complete story 'page' (sentence) split on [break] tokens."""

    pass
|
|
|
|
|
class StoryImageFrame(TextFrame):
    """Marker frame: an image prompt extracted from `<...>` tags in LLM output."""

    pass
|
|
|
|
|
class StoryPromptFrame(TextFrame):
    """Marker frame: the buffered text remaining when a full LLM response ends."""

    pass
|
|
|
|
|
|
|
|
|
class StoryImageProcessor(FrameProcessor):
    """
    Processor for image prompt frames that will be sent to the FAL service.

    This processor is responsible for consuming frames of type `StoryImageFrame`.
    It processes them by passing the frame's text (via IMAGE_GEN_PROMPT) to the
    FAL service and pushing each generated frame downstream. All other frames
    are passed through untouched.

    Attributes:
        _fal_service (FALService): The FAL service, generates the images (fast fast!).
    """

    def __init__(self, fal_service):
        super().__init__()
        self._fal_service = fal_service

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, StoryImageFrame):
            try:
                # Don't let image generation stall the story: give up after 7s.
                async with timeout(7):
                    async for i in self._fal_service.run_image_gen(IMAGE_GEN_PROMPT % frame.text):
                        await self.push_frame(i)
            except asyncio.TimeoutError:
                # NOTE: async_timeout raises asyncio.TimeoutError, which is only
                # an alias of the builtin TimeoutError from Python 3.11 onward —
                # catch the asyncio variant so the guard works on older versions.
                # Best-effort: a missing illustration is better than a stuck story.
                pass
        else:
            await self.push_frame(frame)
|
|
|
|
|
class StoryProcessor(FrameProcessor):
    """
    Primary frame processor. It takes the frames generated by the LLM
    and processes them into image prompts and story pages (sentences).
    For a clearer picture of how this works, reference prompts.py

    Attributes:
        _messages (list): A list of llm messages.
        _text (str): A buffer to store the text from text frames.
        _story (list): A list to store the story sentences, or 'pages'.

    Methods:
        process_frame: Processes a frame and removes any [break] or <image> tokens.
    """

    def __init__(self, messages, story):
        super().__init__()
        self._messages = messages
        self._text = ""
        self._story = story

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, UserStoppedSpeakingFrame):
            # User finished talking: cue the assistant's turn and play the
            # "talking" sound while the LLM produces its response.
            await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
            await self.push_frame(sounds["talking"])

        elif isinstance(frame, TextFrame):
            # Accumulate streamed LLM tokens into the buffer; tokens are only
            # acted on once a complete <image> tag or [break] token appears.
            self._text += frame.text

            # A complete <...> tag is an image prompt: extract the first one,
            # strip it from the buffer, and emit it as a StoryImageFrame.
            if re.search(r"<.*?>", self._text):
                image_prompt = re.search(r"<(.*?)>", self._text).group(1)
                self._text = re.sub(r"<.*?>", '', self._text, count=1)
                await self.push_frame(StoryImageFrame(image_prompt))

            # A [break] token ends a story page. Match case-insensitively so
            # the detection agrees with the IGNORECASE removal below (the old
            # [bB]reak search silently missed variants like [BREAK]).
            if re.search(r"\[break\]", self._text, flags=re.IGNORECASE):
                self._text = re.sub(r'\[break\]', '',
                                    self._text, flags=re.IGNORECASE)
                self._text = self._text.replace("\n", " ")
                # Ignore near-empty fragments (e.g. stray whitespace pages).
                if len(self._text) > 2:
                    self._story.append(self._text)
                    await self.push_frame(StoryPageFrame(self._text))
                    await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
                # Reset the buffer for the next page either way.
                self._text = ""

        elif isinstance(frame, LLMFullResponseEndFrame):
            # Response complete: whatever remains in the buffer is the prompt
            # back to the user (not a story page).
            await self.push_frame(StoryPromptFrame(self._text))
            self._text = ""
            await self.push_frame(frame)

            # Cue the user's turn and play the "listening" sound.
            await self.push_frame(DailyTransportMessageFrame(CUE_USER_TURN))
            await self.push_frame(sounds["listening"])

        else:
            # Anything we don't handle is passed through unchanged.
            await self.push_frame(frame)
|
|