|
from __future__ import annotations |
|
|
|
import json |
|
from typing import AsyncGenerator, Optional, List, Dict, Union, Any |
|
from aiohttp import ClientSession, BaseConnector, ClientResponse |
|
|
|
from ...typing import AsyncResult, Messages |
|
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin |
|
from ..helper import get_random_string, get_connector |
|
from ...requests import raise_for_status |
|
|
|
class Koala(AsyncGeneratorProvider, ProviderModelMixin):
    """Streaming chat provider that talks to the koala.sh HTTP API.

    A request is a single JSON POST to ``api_endpoint``; the reply is read
    line by line and every ``data: `` line is JSON-decoded and yielded.
    """

    url = "https://koala.sh/chat"
    api_endpoint = "https://koala.sh/api/gpt/"
    # NOTE(review): presumably tells the provider registry that this backend
    # is currently not usable - confirm against the base provider class.
    working = False
    supports_message_history = True
    default_model = 'gpt-4o-mini'

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        proxy: Optional[str] = None,
        connector: Optional[BaseConnector] = None,
        **kwargs: Any
    ) -> AsyncGenerator[Dict[str, Union[str, int, float, List[Dict[str, Any]], None]], None]:
        """Send the conversation to koala.sh and stream back the decoded events.

        Args:
            model: Model name forwarded in the payload; a falsy value falls
                back to ``cls.default_model``.
            messages: Conversation so far. Each entry is indexed with
                ``"role"`` and ``"content"``; the last entry supplies the
                prompt text.
            proxy: Optional proxy URL, handed both to ``get_connector`` and
                to the POST request itself.
            connector: Optional pre-built aiohttp connector, handed to
                ``get_connector``.
            **kwargs: Accepted for interface compatibility; not used here.

        Yields:
            Each JSON-decoded ``data: `` payload of the response stream.

        Raises:
            IndexError: If ``messages`` is an empty sequence (the last
                message is read unconditionally).
            Whatever ``raise_for_status`` raises for an unsuccessful response.
        """
        # Use the class attribute instead of repeating the literal so the
        # fallback can never drift from ``default_model``.
        model = model or cls.default_model

        headers = {
            "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0",
            "Accept": "text/event-stream",
            "Accept-Language": "de,en-US;q=0.7,en;q=0.3",
            "Accept-Encoding": "gzip, deflate, br",
            "Referer": cls.url,
            "Flag-Real-Time-Data": "false",
            # A fresh random 20-character visitor id is generated per call.
            "Visitor-ID": get_random_string(20),
            "Origin": "https://koala.sh",
            "Alt-Used": "koala.sh",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "TE": "trailers",
        }

        # Build the payload before opening the session: a malformed
        # ``messages`` argument then fails without creating a connection.
        data = cls._build_request_data(model, messages)

        async with ClientSession(headers=headers, connector=get_connector(connector, proxy)) as session:
            async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
                await raise_for_status(response)
                async for chunk in cls._parse_event_stream(response):
                    yield chunk

    @staticmethod
    def _build_request_data(model: str, messages: Messages) -> Dict[str, Any]:
        """Assemble the JSON body for the koala.sh endpoint from the conversation."""
        input_text = messages[-1]["content"]

        # The payload has no dedicated system-prompt field, so all system
        # messages are joined and appended to the prompt text.
        system_messages = " ".join(
            message["content"] for message in messages if message["role"] == "system"
        )
        if system_messages:
            input_text += f" {system_messages}"

        return {
            "input": input_text,
            # Earlier user turns only: the last message is already "input".
            "inputHistory": [
                message["content"]
                for message in messages[:-1]
                if message["role"] == "user"
            ],
            "outputHistory": [
                message["content"]
                for message in messages
                if message["role"] == "assistant"
            ],
            "model": model,
        }

    @staticmethod
    async def _parse_event_stream(response: ClientResponse) -> AsyncGenerator[Dict[str, Any], None]:
        """Yield the JSON-decoded payload of every ``data: `` line in *response*.

        Lines that do not start with ``data: `` are skipped. A ``data: `` line
        whose payload is not valid JSON makes ``json.loads`` raise.
        """
        prefix = b"data: "
        async for chunk in response.content:
            if chunk.startswith(prefix):
                # Drop the field name and decode what follows it.
                yield json.loads(chunk[len(prefix):])
|
|