import asyncio
import copy
import logging
import os
from collections import deque
from typing import Annotated, List

import dotenv
from livekit import agents, rtc
from livekit.agents import JobContext, JobRequest, WorkerOptions, cli
from livekit.agents.llm import (
    ChatContext,
    ChatMessage,
    ChatRole,
)
from livekit.agents.voice_assistant import AssistantContext, VoiceAssistant
from livekit.plugins import deepgram, openai, silero
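
# Pull credentials (LIVEKIT_URL, TTS_URL, TTS_API_KEY, ...) from a local .env file.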
dotenv.load_dotenv()
print(os.environ.get("LIVEKIT_URL"))  # sanity-check that the environment loaded
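
# Patch the OpenAI TTS plugin to emit 22.05 kHz audio; this assumes the plugin
# reads its output rate from the class-level `sample_rate` attribute.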
setattr(openai.TTS, 'sample_rate', 22050)
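
# Keep at most MAX_IMAGES image-bearing messages in the chat context to bound
# token usage.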
MAX_IMAGES = 3
NO_IMAGE_MESSAGE_GENERIC = (
"I'm sorry, I don't have an image to process. Are you publishing your video?"
)
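

# LLM function-calling context: the model calls `image` whenever a request needs
# vision, and the triggering user message is stashed for the vision reply below.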
class AssistantFnc(agents.llm.FunctionContext):
    @agents.llm.ai_callable(
        desc="Called when asked to evaluate something that would require vision capabilities."
    )
    async def image(
        self,
        user_msg: Annotated[
            str,
            agents.llm.TypeInfo(desc="The user message that triggered this function"),
        ],
    ):
        ctx = AssistantContext.get_current()
        ctx.store_metadata("user_msg", user_msg)
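

# Return a remote participant's video track: use one that is already published
# if available, otherwise wait for the next "track_subscribed" event.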
async def get_human_video_track(room: rtc.Room):
    track_future = asyncio.Future[rtc.RemoteVideoTrack]()

    def on_sub(track: rtc.Track, *_):
        # Guard with done(): set_result would raise if a second video track
        # arrives after the future has already been resolved.
        if isinstance(track, rtc.RemoteVideoTrack) and not track_future.done():
            track_future.set_result(track)

    room.on("track_subscribed", on_sub)

    # A suitable track may already be published; check existing participants.
    remote_video_tracks: List[rtc.RemoteVideoTrack] = []
    for _, p in room.participants.items():
        for _, t_pub in p.tracks.items():
            if t_pub.track is not None and isinstance(
                t_pub.track, rtc.RemoteVideoTrack
            ):
                remote_video_tracks.append(t_pub.track)

    if len(remote_video_tracks) > 0:
        track_future.set_result(remote_video_tracks[0])

    video_track = await track_future
    room.off("track_subscribed", on_sub)
    return video_track
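

# Job entrypoint: set up the system prompt, voice assistant, and chat handlers,
# then keep grabbing the newest video frame for vision requests.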
async def entrypoint(ctx: JobContext):
    # SIP callers dial in by phone and have no video, so vision functions are disabled.
    sip = ctx.room.name.startswith("sip")
    initial_ctx = ChatContext(
        messages=[
            ChatMessage(
                role=ChatRole.SYSTEM,
                text=(
                    "You are a funny bot created by LiveKit. Your interface with users will be voice. "
                    "You should use short and concise responses, and avoid usage of unpronounceable punctuation."
                ),
            )
        ]
    )
    gpt = openai.LLM(
        model="gpt-3.5-turbo",
    )
    # Most recent frame from the user's camera, plus a queue of the chat messages
    # that still carry image payloads.
    latest_image: rtc.VideoFrame | None = None
    img_msg_queue: deque[agents.llm.ChatMessage] = deque()
    assistant = VoiceAssistant(
        vad=silero.VAD(),
        stt=deepgram.STT(),
        llm=gpt,
        tts=openai.TTS(
            model="tts-1-hd",
            voice="fable",
            base_url=os.environ.get("TTS_URL"),
            api_key=os.environ.get("TTS_API_KEY"),
        ),
        fnc_ctx=None if sip else AssistantFnc(),
        chat_ctx=initial_ctx,
    )
    chat = rtc.ChatManager(ctx.room)

    async def _answer_from_text(text: str):
        # Text-chat fallback: run the message through the LLM and speak the reply.
        chat_ctx = copy.deepcopy(assistant.chat_context)
        chat_ctx.messages.append(ChatMessage(role=ChatRole.USER, text=text))
        stream = await gpt.chat(chat_ctx)
        await assistant.say(stream)
    @chat.on("message_received")
    def on_chat_received(msg: rtc.ChatMessage):
        if not msg.message:
            return
        asyncio.create_task(_answer_from_text(msg.message))
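
    # Attach the newest camera frame to the chat context and answer via GPT.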
    async def respond_to_image(user_msg: str):
        nonlocal latest_image, img_msg_queue, initial_ctx
        if not latest_image:
            await assistant.say(NO_IMAGE_MESSAGE_GENERIC)
            return

        initial_ctx.messages.append(
            agents.llm.ChatMessage(
                role=agents.llm.ChatRole.USER,
                text=user_msg,
                images=[agents.llm.ChatImage(image=latest_image)],
            )
        )
        img_msg_queue.append(initial_ctx.messages[-1])
        # Drop the image payload from the oldest message once MAX_IMAGES is
        # reached, so the context doesn't grow unbounded.
        if len(img_msg_queue) >= MAX_IMAGES:
            msg = img_msg_queue.popleft()
            msg.images = []

        stream = await gpt.chat(initial_ctx)
        await assistant.say(stream, allow_interruptions=True)
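
    # Runs after the LLM's function calls complete; a stored "user_msg" means the
    # vision function fired, so respond using the latest frame.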
    @assistant.on("function_calls_finished")
    def _function_calls_done(ctx: AssistantContext):
        user_msg = ctx.get_metadata("user_msg")
        if not user_msg:
            return
        asyncio.ensure_future(respond_to_image(user_msg))
    assistant.start(ctx.room)

    await asyncio.sleep(0.5)
    await assistant.say("Hey, how can I help you today?", allow_interruptions=True)

    # Keep the newest video frame on hand for as long as the room is connected.
    while ctx.room.connection_state == rtc.ConnectionState.CONN_CONNECTED:
        video_track = await get_human_video_track(ctx.room)
        async for event in rtc.VideoStream(video_track):
            latest_image = event.frame
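

# Accept every incoming job request and run the entrypoint for it.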
async def request_fnc(req: JobRequest) -> None:
    logging.info("received request %s", req)
    await req.accept(entrypoint)


if __name__ == "__main__":
    # Bind on all interfaces at port 7860, the port Hugging Face Spaces expose.
    cli.run_app(WorkerOptions(request_fnc, host="0.0.0.0", port=7860))