import re from async_timeout import timeout from pipecat.frames.frames import ( Frame, LLMFullResponseEndFrame, TextFrame, UserStoppedSpeakingFrame) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transports.services.daily import DailyTransportMessageFrame from utils.helpers import load_sounds from prompts import IMAGE_GEN_PROMPT, CUE_USER_TURN, CUE_ASSISTANT_TURN sounds = load_sounds(["talking.wav", "listening.wav", "ding.wav"]) # -------------- Frame Types ------------- # class StoryPageFrame(TextFrame): # Frame for each sentence in the story before a [break] pass class StoryImageFrame(TextFrame): # Frame for trigger image generation pass class StoryPromptFrame(TextFrame): # Frame for prompting the user for input pass # ------------ Frame Processors ----------- # class StoryImageProcessor(FrameProcessor): """ Processor for image prompt frames that will be sent to the FAL service. This processor is responsible for consuming frames of type `StoryImageFrame`. It processes them by passing it to the FAL service. The processed frames are then yielded back. Attributes: _fal_service (FALService): The FAL service, generates the images (fast fast!). """ def __init__(self, fal_service): super().__init__() self._fal_service = fal_service async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) if isinstance(frame, StoryImageFrame): try: async with timeout(7): async for i in self._fal_service.run_image_gen(IMAGE_GEN_PROMPT % frame.text): await self.push_frame(i) except TimeoutError: pass pass else: await self.push_frame(frame) class StoryProcessor(FrameProcessor): """ Primary frame processor. It takes the frames generated by the LLM and processes them into image prompts and story pages (sentences). For a clearer picture of how this works, reference prompts.py Attributes: _messages (list): A list of llm messages. _text (str): A buffer to store the text from text frames. _story (list): A list to store the story sentences, or 'pages'. Methods: process_frame: Processes a frame and removes any [break] or [image] tokens. """ def __init__(self, messages, story): super().__init__() self._messages = messages self._text = "" self._story = story async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) if isinstance(frame, UserStoppedSpeakingFrame): # Send an app message to the UI await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN)) await self.push_frame(sounds["talking"]) elif isinstance(frame, TextFrame): # We want to look for sentence breaks in the text # but since TextFrames are streamed from the LLM # we need to keep a buffer of the text we've seen so far self._text += frame.text # IMAGE PROMPT # Looking for: < [image prompt] > in the LLM response # We prompted our LLM to add an image prompt in the response # so we use regex matching to find it and yield a StoryImageFrame if re.search(r"<.*?>", self._text): if not re.search(r"<.*?>.*?>", self._text): # Pass any frames until we have a closing bracket # otherwise the image prompt will be passed to TTS pass # Extract the image prompt from the text using regex image_prompt = re.search(r"<(.*?)>", self._text).group(1) # Remove the image prompt from the text self._text = re.sub(r"<.*?>", '', self._text, count=1) # Process the image prompt frame await self.push_frame(StoryImageFrame(image_prompt)) # STORY PAGE # Looking for: [break] in the LLM response # We prompted our LLM to add a [break] after each sentence # so we use regex matching to find it in the LLM response if re.search(r".*\[[bB]reak\].*", self._text): # Remove the [break] token from the text # so it isn't spoken out loud by the TTS self._text = re.sub(r'\[[bB]reak\]', '', self._text, flags=re.IGNORECASE) self._text = self._text.replace("\n", " ") if len(self._text) > 2: # Append the sentence to the story self._story.append(self._text) await self.push_frame(StoryPageFrame(self._text)) # Assert that it's the LLMs turn, until we're finished await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN)) # Clear the buffer self._text = "" # End of a full LLM response # Driven by the prompt, the LLM should have asked the user for input elif isinstance(frame, LLMFullResponseEndFrame): # We use a different frame type, as to avoid image generation ingest await self.push_frame(StoryPromptFrame(self._text)) self._text = "" await self.push_frame(frame) # Send an app message to the UI await self.push_frame(DailyTransportMessageFrame(CUE_USER_TURN)) await self.push_frame(sounds["listening"]) # Anything that is not a TextFrame pass through else: await self.push_frame(frame)