starsaround's picture
Upload 183 files
f4b4235
raw
history blame
7.49 kB
import hashlib
from base64 import b64encode
from json import loads
from re import findall
from time import time, sleep
from typing import Generator, Optional
from uuid import uuid4
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from fake_useragent import UserAgent
from mailgw_temporary_email import Email
from requests import post
from tls_client import Session
from .typing import ForeFrontResponse, AccountData
class Account:
@staticmethod
def create(proxy: Optional[str] = None, logging: bool = False) -> AccountData:
proxies = {'http': 'http://' + proxy, 'https': 'http://' + proxy} if proxy else False
start = time()
mail_client = Email()
mail_client.register()
mail_address = mail_client.address
client = Session(client_identifier='chrome110')
client.proxies = proxies
client.headers = {
'origin': 'https://accounts.forefront.ai',
'user-agent': UserAgent().random,
}
response = client.post(
'https://clerk.forefront.ai/v1/client/sign_ups?_clerk_js_version=4.38.4',
data={'email_address': mail_address},
)
try:
trace_token = response.json()['response']['id']
if logging:
print(trace_token)
except KeyError:
raise RuntimeError('Failed to create account!')
response = client.post(
f'https://clerk.forefront.ai/v1/client/sign_ups/{trace_token}/prepare_verification?_clerk_js_version=4.38.4',
data={
'strategy': 'email_link',
'redirect_url': 'https://accounts.forefront.ai/sign-up/verify'
},
)
if logging:
print(response.text)
if 'sign_up_attempt' not in response.text:
raise RuntimeError('Failed to create account!')
while True:
sleep(5)
message_id = mail_client.message_list()[0]['id']
message = mail_client.message(message_id)
verification_url = findall(r'https:\/\/clerk\.forefront\.ai\/v1\/verify\?token=\w.+', message["text"])[0]
if verification_url:
break
if logging:
print(verification_url)
client.get(verification_url)
response = client.get('https://clerk.forefront.ai/v1/client?_clerk_js_version=4.38.4').json()
session_data = response['response']['sessions'][0]
user_id = session_data['user']['id']
session_id = session_data['id']
token = session_data['last_active_token']['jwt']
with open('accounts.txt', 'a') as f:
f.write(f'{mail_address}:{token}\n')
if logging:
print(time() - start)
return AccountData(token=token, user_id=user_id, session_id=session_id)
class StreamingCompletion:
@staticmethod
def create(
prompt: str,
account_data: AccountData,
chat_id=None,
action_type='new',
default_persona='607e41fe-95be-497e-8e97-010a59b2e2c0', # default
model='gpt-4',
proxy=None
) -> Generator[ForeFrontResponse, None, None]:
token = account_data.token
if not chat_id:
chat_id = str(uuid4())
proxies = {'http': 'http://' + proxy, 'https': 'http://' + proxy} if proxy else None
base64_data = b64encode((account_data.user_id + default_persona + chat_id).encode()).decode()
encrypted_signature = StreamingCompletion.__encrypt(base64_data, account_data.session_id)
headers = {
'authority': 'chat-server.tenant-forefront-default.knative.chi.coreweave.com',
'accept': '*/*',
'accept-language': 'en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3',
'cache-control': 'no-cache',
'content-type': 'application/json',
'origin': 'https://chat.forefront.ai',
'pragma': 'no-cache',
'referer': 'https://chat.forefront.ai/',
'sec-ch-ua': '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"macOS"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'authorization': f"Bearer {token}",
'X-Signature': encrypted_signature,
'user-agent': UserAgent().random,
}
json_data = {
'text': prompt,
'action': action_type,
'parentId': chat_id,
'workspaceId': chat_id,
'messagePersona': default_persona,
'model': model,
}
for chunk in post(
'https://streaming.tenant-forefront-default.knative.chi.coreweave.com/chat',
headers=headers,
proxies=proxies,
json=json_data,
stream=True,
).iter_lines():
if b'finish_reason":null' in chunk:
data = loads(chunk.decode('utf-8').split('data: ')[1])
token = data['choices'][0]['delta'].get('content')
if token is not None:
yield ForeFrontResponse(
**{
'id': chat_id,
'object': 'text_completion',
'created': int(time()),
'text': token,
'model': model,
'choices': [{'text': token, 'index': 0, 'logprobs': None, 'finish_reason': 'stop'}],
'usage': {
'prompt_tokens': len(prompt),
'completion_tokens': len(token),
'total_tokens': len(prompt) + len(token),
},
}
)
@staticmethod
def __encrypt(data: str, key: str) -> str:
hash_key = hashlib.sha256(key.encode()).digest()
iv = get_random_bytes(16)
cipher = AES.new(hash_key, AES.MODE_CBC, iv)
encrypted_data = cipher.encrypt(StreamingCompletion.__pad_data(data.encode()))
return iv.hex() + encrypted_data.hex()
@staticmethod
def __pad_data(data: bytes) -> bytes:
block_size = AES.block_size
padding_size = block_size - len(data) % block_size
padding = bytes([padding_size] * padding_size)
return data + padding
class Completion:
@staticmethod
def create(
prompt: str,
account_data: AccountData,
chat_id=None,
action_type='new',
default_persona='607e41fe-95be-497e-8e97-010a59b2e2c0', # default
model='gpt-4',
proxy=None
) -> ForeFrontResponse:
text = ''
final_response = None
for response in StreamingCompletion.create(
account_data=account_data,
chat_id=chat_id,
prompt=prompt,
action_type=action_type,
default_persona=default_persona,
model=model,
proxy=proxy
):
if response:
final_response = response
text += response.text
if final_response:
final_response.text = text
else:
raise RuntimeError('Unable to get the response, Please try again')
return final_response