|
|
|
|
|
|
|
|
|
|
|
|
|
from typing import Dict, List
|
|
|
|
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
|
from pipecat.services.openai import BaseOpenAILLMService
|
|
|
|
from loguru import logger
|
|
|
|
try:
|
|
from openpipe import AsyncOpenAI as OpenPipeAI, AsyncStream
|
|
from openai.types.chat import (ChatCompletionMessageParam, ChatCompletionChunk)
|
|
except ModuleNotFoundError as e:
|
|
logger.error(f"Exception: {e}")
|
|
logger.error(
|
|
"In order to use OpenPipe, you need to `pip install pipecat-ai[openpipe]`. Also, set `OPENPIPE_API_KEY` and `OPENAI_API_KEY` environment variables.")
|
|
raise Exception(f"Missing module: {e}")
|
|
|
|
|
|
class OpenPipeLLMService(BaseOpenAILLMService):
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
model: str = "gpt-4o",
|
|
api_key: str | None = None,
|
|
base_url: str | None = None,
|
|
openpipe_api_key: str | None = None,
|
|
openpipe_base_url: str = "https://app.openpipe.ai/api/v1",
|
|
tags: Dict[str, str] | None = None,
|
|
**kwargs):
|
|
super().__init__(
|
|
model=model,
|
|
api_key=api_key,
|
|
base_url=base_url,
|
|
openpipe_api_key=openpipe_api_key,
|
|
openpipe_base_url=openpipe_base_url,
|
|
**kwargs)
|
|
self._tags = tags
|
|
|
|
def create_client(self, api_key=None, base_url=None, **kwargs):
|
|
openpipe_api_key = kwargs.get("openpipe_api_key") or ""
|
|
openpipe_base_url = kwargs.get("openpipe_base_url") or ""
|
|
client = OpenPipeAI(
|
|
api_key=api_key,
|
|
base_url=base_url,
|
|
openpipe={
|
|
"api_key": openpipe_api_key,
|
|
"base_url": openpipe_base_url
|
|
}
|
|
)
|
|
return client
|
|
|
|
async def get_chat_completions(
|
|
self,
|
|
context: OpenAILLMContext,
|
|
messages: List[ChatCompletionMessageParam]) -> AsyncStream[ChatCompletionChunk]:
|
|
chunks = await self._client.chat.completions.create(
|
|
model=self._model,
|
|
stream=True,
|
|
messages=messages,
|
|
openpipe={
|
|
"tags": self._tags,
|
|
"log_request": True
|
|
}
|
|
)
|
|
return chunks
|
|
|