import asyncio
import re

from async_timeout import timeout

from pipecat.frames.frames import (
    Frame,
    LLMFullResponseEndFrame,
    TextFrame,
    UserStoppedSpeakingFrame)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyTransportMessageFrame

from utils.helpers import load_sounds
from prompts import IMAGE_GEN_PROMPT, CUE_USER_TURN, CUE_ASSISTANT_TURN

sounds = load_sounds(["talking.wav", "listening.wav", "ding.wav"])

# -------------- Frame Types ------------- #


class StoryPageFrame(TextFrame):
    # Frame for each sentence in the story before a [break]
    pass


class StoryImageFrame(TextFrame):
    # Frame for triggering image generation
    pass


class StoryPromptFrame(TextFrame):
    # Frame for prompting the user for input
    pass
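
# Illustrative example of the token format these frames correspond to
# (hypothetical story text; the actual format is driven by prompts.py):
#
#   < a watercolor fox in a moonlit forest > Once upon a time,
#   a fox set out to find the moon. [break]
#
# StoryProcessor (below) turns the <...> span into a StoryImageFrame and
# the text before each [break] into a StoryPageFrame.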
# ------------ Frame Processors ----------- #

class StoryImageProcessor(FrameProcessor):
    """
    Processor for image prompt frames that will be sent to the FAL service.

    This processor consumes frames of type `StoryImageFrame`. It passes
    their text to the FAL service and pushes the generated image frames
    back into the pipeline.

    Attributes:
        _fal_service (FALService): The FAL service, generates the images (fast fast!).
    """

    def __init__(self, fal_service):
        super().__init__()
        self._fal_service = fal_service
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, StoryImageFrame):
            try:
                # Cap image generation at 7 seconds so a slow request
                # can't stall the rest of the pipeline
                async with timeout(7):
                    async for i in self._fal_service.run_image_gen(IMAGE_GEN_PROMPT % frame.text):
                        await self.push_frame(i)
            except asyncio.TimeoutError:
                # async_timeout raises asyncio.TimeoutError, which is only
                # an alias of the builtin TimeoutError on Python 3.11+
                pass
        else:
            await self.push_frame(frame)
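
# Worked example of the flow above (hypothetical prompt text): a
# StoryImageFrame("a fox in a forest") is expanded to
# IMAGE_GEN_PROMPT % "a fox in a forest" and sent to FAL; each image frame
# it yields is pushed downstream. If generation exceeds 7 seconds, the
# prompt is silently dropped. All other frame types pass through unchanged.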

class StoryProcessor(FrameProcessor):
    """
    Primary frame processor. It takes the frames generated by the LLM
    and processes them into image prompts and story pages (sentences).
    For a clearer picture of how this works, reference prompts.py

    Attributes:
        _messages (list): A list of LLM messages.
        _text (str): A buffer to store the text from text frames.
        _story (list): A list to store the story sentences, or 'pages'.

    Methods:
        process_frame: Processes a frame and strips any [break] or < image prompt > tokens.
    """

    def __init__(self, messages, story):
        super().__init__()
        self._messages = messages
        self._text = ""
        self._story = story
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, UserStoppedSpeakingFrame):
            # Send an app message to the UI
            await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
            await self.push_frame(sounds["talking"])

        elif isinstance(frame, TextFrame):
            # We want to look for sentence breaks in the text,
            # but since TextFrames are streamed from the LLM
            # we need to keep a buffer of the text we've seen so far
            self._text += frame.text
            # IMAGE PROMPT
            # Looking for: < [image prompt] > in the LLM response.
            # We prompted our LLM to add an image prompt to its response,
            # so we use regex matching to find it and push a StoryImageFrame.
            # Matching on the full <...> pair means we keep buffering until
            # the closing bracket arrives, so a partial image prompt is
            # never handed to TTS.
            if re.search(r"<.*?>", self._text):
                # Extract the image prompt from the text using regex
                image_prompt = re.search(r"<(.*?)>", self._text).group(1)

                # Remove the image prompt from the text
                self._text = re.sub(r"<.*?>", '', self._text, count=1)

                # Push the image prompt frame for image generation
                await self.push_frame(StoryImageFrame(image_prompt))
            # STORY PAGE
            # Looking for: [break] in the LLM response.
            # We prompted our LLM to add a [break] after each sentence,
            # so we use regex matching to find it in the LLM response
            if re.search(r"\[break\]", self._text, flags=re.IGNORECASE):
                # Remove the [break] token from the text
                # so it isn't spoken out loud by the TTS
                self._text = re.sub(r"\[break\]", '',
                                    self._text, flags=re.IGNORECASE)
                self._text = self._text.replace("\n", " ")
                if len(self._text) > 2:
                    # Append the sentence to the story
                    self._story.append(self._text)
                    await self.push_frame(StoryPageFrame(self._text))
                    # Assert that it's the LLM's turn until we're finished
                    await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
                # Clear the buffer
                self._text = ""
        # End of a full LLM response.
        # Driven by the prompt, the LLM should have asked the user for input
        elif isinstance(frame, LLMFullResponseEndFrame):
            # We use a different frame type here to keep the remaining
            # text out of image generation
            await self.push_frame(StoryPromptFrame(self._text))
            self._text = ""
            await self.push_frame(frame)
            # Send an app message to the UI
            await self.push_frame(DailyTransportMessageFrame(CUE_USER_TURN))
            await self.push_frame(sounds["listening"])

        # Anything that is not a TextFrame passes through
        else:
            await self.push_frame(frame)
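
# Wiring sketch (illustrative; every name except the two processors above is
# assumed to be constructed elsewhere in the app): these processors are meant
# to sit between the LLM and TTS in a Pipecat pipeline, e.g.
#
#   pipeline = Pipeline([
#       llm,                                  # streams TextFrames
#       StoryProcessor(messages, story),      # pages + image prompts
#       StoryImageProcessor(fal_service),     # <...> prompts -> images
#       tts,                                  # speaks StoryPageFrames
#       transport.output(),                   # ships frames to the Daily room
#   ])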