sanbo commited on
Commit
a487faf
·
1 Parent(s): 9887283

update sth. at 2025-01-04 23:21:29

Browse files
Files changed (5) hide show
  1. .gitignore +76 -0
  2. debug.md +1 -1
  3. m1.py +20 -20
  4. more_core.py +112 -154
  5. more_core_bksss.py +208 -0
.gitignore ADDED
@@ -0,0 +1,76 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ .externalNativeBuild
2
+ import-summary.txt
3
+
4
+ #java files
5
+ *.class
6
+ *.dex
7
+ .sync/
8
+
9
+ #for idea temp file
10
+ *.iws
11
+ *.ipr
12
+ *.iml
13
+ target/
14
+ .idea/
15
+ .gradle/
16
+ release/
17
+ build/
18
+ spoon/
19
+ releasebak/
20
+
21
+ #mac temp file
22
+ __MACOSX
23
+ .DS_Store
24
+ ._.DS_Store
25
+
26
+ #for eclipse
27
+ .settings/
28
+ local.properties
29
+ *gen/
30
+ *.classpath
31
+ */bin/
32
+ bin/
33
+ .project
34
+
35
+ #temp file
36
+ *.bak
37
+
38
+ *.pmd
39
+ sh.exe.stackdump
40
+
41
+ .vs/
42
+ .vscode/
43
+
44
+ *.log
45
+ *.ctxt
46
+ .mtj.tmp/
47
+
48
+ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
49
+ hs_err_pid*
50
+
51
+ # Package Files #
52
+ # *.jar
53
+ *.war
54
+ *.nar
55
+ *.ear
56
+ *.zip
57
+ *.tar.gz
58
+ *.rar
59
+ *.cxx
60
+ *.cfg
61
+ # for nodejs
62
+ node_modules/
63
+ # for python
64
+ package-lock.json
65
+ .$*
66
+ *.drawio.bkp
67
+ __pycache__
68
+ *.pyc
69
+ *.pyo
70
+ *.pyd
71
+ .Python
72
+ env
73
+ .env
74
+ .venv
75
+ pip-log.txt
76
+
debug.md CHANGED
@@ -72,7 +72,7 @@ curl -X POST http://localhost:7860/v1/chat/completions \
72
  "presence_penalty": 0.0
73
  }'
74
 
75
- curl -X POST http://localhost:7860/chat \
76
  -H 'Accept: application/json' \
77
  -H 'Authorization: Bearer YOUR_API_KEY' \
78
  -H "Content-Type: application/json" \
 
72
  "presence_penalty": 0.0
73
  }'
74
 
75
+ curl -X POST https://sanbo1200-planrun.hf.space/chat \
76
  -H 'Accept: application/json' \
77
  -H 'Authorization: Bearer YOUR_API_KEY' \
78
  -H "Content-Type: application/json" \
m1.py CHANGED
@@ -1,24 +1,13 @@
1
  import os
2
  import uvicorn
3
  import time
4
- import logging
5
  from fastapi import FastAPI, Request, HTTPException
6
  import tiktoken
7
  from json.decoder import JSONDecodeError
8
 
9
  # 初始化 FastAPI
10
  app = FastAPI()
11
- # 配置日志
12
- logging.basicConfig(level=logging.INFO)
13
- logger = logging.getLogger(__name__)
14
- logging.basicConfig(
15
- level=logging.INFO,
16
- format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
17
- handlers=[
18
- logging.StreamHandler(),
19
- logging.FileHandler('api_server.log', encoding='utf-8')
20
- ]
21
- )
22
  # 获取编码器
23
  encoding = tiktoken.get_encoding("cl100k_base")
24
 
@@ -35,10 +24,8 @@ def general_id(zimu=4, num=6):
35
  # 异常处理函数
36
  def handle_exception(e):
37
  if isinstance(e, JSONDecodeError):
38
- logger.error("JSON解析错误: %s", str(e))
39
  raise HTTPException(status_code=400, detail="无效的JSON格式")
40
  else:
41
- logger.error("服务器错误: %s", str(e))
42
  raise HTTPException(status_code=500, detail="服务器内部错误")
43
 
44
 
@@ -52,6 +39,20 @@ def is_chatgpt_format(data):
52
  pass
53
  return False
54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55
 
56
  # 模拟 ChatGPT 响应
57
  def generate_response(headers, data):
@@ -67,7 +68,7 @@ def generate_response(headers, data):
67
  if is_chatgpt_format(result):
68
  response_data = result
69
  else:
70
- # 计算时间戳,13位毫秒时间戳
71
  current_timestamp = int(time.time() * 1000)
72
 
73
  # 计算 token 数量
@@ -104,7 +105,7 @@ def generate_response(headers, data):
104
  ]
105
  }
106
  # 打印最终返回的数据
107
- logger.info("Response Data: %s", response_data)
108
  return response_data
109
  except Exception as e:
110
  handle_exception(e) # 异常处理函数
@@ -166,24 +167,23 @@ def get_dynamic_routes():
166
 
167
  # 注册单个动态路由
168
  def register_route(path: str):
169
- # logger.info("register route path: %s", path)
170
  @app.post(path)
171
  async def dynamic_chat_endpoint(request: Request):
172
  try:
173
  headers = request.headers
174
  data = request.json()
175
- logger.info("Received Request Header: %s\nData: %s", headers, data)
176
  result = generate_response(headers, data)
177
  return result
178
  except Exception as e:
179
- logger.error("Error: %s", str(e))
180
  raise HTTPException(status_code=500, detail=str(e))
181
 
182
 
183
  # 动态设置路由
184
  def setup_dynamic_routes():
185
  routes = get_dynamic_routes()
186
- logger.info("Registering routes: %s", routes)
187
  for path in routes:
188
  register_route(path)
189
 
 
1
  import os
2
  import uvicorn
3
  import time
 
4
  from fastapi import FastAPI, Request, HTTPException
5
  import tiktoken
6
  from json.decoder import JSONDecodeError
7
 
8
  # 初始化 FastAPI
9
  app = FastAPI()
10
+
 
 
 
 
 
 
 
 
 
 
11
  # 获取编码器
12
  encoding = tiktoken.get_encoding("cl100k_base")
13
 
 
24
  # 异常处理函数
25
  def handle_exception(e):
26
  if isinstance(e, JSONDecodeError):
 
27
  raise HTTPException(status_code=400, detail="无效的JSON格式")
28
  else:
 
29
  raise HTTPException(status_code=500, detail="服务器内部错误")
30
 
31
 
 
39
  pass
40
  return False
41
 
42
+ def get_workers_count() -> int:
43
+ """
44
+ Calculate optimal number of workers
45
+ Default: 4, Maximum: 8
46
+ Formula: min(max(4, (2 * CPU cores) + 1), 8)
47
+ """
48
+ try:
49
+ cpu_cores = multiprocessing.cpu_count()
50
+ recommended_workers = (2 * cpu_cores) + 1
51
+ return min(max(4, recommended_workers), 8)
52
+ except Exception as e:
53
+ if debug:
54
+ print(f"Worker count calculation failed: {e}, using default 4")
55
+ return 4
56
 
57
  # 模拟 ChatGPT 响应
58
  def generate_response(headers, data):
 
68
  if is_chatgpt_format(result):
69
  response_data = result
70
  else:
71
+ # 计算时间戳
72
  current_timestamp = int(time.time() * 1000)
73
 
74
  # 计算 token 数量
 
105
  ]
106
  }
107
  # 打印最终返回的数据
108
+ print(f"Response Data: {response_data}" )
109
  return response_data
110
  except Exception as e:
111
  handle_exception(e) # 异常处理函数
 
167
 
168
  # 注册单个动态路由
169
  def register_route(path: str):
170
+ print(f"register route path: {path}")
171
  @app.post(path)
172
  async def dynamic_chat_endpoint(request: Request):
173
  try:
174
  headers = request.headers
175
  data = request.json()
176
+ print(f"Received Request Header: {headers}\nData: {data}")
177
  result = generate_response(headers, data)
178
  return result
179
  except Exception as e:
 
180
  raise HTTPException(status_code=500, detail=str(e))
181
 
182
 
183
  # 动态设置路由
184
  def setup_dynamic_routes():
185
  routes = get_dynamic_routes()
186
+ print("Registering routes: {routes}")
187
  for path in routes:
188
  register_route(path)
189
 
more_core.py CHANGED
@@ -1,147 +1,42 @@
1
  import os
2
- import asyncio
3
  import time
4
  import multiprocessing
5
- from typing import Dict, Any, List, Optional
6
  from fastapi import FastAPI, Request, HTTPException
7
  import uvicorn
8
  import tiktoken
9
  from json.decoder import JSONDecodeError
 
 
10
 
11
- # Initialize FastAPI application
12
  app = FastAPI(
13
- title="ChatGPT API Simulation",
14
- description="High-performance ChatGPT API simulation service",
15
- version="1.0.0"
16
  )
17
 
18
- debug = True
19
-
20
-
21
- class ServerConfig:
22
- """Server configuration handler with dynamic worker calculation"""
23
-
24
- @staticmethod
25
- def get_workers_count() -> int:
26
- """
27
- Calculate optimal number of workers
28
- Default: 4, Maximum: 8
29
- Formula: min(max(4, (2 * CPU cores) + 1), 8)
30
- """
31
- try:
32
- cpu_cores = multiprocessing.cpu_count()
33
- recommended_workers = (2 * cpu_cores) + 1
34
- return min(max(4, recommended_workers), 8)
35
- except Exception as e:
36
- if debug:
37
- print(f"Worker count calculation failed: {e}, using default 4")
38
- return 4
39
-
40
- @classmethod
41
- def get_uvicorn_config(cls,
42
- app: FastAPI,
43
- host: str = "0.0.0.0",
44
- port: int = 7860) -> uvicorn.Config:
45
- """Get optimized Uvicorn configuration"""
46
- workers = cls.get_workers_count()
47
- if debug:
48
- print(f"Configuring server with {workers} workers")
49
-
50
- return uvicorn.Config(
51
- app=app,
52
- host=host,
53
- port=port,
54
- workers=workers,
55
- loop="uvloop",
56
- limit_concurrency=1000,
57
- timeout_keep_alive=30,
58
- access_log=True,
59
- log_level="info",
60
- http="httptools"
61
- )
62
-
63
-
64
- class TokenProcessor:
65
- """Token processing handler"""
66
-
67
- def __init__(self):
68
- self.encoding = tiktoken.get_encoding("cl100k_base")
69
-
70
- async def calculate_tokens(self, text: str) -> int:
71
- """Calculate tokens for given text"""
72
- return len(self.encoding.encode(text))
73
-
74
-
75
- class ChatGPTSimulator:
76
- """ChatGPT response simulator"""
77
-
78
- def __init__(self):
79
- self.token_processor = TokenProcessor()
80
-
81
- @staticmethod
82
- def generate_id(letters: int = 4, numbers: int = 6) -> str:
83
- """Generate random chat completion ID"""
84
- import random
85
- import string
86
- letters_str = ''.join(random.choices(string.ascii_lowercase, k=letters))
87
- numbers_str = ''.join(random.choices(string.digits, k=numbers))
88
- return f"chatcmpl-{letters_str}{numbers_str}"
89
-
90
- async def generate_response(self, headers: Dict[str, str], data: Dict[str, Any]) -> Dict[str, Any]:
91
- """Generate simulated ChatGPT response"""
92
- try:
93
- result = "This is a test result."
94
-
95
- prompt_tokens = await self.token_processor.calculate_tokens(str(data))
96
- completion_tokens = await self.token_processor.calculate_tokens(result)
97
- total_tokens = prompt_tokens + completion_tokens
98
-
99
- response_data = {
100
- "id": self.generate_id(),
101
- "object": "chat.completion",
102
- "created": int(time.time()),
103
- "model": data.get("model", "gpt-3.5-turbo"),
104
- "usage": {
105
- "prompt_tokens": prompt_tokens,
106
- "completion_tokens": completion_tokens,
107
- "total_tokens": total_tokens
108
- },
109
- "choices": [
110
- {
111
- "message": {
112
- "role": "assistant",
113
- "content": result
114
- },
115
- "finish_reason": "stop",
116
- "index": 0
117
- }
118
- ]
119
- }
120
- return response_data
121
-
122
- except Exception as e:
123
- if debug:
124
- print(f"Response generation error: {e}")
125
- raise HTTPException(status_code=500, detail=str(e))
126
-
127
-
128
- class RouteManager:
129
- """Route management handler"""
130
 
 
 
 
131
  def __init__(self, app: FastAPI):
132
  self.app = app
133
- self.simulator = ChatGPTSimulator()
134
-
135
- def setup_routes(self):
136
- """Setup API routes"""
137
- routes = self._get_dynamic_routes()
138
- if debug:
139
- print(f"Registering routes: {routes}")
140
  for path in routes:
141
  self._register_route(path)
142
-
143
- def _get_dynamic_routes(self) -> List[str]:
144
- """Get dynamic routes based on environment variables"""
 
 
 
 
145
  default_path = "/v1/chat/completions"
146
  replace_chat = os.getenv("REPLACE_CHAT", "")
147
  prefix_chat = os.getenv("PREFIX_CHAT", "")
@@ -152,57 +47,120 @@ class RouteManager:
152
 
153
  routes = []
154
  if prefix_chat:
155
- routes.extend(f"{prefix.rstrip('/')}{default_path}"
156
- for prefix in prefix_chat.split(","))
157
  return routes
158
 
159
  if append_chat:
160
  append_paths = [path.strip() for path in append_chat.split(",") if path.strip()]
161
  routes = [default_path] + append_paths
 
162
 
163
- return routes or [default_path]
164
-
165
- def _register_route(self, path: str):
166
- """Register a single route"""
167
 
168
- @self.app.post(path)
 
169
  async def chat_endpoint(request: Request) -> Dict[str, Any]:
170
  try:
171
  headers = dict(request.headers)
172
  data = await request.json()
173
  if debug:
174
  print(f"Request received...\r\n\tHeaders: {headers},\r\n\tData: {data}")
175
- return await self.simulator.generate_response(headers, data)
176
  except JSONDecodeError as e:
177
  if debug:
178
  print(f"JSON decode error: {e}")
179
- raise HTTPException(status_code=400, detail="Invalid JSON format")
180
  except Exception as e:
181
  if debug:
182
  print(f"Request processing error: {e}")
183
- raise HTTPException(status_code=500, detail="Internal server error")
 
 
184
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
 
186
- @app.get("/")
187
- async def health_check() -> str:
188
- """Health check endpoint"""
189
- return "Service is running..."
 
 
 
 
 
 
190
 
 
 
 
 
 
191
 
192
- def run_server(host: str = "0.0.0.0", port: int = 7860):
193
- """Run server with optimized configuration"""
194
- config = ServerConfig.get_uvicorn_config(
195
- app=app,
196
- host=host,
197
- port=port
198
- )
 
 
 
 
 
199
 
200
- server = uvicorn.Server(config)
201
- server.run()
 
 
 
202
 
 
 
 
203
 
204
  if __name__ == "__main__":
205
- route_manager = RouteManager(app)
206
- route_manager.setup_routes()
207
  port = int(os.getenv("PORT", "7860"))
208
- run_server(port=port)
 
 
1
  import os
 
2
  import time
3
  import multiprocessing
4
+ from typing import Dict, Any, List
5
  from fastapi import FastAPI, Request, HTTPException
6
  import uvicorn
7
  import tiktoken
8
  from json.decoder import JSONDecodeError
9
+ import random
10
+ import string
11
 
 
12
  app = FastAPI(
13
+ title="ones",
14
+ description="High-performance API service",
15
+ version="1.0.0|2025.1.6"
16
  )
17
 
18
+ debug = False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
 
20
+ class APIServer:
21
+ """High-performance API server implementation"""
22
+
23
  def __init__(self, app: FastAPI):
24
  self.app = app
25
+ self.encoding = tiktoken.get_encoding("cl100k_base")
26
+ self._setup_routes()
27
+
28
+ def _setup_routes(self) -> None:
29
+ """Initialize API routes"""
30
+ routes = self._get_routes()
 
31
  for path in routes:
32
  self._register_route(path)
33
+
34
+ @self.app.get("/")
35
+ async def health_check() -> str:
36
+ return "你好"
37
+
38
+ def _get_routes(self) -> List[str]:
39
+ """Get configured API routes"""
40
  default_path = "/v1/chat/completions"
41
  replace_chat = os.getenv("REPLACE_CHAT", "")
42
  prefix_chat = os.getenv("PREFIX_CHAT", "")
 
47
 
48
  routes = []
49
  if prefix_chat:
50
+ routes.extend(f"{prefix.rstrip('/')}{default_path}"
51
+ for prefix in prefix_chat.split(","))
52
  return routes
53
 
54
  if append_chat:
55
  append_paths = [path.strip() for path in append_chat.split(",") if path.strip()]
56
  routes = [default_path] + append_paths
57
+ return routes
58
 
59
+ return [default_path]
 
 
 
60
 
61
+ def _register_route(self, path: str) -> None:
62
+ """Register a single API route"""
63
  async def chat_endpoint(request: Request) -> Dict[str, Any]:
64
  try:
65
  headers = dict(request.headers)
66
  data = await request.json()
67
  if debug:
68
  print(f"Request received...\r\n\tHeaders: {headers},\r\n\tData: {data}")
69
+ return await self._generate_response(headers, data)
70
  except JSONDecodeError as e:
71
  if debug:
72
  print(f"JSON decode error: {e}")
73
+ raise HTTPException(status_code=400, detail="Invalid JSON format") from e
74
  except Exception as e:
75
  if debug:
76
  print(f"Request processing error: {e}")
77
+ raise HTTPException(status_code=500, detail="Internal server error") from e
78
+
79
+ self.app.post(path)(chat_endpoint)
80
 
81
+ def _calculate_tokens(self, text: str) -> int:
82
+ """Calculate token count for text"""
83
+ return len(self.encoding.encode(text))
84
+
85
+ def _generate_id(self, letters: int = 4, numbers: int = 6) -> str:
86
+ """Generate unique chat completion ID"""
87
+ letters_str = ''.join(random.choices(string.ascii_lowercase, k=letters))
88
+ numbers_str = ''.join(random.choices(string.digits, k=numbers))
89
+ return f"chatcmpl-{letters_str}{numbers_str}"
90
+
91
+ async def _generate_response(self, headers: Dict[str, str], data: Dict[str, Any]) -> Dict[str, Any]:
92
+ """Generate API response"""
93
+ try:
94
+ result = "This is a test result."
95
+ prompt_tokens = self._calculate_tokens(str(data))
96
+ completion_tokens = self._calculate_tokens(result)
97
+ total_tokens = prompt_tokens + completion_tokens
98
+
99
+ return {
100
+ "id": self._generate_id(),
101
+ "object": "chat.completion",
102
+ "created": int(time.time()),
103
+ "model": data.get("model", "gpt-3.5-turbo"),
104
+ "usage": {
105
+ "prompt_tokens": prompt_tokens,
106
+ "completion_tokens": completion_tokens,
107
+ "total_tokens": total_tokens
108
+ },
109
+ "choices": [{
110
+ "message": {
111
+ "role": "assistant",
112
+ "content": result
113
+ },
114
+ "finish_reason": "stop",
115
+ "index": 0
116
+ }]
117
+ }
118
+ except Exception as e:
119
+ if debug:
120
+ print(f"Response generation error: {e}")
121
+ raise HTTPException(status_code=500, detail=str(e)) from e
122
 
123
+ def _get_workers_count(self) -> int:
124
+ """Calculate optimal worker count"""
125
+ try:
126
+ cpu_cores = multiprocessing.cpu_count()
127
+ recommended_workers = (2 * cpu_cores) + 1
128
+ return min(max(4, recommended_workers), 8)
129
+ except Exception as e:
130
+ if debug:
131
+ print(f"Worker count calculation failed: {e}, using default 4")
132
+ return 4
133
 
134
+ def get_server_config(self, host: str = "0.0.0.0", port: int = 7860) -> uvicorn.Config:
135
+ """Get server configuration"""
136
+ workers = self._get_workers_count()
137
+ if debug:
138
+ print(f"Configuring server with {workers} workers")
139
 
140
+ return uvicorn.Config(
141
+ app=self.app,
142
+ host=host,
143
+ port=port,
144
+ workers=workers,
145
+ loop="uvloop",
146
+ limit_concurrency=1000,
147
+ timeout_keep_alive=30,
148
+ access_log=True,
149
+ log_level="info",
150
+ http="httptools"
151
+ )
152
 
153
+ def run(self, host: str = "0.0.0.0", port: int = 7860) -> None:
154
+ """Run the API server"""
155
+ config = self.get_server_config(host, port)
156
+ server = uvicorn.Server(config)
157
+ server.run()
158
 
159
+ def create_server() -> APIServer:
160
+ """Factory function to create server instance"""
161
+ return APIServer(app)
162
 
163
  if __name__ == "__main__":
 
 
164
  port = int(os.getenv("PORT", "7860"))
165
+ server = create_server()
166
+ server.run(port=port)
more_core_bksss.py ADDED
@@ -0,0 +1,208 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import asyncio
3
+ import time
4
+ import multiprocessing
5
+ from typing import Dict, Any, List, Optional
6
+ from fastapi import FastAPI, Request, HTTPException
7
+ import uvicorn
8
+ import tiktoken
9
+ from json.decoder import JSONDecodeError
10
+
11
+ # Initialize FastAPI application
12
+ app = FastAPI(
13
+ title="ones",
14
+ description="High-performance API service",
15
+ version="1.0.0|2025.1.6"
16
+ )
17
+ # debug
18
+ debug = False
19
+
20
+
21
+ class ServerConfig:
22
+ """Server configuration handler with dynamic worker calculation"""
23
+
24
+ @staticmethod
25
+ def get_workers_count() -> int:
26
+ """
27
+ Calculate optimal number of workers
28
+ Default: 4, Maximum: 8
29
+ Formula: min(max(4, (2 * CPU cores) + 1), 8)
30
+ """
31
+ try:
32
+ cpu_cores = multiprocessing.cpu_count()
33
+ recommended_workers = (2 * cpu_cores) + 1
34
+ return min(max(4, recommended_workers), 8)
35
+ except Exception as e:
36
+ if debug:
37
+ print(f"Worker count calculation failed: {e}, using default 4")
38
+ return 4
39
+
40
+ @classmethod
41
+ def get_uvicorn_config(cls,
42
+ app: FastAPI,
43
+ host: str = "0.0.0.0",
44
+ port: int = 7860) -> uvicorn.Config:
45
+ """Get optimized Uvicorn configuration"""
46
+ workers = cls.get_workers_count()
47
+ if debug:
48
+ print(f"Configuring server with {workers} workers")
49
+
50
+ return uvicorn.Config(
51
+ app=app,
52
+ host=host,
53
+ port=port,
54
+ workers=workers,
55
+ loop="uvloop",
56
+ limit_concurrency=1000,
57
+ timeout_keep_alive=30,
58
+ access_log=True,
59
+ log_level="info",
60
+ http="httptools"
61
+ )
62
+
63
+
64
+ class TokenProcessor:
65
+ """Token processing handler"""
66
+
67
+ def __init__(self):
68
+ self.encoding = tiktoken.get_encoding("cl100k_base")
69
+
70
+ async def calculate_tokens(self, text: str) -> int:
71
+ """Calculate tokens for given text"""
72
+ return len(self.encoding.encode(text))
73
+
74
+
75
+ class ChatGPTSimulator:
76
+ """ChatGPT response simulator"""
77
+
78
+ def __init__(self):
79
+ self.token_processor = TokenProcessor()
80
+
81
+ @staticmethod
82
+ def generate_id(letters: int = 4, numbers: int = 6) -> str:
83
+ """Generate random chat completion ID"""
84
+ import random
85
+ import string
86
+ letters_str = ''.join(random.choices(string.ascii_lowercase, k=letters))
87
+ numbers_str = ''.join(random.choices(string.digits, k=numbers))
88
+ return f"chatcmpl-{letters_str}{numbers_str}"
89
+
90
+ async def generate_response(self, headers: Dict[str, str], data: Dict[str, Any]) -> Dict[str, Any]:
91
+ """Generate simulated ChatGPT response"""
92
+ try:
93
+ result = "This is a test result."
94
+
95
+ prompt_tokens = await self.token_processor.calculate_tokens(str(data))
96
+ completion_tokens = await self.token_processor.calculate_tokens(result)
97
+ total_tokens = prompt_tokens + completion_tokens
98
+
99
+ response_data = {
100
+ "id": self.generate_id(),
101
+ "object": "chat.completion",
102
+ "created": int(time.time()),
103
+ "model": data.get("model", "gpt-3.5-turbo"),
104
+ "usage": {
105
+ "prompt_tokens": prompt_tokens,
106
+ "completion_tokens": completion_tokens,
107
+ "total_tokens": total_tokens
108
+ },
109
+ "choices": [
110
+ {
111
+ "message": {
112
+ "role": "assistant",
113
+ "content": result
114
+ },
115
+ "finish_reason": "stop",
116
+ "index": 0
117
+ }
118
+ ]
119
+ }
120
+ return response_data
121
+
122
+ except Exception as e:
123
+ if debug:
124
+ print(f"Response generation error: {e}")
125
+ raise HTTPException(status_code=500, detail=str(e))
126
+
127
+
128
+ class RouteManager:
129
+ """Route management handler"""
130
+
131
+ def __init__(self, app: FastAPI):
132
+ self.app = app
133
+ self.simulator = ChatGPTSimulator()
134
+
135
+ def setup_routes(self):
136
+ """Setup API routes"""
137
+ routes = self._get_dynamic_routes()
138
+ if debug:
139
+ print(f"Registering routes: {routes}")
140
+ for path in routes:
141
+ self._register_route(path)
142
+
143
+ def _get_dynamic_routes(self) -> List[str]:
144
+ """Get dynamic routes based on environment variables"""
145
+ default_path = "/v1/chat/completions"
146
+ replace_chat = os.getenv("REPLACE_CHAT", "")
147
+ prefix_chat = os.getenv("PREFIX_CHAT", "")
148
+ append_chat = os.getenv("APPEND_CHAT", "")
149
+
150
+ if replace_chat:
151
+ return [path.strip() for path in replace_chat.split(",") if path.strip()]
152
+
153
+ routes = []
154
+ if prefix_chat:
155
+ routes.extend(f"{prefix.rstrip('/')}{default_path}"
156
+ for prefix in prefix_chat.split(","))
157
+ return routes
158
+
159
+ if append_chat:
160
+ append_paths = [path.strip() for path in append_chat.split(",") if path.strip()]
161
+ routes = [default_path] + append_paths
162
+
163
+ return routes or [default_path]
164
+
165
+ def _register_route(self, path: str):
166
+ """Register a single route"""
167
+
168
+ @self.app.post(path)
169
+ async def chat_endpoint(request: Request) -> Dict[str, Any]:
170
+ try:
171
+ headers = dict(request.headers)
172
+ data = await request.json()
173
+ if debug:
174
+ print(f"Request received...\r\n\tHeaders: {headers},\r\n\tData: {data}")
175
+ return await self.simulator.generate_response(headers, data)
176
+ except JSONDecodeError as e:
177
+ if debug:
178
+ print(f"JSON decode error: {e}")
179
+ raise HTTPException(status_code=400, detail="Invalid JSON format")
180
+ except Exception as e:
181
+ if debug:
182
+ print(f"Request processing error: {e}")
183
+ raise HTTPException(status_code=500, detail="Internal server error")
184
+
185
+
186
+ @app.get("/")
187
+ async def health_check() -> str:
188
+ """Health check endpoint"""
189
+ return "Service is running..."
190
+
191
+
192
+ def run_server(host: str = "0.0.0.0", port: int = 7860):
193
+ """Run server with optimized configuration"""
194
+ config = ServerConfig.get_uvicorn_config(
195
+ app=app,
196
+ host=host,
197
+ port=port
198
+ )
199
+
200
+ server = uvicorn.Server(config)
201
+ server.run()
202
+
203
+
204
+ if __name__ == "__main__":
205
+ route_manager = RouteManager(app)
206
+ route_manager.setup_routes()
207
+ port = int(os.getenv("PORT", "7860"))
208
+ run_server(port=port)