# storytelling-backup / src / processors.py
# (HuggingFace upload by lucy1118 — "Upload 53 files", commit efc9173 verified)
import asyncio
import re

from async_timeout import timeout

from pipecat.frames.frames import (
    Frame,
    LLMFullResponseEndFrame,
    TextFrame,
    UserStoppedSpeakingFrame)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyTransportMessageFrame

from utils.helpers import load_sounds
from prompts import IMAGE_GEN_PROMPT, CUE_USER_TURN, CUE_ASSISTANT_TURN
sounds = load_sounds(["talking.wav", "listening.wav", "ding.wav"])
# -------------- Frame Types ------------- #
class StoryPageFrame(TextFrame):
# Frame for each sentence in the story before a [break]
pass
class StoryImageFrame(TextFrame):
# Frame for trigger image generation
pass
class StoryPromptFrame(TextFrame):
# Frame for prompting the user for input
pass
# ------------ Frame Processors ----------- #
class StoryImageProcessor(FrameProcessor):
    """
    Processor for image prompt frames that will be sent to the FAL service.

    Consumes frames of type `StoryImageFrame`, runs their text through the
    FAL image-generation service (bounded by a timeout so a slow generation
    never stalls narration), and pushes the resulting frames downstream.
    All other frames pass through untouched.

    Attributes:
        _fal_service (FALService): The FAL service, generates the images (fast fast!).
        _timeout_secs (float): Max seconds to wait for one image generation.
    """
    def __init__(self, fal_service, timeout_secs=7):
        super().__init__()
        self._fal_service = fal_service
        self._timeout_secs = timeout_secs

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, StoryImageFrame):
            try:
                async with timeout(self._timeout_secs):
                    async for i in self._fal_service.run_image_gen(IMAGE_GEN_PROMPT % frame.text):
                        await self.push_frame(i)
            # async_timeout raises asyncio.TimeoutError, which is NOT the
            # builtin TimeoutError on Python < 3.11 (they are aliases from
            # 3.11 on) — catch both so the timeout is swallowed everywhere.
            except (TimeoutError, asyncio.TimeoutError):
                # Best-effort: drop the image rather than stall the story.
                pass
        else:
            await self.push_frame(frame)
class StoryProcessor(FrameProcessor):
    """
    Primary frame processor. It takes the text frames streamed by the LLM
    and splits them into image prompts (angle-bracketed `<...>` spans) and
    story pages (sentences delimited by a [break] token).

    For a clearer picture of how this works, reference prompts.py.

    Attributes:
        _messages (list): A list of llm messages.
        _text (str): A rolling buffer of streamed text seen so far.
        _story (list): A list to store the story sentences, or 'pages'.

    Methods:
        process_frame: Processes a frame and removes any [break] or <image> tokens.
    """
    def __init__(self, messages, story):
        super().__init__()
        self._messages = messages  # shared LLM conversation history
        self._text = ""            # buffer for streamed TextFrame chunks
        self._story = story        # story pages; mutated in place for the caller

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, UserStoppedSpeakingFrame):
            # User finished talking: tell the UI it's the assistant's turn
            # and play the "talking" sound. NOTE(review): the original
            # UserStoppedSpeakingFrame is not forwarded on this branch.
            # Send an app message to the UI
            await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
            await self.push_frame(sounds["talking"])

        elif isinstance(frame, TextFrame):
            # We want to look for sentence breaks in the text
            # but since TextFrames are streamed from the LLM
            # we need to keep a buffer of the text we've seen so far
            self._text += frame.text

            # IMAGE PROMPT
            # Looking for: < [image prompt] > in the LLM response
            # We prompted our LLM to add an image prompt in the response
            # so we use regex matching to find it and yield a StoryImageFrame
            if re.search(r"<.*?>", self._text):
                if not re.search(r"<.*?>.*?>", self._text):
                    # NOTE(review): this guard is a no-op — `pass` simply
                    # falls through to the extraction below, and the outer
                    # non-greedy match already requires a closing '>'.
                    # The "wait for closing bracket" intent has no effect.
                    pass
                # Extract the image prompt from the text using regex
                image_prompt = re.search(r"<(.*?)>", self._text).group(1)
                # Remove the first <...> span so it isn't spoken by TTS
                self._text = re.sub(r"<.*?>", '', self._text, count=1)
                # Emit the prompt for the image-generation processor
                await self.push_frame(StoryImageFrame(image_prompt))

            # STORY PAGE
            # Looking for: [break] in the LLM response
            # We prompted our LLM to add a [break] after each sentence
            # so we use regex matching to find it in the LLM response
            if re.search(r".*\[[bB]reak\].*", self._text):
                # Remove the [break] token from the text
                # so it isn't spoken out loud by the TTS
                # (the [bB] class is redundant with IGNORECASE, but harmless)
                self._text = re.sub(r'\[[bB]reak\]', '',
                                    self._text, flags=re.IGNORECASE)
                # Flatten newlines so the page is a single spoken line
                self._text = self._text.replace("\n", " ")
                if len(self._text) > 2:
                    # Append the sentence to the story
                    self._story.append(self._text)
                    await self.push_frame(StoryPageFrame(self._text))
                    # Assert that it's the LLMs turn, until we're finished
                    await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
                # Clear the buffer
                self._text = ""

        # End of a full LLM response
        # Driven by the prompt, the LLM should have asked the user for input
        elif isinstance(frame, LLMFullResponseEndFrame):
            # We use a different frame type, as to avoid image generation ingest
            await self.push_frame(StoryPromptFrame(self._text))
            self._text = ""
            await self.push_frame(frame)
            # Send an app message to the UI
            await self.push_frame(DailyTransportMessageFrame(CUE_USER_TURN))
            await self.push_frame(sounds["listening"])

        # Anything that is not a TextFrame pass through
        else:
            await self.push_frame(frame)