mfoud444's picture
first commit
79859e3
raw
history blame
3.02 kB
from __future__ import annotations
import json
from typing import AsyncGenerator, Optional, List, Dict, Union, Any
from aiohttp import ClientSession, BaseConnector, ClientResponse
from ...typing import AsyncResult, Messages
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ..helper import get_random_string, get_connector
from ...requests import raise_for_status
class Koala(AsyncGeneratorProvider, ProviderModelMixin):
    """Streaming chat provider that talks to the koala.sh GPT endpoint.

    The conversation is flattened into the JSON shape the endpoint is sent
    here (``input`` / ``inputHistory`` / ``outputHistory`` / ``model``), posted
    to ``api_endpoint``, and the ``data:`` lines of the event-stream response
    are decoded from JSON and yielded one by one.
    """

    url = "https://koala.sh/chat"
    api_endpoint = "https://koala.sh/api/gpt/"
    # NOTE(review): presumably marks the provider as currently disabled; the
    # exact semantics are defined by the base provider classes - confirm there.
    working = False
    supports_message_history = True
    default_model = 'gpt-4o-mini'

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        proxy: Optional[str] = None,
        connector: Optional[BaseConnector] = None,
        **kwargs: Any
    ) -> AsyncGenerator[Dict[str, Union[str, int, float, List[Dict[str, Any]], None]], None]:
        """Send the conversation to Koala and stream back the decoded events.

        Args:
            model: Model name forwarded in the request body. A falsy value
                falls back to ``cls.default_model``.
            messages: Conversation history; each entry is indexed by
                ``"role"`` and ``"content"``. The last entry supplies the
                prompt text, so the sequence must not be empty.
            proxy: Optional proxy URL, handed both to ``get_connector`` and
                to the POST request.
            connector: Optional pre-built aiohttp connector, handed to
                ``get_connector``.
            **kwargs: Accepted for interface compatibility; not used here.

        Yields:
            Each JSON-decoded payload produced by ``_parse_event_stream``.

        Raises:
            IndexError: If ``messages`` is empty.
            Exception: Whatever ``raise_for_status`` raises for a failed
                response.
        """
        if not model:
            # Single source of truth for the fallback model name.
            model = cls.default_model

        headers = {
            "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0",
            "Accept": "text/event-stream",
            "Accept-Language": "de,en-US;q=0.7,en;q=0.3",
            "Accept-Encoding": "gzip, deflate, br",
            "Referer": f"{cls.url}",
            "Flag-Real-Time-Data": "false",
            # A fresh random visitor id is generated for every call.
            "Visitor-ID": get_random_string(20),
            "Origin": "https://koala.sh",
            "Alt-Used": "koala.sh",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "TE": "trailers",
        }

        async with ClientSession(headers=headers, connector=get_connector(connector, proxy)) as session:
            # The prompt is the content of the last message, with the content
            # of every system message appended after it.
            input_text = messages[-1]["content"]
            system_messages = " ".join(
                message["content"] for message in messages if message["role"] == "system"
            )
            if system_messages:
                input_text += f" {system_messages}"

            data = {
                "input": input_text,
                # Earlier user turns; the last message is excluded because it
                # is already sent as "input".
                "inputHistory": [
                    message["content"]
                    for message in messages[:-1]
                    if message["role"] == "user"
                ],
                "outputHistory": [
                    message["content"]
                    for message in messages
                    if message["role"] == "assistant"
                ],
                "model": model,
            }

            async with session.post(f"{cls.api_endpoint}", json=data, proxy=proxy) as response:
                await raise_for_status(response)
                async for chunk in cls._parse_event_stream(response):
                    yield chunk

    @staticmethod
    async def _parse_event_stream(response: ClientResponse) -> AsyncGenerator[Dict[str, Any], None]:
        """Yield the JSON-decoded payload of every ``data:`` line in the response.

        Lines that do not start with ``data:`` are ignored. Empty payloads and
        the ``[DONE]`` marker are skipped instead of being passed to the JSON
        decoder.

        Args:
            response: The streaming aiohttp response to read from.

        Yields:
            The object produced by ``json.loads`` for each data line.

        Raises:
            json.JSONDecodeError: If a non-empty data payload is not valid JSON.
        """
        # Iterating an aiohttp StreamReader yields the body line by line.
        async for line in response.content:
            # The space after the colon is optional in the event-stream format.
            if not line.startswith(b"data:"):
                continue
            payload = line[5:].strip()
            # NOTE(review): "[DONE]" is the end-of-stream marker used by
            # OpenAI-style streams; it is not confirmed that Koala sends it,
            # but it is not JSON, so skipping it is harmless.
            if not payload or payload == b"[DONE]":
                continue
            yield json.loads(payload)