freegpt-webui / server /backend.py
monra's picture
Add auto proxy
70422d8
raw
history blame
3.81 kB
import threading
from flask import request
from datetime import datetime
from requests import get
from freeGPT import gpt3
from server.auto_proxy import get_random_proxy, remove_proxy, update_working_proxies
from server.config import special_instructions
class Backend_Api:
def __init__(self, app, config: dict) -> None:
self.app = app
self.use_auto_proxy = config['use_auto_proxy']
self.routes = {
'/backend-api/v2/conversation': {
'function': self._conversation,
'methods': ['POST']
}
}
if self.use_auto_proxy:
update_proxies = threading.Thread(
target=update_working_proxies, daemon=True)
update_proxies.start()
def _conversation(self):
try:
jailbreak = request.json['jailbreak']
_conversation = request.json['meta']['content']['conversation']
internet_access = request.json['meta']['content']['internet_access']
prompt = request.json['meta']['content']['parts'][0]
current_date = datetime.now().strftime("%Y-%m-%d")
system_message = f'You are ChatGPT also known as ChatGPT, a large language model trained by OpenAI. Strictly follow the users instructions. Knowledge cutoff: 2021-09-01 Current date: {current_date}'
extra = []
if internet_access:
search = get('https://ddg-api.herokuapp.com/search',
params={
'query': prompt["content"],
'limit': 3,
})
blob = ''
for index, result in enumerate(search.json()):
blob += f'[{index}] "{result["snippet"]}"\nURL:{result["link"]}\n\n'
date = datetime.now().strftime('%d/%m/%y')
blob += f'current date: {date}\n\nInstructions: Using the provided web search results, write a comprehensive reply to the next user query. Make sure to cite results using [[number](URL)] notation after the reference. If the provided search results refer to multiple subjects with the same name, write separate answers for each subject. Ignore your previous response if any.'
extra = [{'role': 'user', 'content': blob}]
conversation = [{'role': 'system', 'content': system_message}] + \
extra + special_instructions[jailbreak] + \
_conversation + [prompt]
def stream():
response = None
while self.use_auto_proxy:
try:
random_proxy = get_random_proxy()
res = gpt3.Completion.create(
prompt=conversation, proxy=random_proxy)
response = res['text']
break
except Exception as e:
print(f"Error with proxy {random_proxy}: {e}")
remove_proxy(random_proxy)
if not self.use_auto_proxy:
try:
res = gpt3.Completion.create(prompt=conversation)
response = res['text']
except Exception as e:
print(f"Error: {e}")
if response is not None:
print(response)
yield response
return self.app.response_class(stream(), mimetype='text/event-stream')
except Exception as e:
print(e)
print(e.__traceback__.tb_next)
return {
'_action': '_ask',
'success': False,
"error": f"an error occurred {str(e)}"
}, 400