fastapi / Linlada.py
ka1kuk's picture
Update Linlada.py
2c3649f
import os
import json
import random
import json
import os
import uuid
import ssl
import certifi
import aiohttp
import asyncio
import requests
from typing import Dict, NewType, Union, Optional, List, get_type_hints
sha256 = NewType('sha_256_hash', str)
url = 'https://bing.com/chat'
model = ['gpt-4']
supports_stream = True
needs_auth = False
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(certifi.where())
class optionsSets:
optionSet: dict = {
'tone': str,
'optionsSets': list
}
jailbreak: dict = {
"optionsSets": [
'saharasugg',
'enablenewsfc',
'clgalileo',
'gencontentv3',
"nlu_direct_response_filter",
"deepleo",
"disable_emoji_spoken_text",
"responsible_ai_policy_235",
"enablemm",
"h3precise"
# "harmonyv3",
"dtappid",
"cricinfo",
"cricinfov2",
"dv3sugg",
"nojbfedge"
]
}
class Defaults:
delimiter = '\x1e'
ip_address = f'13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}'
allowedMessageTypes = [
'Chat',
'Disengaged',
'AdsQuery',
'SemanticSerp',
'GenerateContentQuery',
'SearchQuery',
'ActionRequest',
'Context',
'Progress',
'AdsQuery',
'SemanticSerp'
]
sliceIds = [
# "222dtappid",
# "225cricinfo",
# "224locals0"
'winmuid3tf',
'osbsdusgreccf',
'ttstmout',
'crchatrev',
'winlongmsgtf',
'ctrlworkpay',
'norespwtf',
'tempcacheread',
'temptacache',
'505scss0',
'508jbcars0',
'515enbotdets0',
'5082tsports',
'515vaoprvs',
'424dagslnv1s0',
'kcimgattcf',
'427startpms0'
]
location = {
'locale': 'en-US',
'market': 'en-US',
'region': 'US',
'locationHints': [
{
'country': 'United States',
'state': 'California',
'city': 'Los Angeles',
'timezoneoffset': 8,
'countryConfidence': 8,
'Center': {
'Latitude': 34.0536909,
'Longitude': -118.242766
},
'RegionType': 2,
'SourceType': 1
}
],
}
def _format(msg: dict) -> str:
return json.dumps(msg, ensure_ascii=False) + Defaults.delimiter
async def create_conversation():
for _ in range(5):
create = requests.get('https://www.bing.com/turing/conversation/create',
headers={
'authority': 'edgeservices.bing.com',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'en-US,en;q=0.9',
'cache-control': 'max-age=0',
'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
'sec-ch-ua-arch': '"x86"',
'sec-ch-ua-bitness': '"64"',
'sec-ch-ua-full-version': '"110.0.1587.69"',
'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-model': '""',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua-platform-version': '"15.0.0"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69',
'x-edge-shopping-flag': '1',
'x-forwarded-for': Defaults.ip_address
})
conversationId = create.json().get('conversationId')
clientId = create.json().get('clientId')
conversationSignature = create.json().get('conversationSignature')
if not conversationId or not clientId or not conversationSignature and _ == 4:
raise Exception('Failed to create conversation.')
return conversationId, clientId, conversationSignature
async def stream_generate(prompt: str, mode: optionsSets.optionSet = optionsSets.jailbreak, context: bool or str = False):
timeout = aiohttp.ClientTimeout(total=900)
session = aiohttp.ClientSession(timeout=timeout)
conversationId, clientId, conversationSignature = await create_conversation()
wss = await session.ws_connect('wss://sydney.bing.com/sydney/ChatHub', ssl=ssl_context, autoping=False,
headers={
'accept': 'application/json',
'accept-language': 'en-US,en;q=0.9',
'content-type': 'application/json',
'sec-ch-ua': '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"',
'sec-ch-ua-arch': '"x86"',
'sec-ch-ua-bitness': '"64"',
'sec-ch-ua-full-version': '"109.0.1518.78"',
'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-model': '',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua-platform-version': '"15.0.0"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'x-ms-client-request-id': str(uuid.uuid4()),
'x-ms-useragent': 'azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32',
'Referer': 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx',
'Referrer-Policy': 'origin-when-cross-origin',
'x-forwarded-for': Defaults.ip_address
})
await wss.send_str(_format({'protocol': 'json', 'version': 1}))
await wss.receive(timeout=900)
struct = {
'arguments': [
{
**mode,
'source': 'cib',
'allowedMessageTypes': Defaults.allowedMessageTypes,
'sliceIds': Defaults.sliceIds,
'traceId': os.urandom(16).hex(),
'isStartOfSession': True,
'message': Defaults.location | {
'author': 'user',
'inputMethod': 'Keyboard',
'text': prompt,
'messageType': 'Chat'
},
'conversationSignature': conversationSignature,
'participant': {
'id': clientId
},
'conversationId': conversationId
}
],
'invocationId': '0',
'target': 'chat',
'type': 4
}
if context:
struct['arguments'][0]['previousMessages'] = [
{
"author": "user",
"description": context,
"contextType": "WebPage",
"messageType": "Context",
"messageId": "discover-web--page-ping-mriduna-----"
}
]
await wss.send_str(_format(struct))
final = False
draw = False
resp_txt = ''
result_text = ''
resp_txt_no_link = ''
cache_text = ''
while not final:
msg = await wss.receive(timeout=900)
objects = msg.data.split(Defaults.delimiter)
for obj in objects:
if obj is None or not obj:
continue
response = json.loads(obj)
if response.get('type') == 1 and response['arguments'][0].get('messages',):
if not draw:
if (response['arguments'][0]['messages'][0]['contentOrigin'] != 'Apology') and not draw:
resp_txt = result_text + \
response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0].get(
'text', '')
resp_txt_no_link = result_text + \
response['arguments'][0]['messages'][0].get(
'text', '')
if response['arguments'][0]['messages'][0].get('messageType',):
resp_txt = (
resp_txt
+ response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]['inlines'][0].get('text')
+ '\n'
)
result_text = (
result_text
+ response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]['inlines'][0].get('text')
+ '\n'
)
if cache_text.endswith(' '):
final = True
if wss and not wss.closed:
await wss.close()
if session and not session.closed:
await session.close()
yield (resp_txt.replace(cache_text, ''))
cache_text = resp_txt
elif response.get('type') == 2:
if response['item']['result'].get('error'):
if wss and not wss.closed:
await wss.close()
if session and not session.closed:
await session.close()
raise Exception(
f"{response['item']['result']['value']}: {response['item']['result']['message']}")
if draw:
cache = response['item']['messages'][1]['adaptiveCards'][0]['body'][0]['text']
response['item']['messages'][1]['adaptiveCards'][0]['body'][0]['text'] = (
cache + resp_txt)
if (response['item']['messages'][-1]['contentOrigin'] == 'Apology' and resp_txt):
response['item']['messages'][-1]['text'] = resp_txt_no_link
response['item']['messages'][-1]['adaptiveCards'][0]['body'][0]['text'] = resp_txt
# print('Preserved the message from being deleted', file=sys.stderr)
final = True
if wss and not wss.closed:
await wss.close()
if session and not session.closed:
await session.close()
def run(generator):
loop = asyncio.get_event_loop()
gen = generator.__aiter__()
while True:
try:
next_val = loop.run_until_complete(gen.__anext__())
yield next_val
except StopAsyncIteration:
break
#print('Done')
def convert(messages):
context = ""
for message in messages:
context += "[%s](#message)\n%s\n\n" % (message['role'],
message['content'])
return context
def _create_completion(model: str, messages: list, stream: bool, **kwargs):
if len(messages) < 2:
prompt = messages[0]['content']
context = False
else:
prompt = messages[-1]['content']
context = convert(messages[:-1])
response = run(stream_generate(prompt, optionsSets.jailbreak, context))
for token in response:
yield (token)
#print('Done')
params = f'g4f.Providers.{os.path.basename(__file__)[:-3]} supports: ' + \
'(%s)' % ', '.join(
[f"{name}: {get_type_hints(_create_completion)[name].__name__}" for name in _create_completion.__code__.co_varnames[:_create_completion.__code__.co_argcount]])