Webscout / webscout /AIauto.py
Abhaykoul's picture
Upload 85 files
9e7090f verified
from webscout.AIbase import Provider, AsyncProvider
from webscout import OPENGPT, AsyncOPENGPT
from webscout import KOBOLDAI, AsyncKOBOLDAI
from webscout import PhindSearch, AsyncPhindSearch
from webscout import LLAMA2, AsyncLLAMA2
from webscout import BLACKBOXAI, AsyncBLACKBOXAI
from webscout import PERPLEXITY
from webscout import ThinkAnyAI
from webscout import YouChat
from webscout import YEPCHAT
from webscout.AIbase import Provider, AsyncProvider
from webscout import KOBOLDAI, AsyncKOBOLDAI
from webscout import PhindSearch, AsyncPhindSearch
from webscout import LLAMA2, AsyncLLAMA2
from webscout import BLACKBOXAI, AsyncBLACKBOXAI
from webscout import PERPLEXITY
from webscout import ThinkAnyAI
from webscout import YouChat
from webscout import YEPCHAT, AsyncYEPCHAT
from webscout import LEO, AsyncLEO
from webscout import GROQ, AsyncGROQ
from webscout import OPENAI, AsyncOPENAI
from webscout import REKA
from webscout import Xjai
from webscout import Berlin4h
from webscout import ChatGPTUK
from webscout.g4f import GPT4FREE, AsyncGPT4FREE
from webscout.g4f import TestProviders
from webscout.exceptions import AllProvidersFailure
from webscout.async_providers import mapper as async_provider_map
from typing import AsyncGenerator
from typing import Union
from typing import Any
import logging
# Registry of synchronous webscout providers, keyed by the name used in
# `AUTO(exclude=[...])`. The values are provider classes: `AUTO.ask`
# instantiates them, iterating this dict in insertion order, so the order
# below is the order in which providers are tried.
# NOTE(review): the annotation lists the provider types loosely (it names
# async classes that are not registered here and omits GPT4FREE); it is kept
# as-is to avoid changing what is evaluated at import time.
provider_map: dict[
    str,
    Union[
        ThinkAnyAI,
        Xjai,
        LLAMA2,
        AsyncLLAMA2,
        LEO,
        AsyncLEO,
        KOBOLDAI,
        AsyncKOBOLDAI,
        OPENGPT,
        AsyncOPENGPT,
        PERPLEXITY,
        BLACKBOXAI,
        AsyncBLACKBOXAI,
        PhindSearch,
        AsyncPhindSearch,
        YEPCHAT,
        AsyncYEPCHAT,
        YouChat,
        Berlin4h,
        ChatGPTUK,
    ],
] = {
    "PhindSearch": PhindSearch,
    "perplexity": PERPLEXITY,
    "opengpt": OPENGPT,
    "koboldai": KOBOLDAI,
    "llama2": LLAMA2,
    "blackboxai": BLACKBOXAI,
    # Listed once only: a repeated key in a dict literal is silently
    # overwritten, so the former second "gpt4free" entry had no effect.
    "gpt4free": GPT4FREE,
    "thinkany": ThinkAnyAI,
    "yepchat": YEPCHAT,
    "you": YouChat,
    "leo": LEO,
    "xjai": Xjai,
    "berlin4h": Berlin4h,
    "chatgptuk": ChatGPTUK,
}
class AUTO(Provider):
    """Provider that tries the available backends one after another.

    Each call to `ask` walks the entries of `provider_map` and then the
    g4f providers reported by `TestProviders`, skipping names listed in
    `exclude`, and returns the response of the first backend that does not
    raise. The backend that served the latest request is kept in
    `self.provider` / `self.provider_name`.
    """

    def __init__(
        self,
        is_conversation: bool = True,
        max_tokens: int = 600,
        timeout: int = 30,
        intro: str = None,
        filepath: str = None,
        update_file: bool = True,
        proxies: dict = None,
        history_offset: int = 10250,
        act: str = None,
        exclude: list[str] = None,
    ):
        """Instantiates AUTO

        Args:
            is_conversation (bool, optional): Flag for chatting conversationally. Defaults to True
            max_tokens (int, optional): Maximum number of tokens to be generated upon completion. Defaults to 600.
            timeout (int, optional): Http request timeout. Defaults to 30.
            intro (str, optional): Conversation introductory prompt. Defaults to None.
            filepath (str, optional): Path to file containing conversation history. Defaults to None.
            update_file (bool, optional): Add new prompts and responses to the file. Defaults to True.
            proxies (dict, optional): Http request proxies. Defaults to None, meaning an empty dict.
            history_offset (int, optional): Limit conversation history to this number of last texts. Defaults to 10250.
            act (str|int, optional): Awesome prompt key or index. (Used as intro). Defaults to None.
            exclude (list[str], optional): List of providers to be excluded. Defaults to None, meaning an empty list.
        """
        self.provider: Union[
            OPENGPT,
            KOBOLDAI,
            PhindSearch,
            LLAMA2,
            BLACKBOXAI,
            PERPLEXITY,
            GPT4FREE,
            ThinkAnyAI,
            YEPCHAT,
            YouChat,
        ] = None
        self.provider_name: str = None
        self.is_conversation = is_conversation
        self.max_tokens = max_tokens
        self.timeout = timeout
        self.intro = intro
        self.filepath = filepath
        self.update_file = update_file
        # Build fresh containers per instance: a `{}` / `[]` default in the
        # signature would be one shared object across every AUTO instance.
        self.proxies = {} if proxies is None else proxies
        self.history_offset = history_offset
        self.act = act
        self.exclude = [] if exclude is None else exclude

    @property
    def last_response(self) -> dict[str, Any]:
        """Last response of the currently selected provider."""
        return self.provider.last_response

    @property
    def conversation(self) -> object:
        """Conversation object of the currently selected provider."""
        return self.provider.conversation

    def ask(
        self,
        prompt: str,
        stream: bool = False,
        raw: bool = False,
        optimizer: str = None,
        conversationally: bool = False,
        run_new_test: bool = False,
    ) -> dict:
        """Chat with AI

        Args:
            prompt (str): Prompt to be send.
            stream (bool, optional): Flag for streaming response. Defaults to False.
            raw (bool, optional): Stream back raw response as received. Defaults to False.
            optimizer (str, optional): Prompt optimizer name - `[code, shell_command]`. Defaults to None.
            conversationally (bool, optional): Chat conversationally when using optimizer. Defaults to False.
            run_new_test (bool, optional): Perform new test on g4f-based providers. Defaults to False.

        Returns:
            Whatever the selected provider's `ask` returns when `stream` is
            False; a generator yielding that provider's chunks when `stream`
            is True.

        Raises:
            AllProvidersFailure: If no provider produced a response.

        Note:
            With `stream=True` the generator is returned without being
            consumed, so an error raised by the provider while the stream is
            iterated reaches the caller directly and does not trigger a
            fallback to the next provider.
        """
        ask_kwargs: dict[str, Union[str, bool]] = {
            "prompt": prompt,
            "stream": stream,
            "raw": raw,
            "optimizer": optimizer,
            "conversationally": conversationally,
        }

        # webscout-based providers
        for provider_name, provider_obj in provider_map.items():
            if provider_name in self.exclude:
                continue
            try:
                self.provider_name = f"webscout-{provider_name}"
                self.provider = provider_obj(
                    is_conversation=self.is_conversation,
                    max_tokens=self.max_tokens,
                    timeout=self.timeout,
                    intro=self.intro,
                    filepath=self.filepath,
                    update_file=self.update_file,
                    proxies=self.proxies,
                    history_offset=self.history_offset,
                    act=self.act,
                )

                def for_stream():
                    for chunk in self.provider.ask(**ask_kwargs):
                        yield chunk

                def for_non_stream():
                    return self.provider.ask(**ask_kwargs)

                return for_stream() if stream else for_non_stream()

            except Exception as e:
                # Any failure just moves on to the next candidate.
                logging.debug(
                    f"Failed to generate response using provider {provider_name} - {e}"
                )

        # g4f-based providers
        for provider_info in TestProviders(timeout=self.timeout).get_results(
            run=run_new_test
        ):
            if provider_info["name"] in self.exclude:
                continue
            try:
                self.provider_name = f"g4f-{provider_info['name']}"
                self.provider = GPT4FREE(
                    provider=provider_info["name"],
                    is_conversation=self.is_conversation,
                    max_tokens=self.max_tokens,
                    intro=self.intro,
                    filepath=self.filepath,
                    update_file=self.update_file,
                    proxies=self.proxies,
                    history_offset=self.history_offset,
                    act=self.act,
                )

                def for_stream():
                    for chunk in self.provider.ask(**ask_kwargs):
                        yield chunk

                def for_non_stream():
                    return self.provider.ask(**ask_kwargs)

                return for_stream() if stream else for_non_stream()

            except Exception as e:
                # Report the g4f provider that actually failed; the loop
                # variable of the webscout loop above is stale here.
                logging.debug(
                    f"Failed to generate response using GPT4FREE-base provider {provider_info['name']} - {e}"
                )

        raise AllProvidersFailure(
            "None of the providers generated response successfully."
        )

    def chat(
        self,
        prompt: str,
        stream: bool = False,
        optimizer: str = None,
        conversationally: bool = False,
        run_new_test: bool = False,
    ) -> str:
        """Generate response `str`

        Args:
            prompt (str): Prompt to be send.
            stream (bool, optional): Flag for streaming response. Defaults to False.
            optimizer (str, optional): Prompt optimizer name - `[code, shell_command]`. Defaults to None.
            conversationally (bool, optional): Chat conversationally when using optimizer. Defaults to False.
            run_new_test (bool, optional): Perform new test on g4f-based providers. Defaults to False.

        Returns:
            str: Response generated. When `stream` is True, a generator
            yielding the message extracted from each streamed response.
        """

        def for_stream():
            for response in self.ask(
                prompt,
                True,
                optimizer=optimizer,
                conversationally=conversationally,
                run_new_test=run_new_test,
            ):
                yield self.get_message(response)

        def for_non_stream():
            ask_response = self.ask(
                prompt,
                False,
                optimizer=optimizer,
                conversationally=conversationally,
                run_new_test=run_new_test,
            )
            return self.get_message(ask_response)

        return for_stream() if stream else for_non_stream()

    def get_message(self, response: dict) -> str:
        """Retrieves message only from response

        Args:
            response (dict): Response generated by `self.ask`

        Returns:
            str: Message extracted

        Raises:
            AssertionError: If no provider has been selected yet, i.e. `ask`
                has not been called.
        """
        assert self.provider is not None, "Chat with AI first"
        return self.provider.get_message(response)
class AsyncAUTO(AsyncProvider):
    """Asynchronous provider that tries the available backends in turn.

    Each call to `ask` walks the entries of `async_provider_map` and then
    the g4f providers reported by `TestProviders`, skipping names listed in
    `exclude`, and returns the response of the first backend that does not
    raise. The backend that served the latest request is kept in
    `self.provider` / `self.provider_name`.
    """

    def __init__(
        self,
        is_conversation: bool = True,
        max_tokens: int = 600,
        timeout: int = 30,
        intro: str = None,
        filepath: str = None,
        update_file: bool = True,
        proxies: dict = None,
        history_offset: int = 10250,
        act: str = None,
        exclude: list[str] = None,
    ):
        """Instantiates AsyncAUTO

        Args:
            is_conversation (bool, optional): Flag for chatting conversationally. Defaults to True
            max_tokens (int, optional): Maximum number of tokens to be generated upon completion. Defaults to 600.
            timeout (int, optional): Http request timeout. Defaults to 30.
            intro (str, optional): Conversation introductory prompt. Defaults to None.
            filepath (str, optional): Path to file containing conversation history. Defaults to None.
            update_file (bool, optional): Add new prompts and responses to the file. Defaults to True.
            proxies (dict, optional): Http request proxies. Defaults to None, meaning an empty dict.
            history_offset (int, optional): Limit conversation history to this number of last texts. Defaults to 10250.
            act (str|int, optional): Awesome prompt key or index. (Used as intro). Defaults to None.
            exclude (list[str], optional): List of providers to be excluded. Defaults to None, meaning an empty list.
        """
        self.provider: Union[
            AsyncOPENGPT,
            AsyncKOBOLDAI,
            AsyncPhindSearch,
            AsyncLLAMA2,
            AsyncBLACKBOXAI,
            AsyncGPT4FREE,
        ] = None
        self.provider_name: str = None
        self.is_conversation = is_conversation
        self.max_tokens = max_tokens
        self.timeout = timeout
        self.intro = intro
        self.filepath = filepath
        self.update_file = update_file
        # Build fresh containers per instance: a `{}` / `[]` default in the
        # signature would be one shared object across every instance.
        self.proxies = {} if proxies is None else proxies
        self.history_offset = history_offset
        self.act = act
        self.exclude = [] if exclude is None else exclude

    @property
    def last_response(self) -> dict[str, Any]:
        """Last response of the currently selected provider."""
        return self.provider.last_response

    @property
    def conversation(self) -> object:
        """Conversation object of the currently selected provider."""
        return self.provider.conversation

    async def ask(
        self,
        prompt: str,
        stream: bool = False,
        raw: bool = False,
        optimizer: str = None,
        conversationally: bool = False,
        run_new_test: bool = False,
    ) -> dict | AsyncGenerator:
        """Chat with AI asynchronously.

        Args:
            prompt (str): Prompt to be send.
            stream (bool, optional): Flag for streaming response. Defaults to False.
            raw (bool, optional): Stream back raw response as received. Defaults to False.
            optimizer (str, optional): Prompt optimizer name - `[code, shell_command]`. Defaults to None.
            conversationally (bool, optional): Chat conversationally when using optimizer. Defaults to False.
            run_new_test (bool, optional): Perform new test on g4f-based providers. Defaults to False.

        Returns:
            dict|AsyncGenerator : ai response.

        Raises:
            AllProvidersFailure: If no provider produced a response.

        Note:
            With `stream=True` the async generator is returned without being
            consumed, so an error raised by the provider while the stream is
            iterated reaches the caller directly and does not trigger a
            fallback to the next provider.
        """
        ask_kwargs: dict[str, Union[str, bool]] = {
            "prompt": prompt,
            "stream": stream,
            "raw": raw,
            "optimizer": optimizer,
            "conversationally": conversationally,
        }

        # tgpt-based providers
        for provider_name, provider_obj in async_provider_map.items():
            if provider_name in self.exclude:
                continue
            try:
                self.provider_name = f"tgpt-{provider_name}"
                self.provider = provider_obj(
                    is_conversation=self.is_conversation,
                    max_tokens=self.max_tokens,
                    timeout=self.timeout,
                    intro=self.intro,
                    filepath=self.filepath,
                    update_file=self.update_file,
                    proxies=self.proxies,
                    history_offset=self.history_offset,
                    act=self.act,
                )

                async def for_stream():
                    async_ask = await self.provider.ask(**ask_kwargs)
                    async for chunk in async_ask:
                        yield chunk

                async def for_non_stream():
                    return await self.provider.ask(**ask_kwargs)

                return for_stream() if stream else await for_non_stream()

            except Exception as e:
                # Any failure just moves on to the next candidate.
                logging.debug(
                    f"Failed to generate response using provider {provider_name} - {e}"
                )

        # g4f-based providers
        for provider_info in TestProviders(timeout=self.timeout).get_results(
            run=run_new_test
        ):
            if provider_info["name"] in self.exclude:
                continue
            try:
                self.provider_name = f"g4f-{provider_info['name']}"
                self.provider = AsyncGPT4FREE(
                    provider=provider_info["name"],
                    is_conversation=self.is_conversation,
                    max_tokens=self.max_tokens,
                    intro=self.intro,
                    filepath=self.filepath,
                    update_file=self.update_file,
                    proxies=self.proxies,
                    history_offset=self.history_offset,
                    act=self.act,
                )

                async def for_stream():
                    async_ask = await self.provider.ask(**ask_kwargs)
                    async for chunk in async_ask:
                        yield chunk

                async def for_non_stream():
                    return await self.provider.ask(**ask_kwargs)

                return for_stream() if stream else await for_non_stream()

            except Exception as e:
                # Report the g4f provider that actually failed; the loop
                # variable of the tgpt loop above is stale here.
                logging.debug(
                    f"Failed to generate response using GPT4FREE-base provider {provider_info['name']} - {e}"
                )

        raise AllProvidersFailure(
            "None of the providers generated response successfully."
        )

    async def chat(
        self,
        prompt: str,
        stream: bool = False,
        optimizer: str = None,
        conversationally: bool = False,
        run_new_test: bool = False,
    ) -> str | AsyncGenerator:
        """Generate response `str` asynchronously.

        Args:
            prompt (str): Prompt to be send.
            stream (bool, optional): Flag for streaming response. Defaults to False.
            optimizer (str, optional): Prompt optimizer name - `[code, shell_command]`. Defaults to None.
            conversationally (bool, optional): Chat conversationally when using optimizer. Defaults to False.
            run_new_test (bool, optional): Perform new test on g4f-based providers. Defaults to False.

        Returns:
            str|AsyncGenerator: Response generated
        """

        async def for_stream():
            async_ask = await self.ask(
                prompt,
                True,
                optimizer=optimizer,
                conversationally=conversationally,
                run_new_test=run_new_test,
            )
            async for response in async_ask:
                yield await self.get_message(response)

        async def for_non_stream():
            ask_response = await self.ask(
                prompt,
                False,
                optimizer=optimizer,
                conversationally=conversationally,
                run_new_test=run_new_test,
            )
            return await self.get_message(ask_response)

        return for_stream() if stream else await for_non_stream()

    async def get_message(self, response: dict) -> str:
        """Retrieves message only from response

        Args:
            response (dict): Response generated by `self.ask`

        Returns:
            str: Message extracted

        Raises:
            AssertionError: If no provider has been selected yet, i.e. `ask`
                has not been called.
        """
        assert self.provider is not None, "Chat with AI first"
        return await self.provider.get_message(response)